Add app scaffold and workflows
All checks were successful
Continuous Integration / Validate and test changes (push) Successful in 3s

This commit is contained in:
2025-12-03 08:58:34 +01:00
parent 5a8b773e40
commit d6b61ae8fb
51 changed files with 10252 additions and 3 deletions

View File

@@ -0,0 +1 @@
"""Health feature package."""

View File

@@ -0,0 +1,84 @@
"""Health endpoints for FastAPI application health checks (Liveness, Readiness, Detailed Status)."""
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import JSONResponse, PlainTextResponse
# Dependencies for application configuration and schema definitions
from core.config import Settings, get_settings
from operations.health.schemas import HealthStatus, HealthStatusEnum
from operations.health.service import get_detailed_health, readiness_check
# Router collecting the health endpoints: every route registered on it is
# served under the "/health" prefix and carries the "health" tag.
router = APIRouter(prefix="/health", tags=["health"])
@router.get(
    "/live",
    response_class=PlainTextResponse,
    status_code=200,
    summary="Liveness Probe",
)
def liveness() -> str:
    """
    **Liveness Probe:** Reports that the application process is up and answering requests.

    Automated systems (like Kubernetes) poll this endpoint to decide whether the
    instance should keep running or be restarted. The handler is deliberately
    trivial and performs no checks on external dependencies.

    **Success Response:** HTTP 200 OK with "live" body.

    **Failure Response:** The handler itself never reports failure; the orchestrator
    interprets a *TCP connection timeout* as one.
    """
    # Being able to return at all shows that the Python process and FastAPI are serving.
    return "live"
@router.get(
    "/ready",
    response_class=PlainTextResponse,
    status_code=200,
    summary="Readiness Probe",
)
async def readiness() -> str:
    """
    **Readiness Probe:** Tells whether this instance can accept user traffic.

    Load balancers or service meshes use this endpoint to decide whether to
    route traffic to this instance. The decision is delegated to the service
    layer's `readiness_check()`.

    **Success Response:** HTTP 200 OK with "ready" body.

    **Failure Response:** HTTP 503 Service Unavailable when `readiness_check()` reports failure.
    """
    # Happy path first: the service layer reports readiness as a single boolean.
    if await readiness_check():
        return "ready"
    # Otherwise answer 'Service Unavailable' so that traffic is diverted.
    raise HTTPException(status_code=503, detail="not ready")
@router.get(
    "",
    summary="Detailed Health Status Page",
    response_model=HealthStatus,
    # Advertise that the 503 reply carries the same body shape as the 200 reply.
    responses={
        503: {
            "model": HealthStatus,
            "description": "At least one check reported 'fail'.",
        }
    },
)
async def detailed_health(
    settings: Settings = Depends(get_settings),
) -> HealthStatus | JSONResponse:
    """
    **Detailed Status Page:** Provides granular health information for human operators/monitoring tools.

    The report is built by `get_detailed_health()` and is returned as the
    response body for every outcome, so the per-component results remain
    visible precisely when something is wrong.

    **Success Response:** HTTP 200 OK with the report when the overall status is 'pass' or 'warn'.

    **Failure Response:** HTTP 503 Service Unavailable with the same report when the overall status is 'fail'.
    """
    report = await get_detailed_health(settings)

    if report.status == HealthStatusEnum.failed:
        # Returning a Response directly bypasses response_model serialization,
        # so dump the model explicitly: JSON-safe values, camelCase aliases.
        return JSONResponse(
            status_code=503,
            content=report.model_dump(mode="json", by_alias=True),
        )

    # 'pass' and 'warn' are both served with the default 200 status code.
    return report

View File

@@ -0,0 +1,65 @@
"""
Pydantic schemas for health check responses, loosely modelled on the IETF Internet-Draft
"Health Check Response Format for HTTP APIs" (application/health+json).
"""
from datetime import datetime
from enum import Enum
# Import ConfigDict from pydantic to resolve the deprecation warning
from pydantic import BaseModel, Field, ConfigDict
# Define acceptable statuses as an Enum for robust validation
class HealthStatusEnum(str, Enum):
    """Closed set of status strings a health check can report.

    The ``str`` mix-in makes every member compare equal to its plain string
    value ("pass", "warn" or "fail").
    """

    # Member names are past-tense identifiers; the values are the wire strings.
    passed = "pass"
    warned = "warn"
    failed = "fail"
class ComponentCheck(BaseModel):
    """
    Outcome of probing a single internal component or dependency.

    ``observed_value`` and ``observed_unit`` carry camelCase aliases; since
    ``populate_by_name`` is enabled, either spelling may be used to set them.
    """

    # Pydantic v2 configuration object (replaces the nested ``class Config``).
    model_config = ConfigDict(populate_by_name=True)

    name: str = Field(
        description="The unique name of the component being checked (e.g., 'postgres', 'redis').",
    )
    status: HealthStatusEnum = Field(
        description="The status of the check: 'pass', 'warn', or 'fail'.",
    )
    time: datetime | None = Field(
        default=None,
        description="The time at which the check was performed in ISO 8601 format.",
    )
    output: str | None = Field(
        default=None,
        description="Additional details, error messages, or logs if the status is 'fail' or 'warn'.",
    )
    # snake_case attribute, camelCase alias.
    observed_value: float | int | None = Field(
        alias="observedValue",
        default=None,
        description="The value observed during the check (e.g., latency in ms).",
    )
    # snake_case attribute, camelCase alias.
    observed_unit: str | None = Field(
        alias="observedUnit",
        default=None,
        description="The unit of the observed value (e.g., 'ms', 'count', 'bytes').",
    )
class HealthStatus(BaseModel):
    """
    Top-level health response: service metadata plus the result of every
    individual component check, keyed by check label.
    """

    # Pydantic v2 configuration object (replaces the nested ``class Config``).
    model_config = ConfigDict(populate_by_name=True)

    status: HealthStatusEnum = Field(
        description="The aggregate status of the entire service: 'pass', 'warn', or 'fail'.",
    )
    version: str | None = Field(
        default=None,
        description="The application version (e.g., Git SHA or semantic version number).",
    )
    environment: str | None = Field(
        default=None,
        description="The deployment environment (e.g., 'production', 'staging').",
    )
    # snake_case attribute, camelCase alias.
    service_name: str | None = Field(
        alias="serviceName",
        default=None,
        description="The name of the service.",
    )
    description: str | None = Field(
        default=None,
        description="A brief description of the service.",
    )
    checks: dict[str, ComponentCheck] = Field(
        description="A dictionary mapping check keys (e.g., 'Database') to their detailed ComponentCheck results.",
    )

View File

@@ -0,0 +1,129 @@
import asyncio
from datetime import datetime, timezone
from typing import Callable, Coroutine, Any, List, Dict
from core.config import Settings
# HealthStatusEnum is defined in the schemas module, alongside the models that use it
from operations.health.schemas import ComponentCheck, HealthStatus, HealthStatusEnum
# Type alias for a health check: a zero-argument callable whose call produces
# a coroutine that resolves to a ComponentCheck (i.e. an `async def` taking no
# parameters). Used to type the values of the check registry below.
HealthCheckFunc = Callable[[], Coroutine[Any, Any, ComponentCheck]]
async def _run_check_with_timeout(
    check_coroutine: Coroutine[Any, Any, None],
    name: str,
    timeout_ms: int
) -> ComponentCheck:
    """
    Await a health-check coroutine under a deadline and report the outcome.

    Args:
        check_coroutine: Already-created coroutine that performs the probe.
            Its result is ignored; finishing without raising counts as a pass.
        name: Component name copied into the returned ``ComponentCheck``.
        timeout_ms: Deadline for the probe, in milliseconds.

    Returns:
        A ``ComponentCheck`` with status ``pass`` and the elapsed time in whole
        milliseconds when the probe finishes in time; otherwise one with status
        ``fail`` whose ``output`` describes the timeout or the raised exception.
        Exceptions derived from ``Exception`` are never propagated.
    """
    # Measure elapsed time with the event loop's monotonic clock: subtracting
    # wall-clock timestamps gives wrong (even negative) latencies whenever the
    # system clock is adjusted while the probe is running.
    monotonic = asyncio.get_running_loop().time
    # asyncio expects the timeout as (float) seconds.
    timeout_seconds = timeout_ms / 1000.0
    started = monotonic()
    try:
        # asyncio.timeout() requires Python 3.11+.
        async with asyncio.timeout(timeout_seconds):
            await check_coroutine
    except asyncio.TimeoutError:
        # The probe did not finish before the deadline.
        return ComponentCheck(
            name=name,
            status=HealthStatusEnum.failed,
            time=datetime.now(timezone.utc),
            output=f"Check timed out after {timeout_seconds:.2f}s",
        )
    except Exception as e:
        # Any other probe failure (e.g., connection refused, network down).
        return ComponentCheck(
            name=name,
            status=HealthStatusEnum.failed,
            time=datetime.now(timezone.utc),
            output=f"An error occurred: {e}",
        )
    else:
        # Only the probe itself is guarded by the try block; building the
        # success result happens here, outside the exception handlers.
        elapsed_ms = int((monotonic() - started) * 1000)
        return ComponentCheck(
            name=name,
            status=HealthStatusEnum.passed,
            time=datetime.now(timezone.utc),
            observedValue=elapsed_ms,
            observedUnit="ms",
        )
async def check_database_status() -> ComponentCheck:
    """
    Probe the primary database and report the result as a ``ComponentCheck``.

    The probe runs through ``_run_check_with_timeout`` under the component
    name "postgres" with a 50 ms deadline.
    """

    async def _probe():
        # IMPORTANT: placeholder only. Replace this sleep with the actual async
        # DB client call (e.g., await database.ping()).
        await asyncio.sleep(0.045)

    outcome = await _run_check_with_timeout(_probe(), name="postgres", timeout_ms=50)
    return outcome
async def check_media_server_status() -> ComponentCheck:
    """
    Probe the media server and report the result as a ``ComponentCheck``.

    The probe runs through ``_run_check_with_timeout`` under the component
    name "livekit" with a 50 ms deadline.
    """

    async def _probe():
        # IMPORTANT: placeholder only. Replace this sleep with the actual
        # network I/O call (e.g., await http_client.get('...')).
        await asyncio.sleep(0.02)

    outcome = await _run_check_with_timeout(_probe(), name="livekit", timeout_ms=50)
    return outcome
# Registry of the critical health checks. Both readiness_check() and
# get_detailed_health() iterate this mapping, so adding an entry here is enough
# to include a new check in both. Keys are the labels under which results
# appear in the detailed report; values are zero-argument async check functions.
CRITICAL_CHECKS: Dict[str, HealthCheckFunc] = {
    "Database": check_database_status,
    "Media Server": check_media_server_status
}
async def readiness_check() -> bool:
    """
    Run every check in ``CRITICAL_CHECKS`` and tell whether all of them passed.

    Returns:
        ``True`` only when each check reports ``HealthStatusEnum.passed``;
        ``False`` as soon as one result has any other status.
    """
    # Create one coroutine per registered check and await them together.
    outcomes: List[ComponentCheck] = await asyncio.gather(
        *(probe() for probe in CRITICAL_CHECKS.values())
    )
    for outcome in outcomes:
        if outcome.status != HealthStatusEnum.passed:
            return False
    return True
async def get_detailed_health(settings: Settings) -> HealthStatus:
    """
    Build the detailed health report from all checks in ``CRITICAL_CHECKS``.

    Args:
        settings: Application settings; ``version``, ``environment``,
            ``service_name`` and ``title`` are copied into the report.

    Returns:
        A ``HealthStatus`` whose ``checks`` maps each registry key to its
        result and whose ``status`` is the worst individual status:
        'fail' if any check failed, otherwise 'warn' if any check warned,
        otherwise 'pass'.
    """
    results: List[ComponentCheck] = await asyncio.gather(
        *(check_func() for check_func in CRITICAL_CHECKS.values())
    )

    # gather() preserves argument order, so results line up with the registry keys.
    checks = dict(zip(CRITICAL_CHECKS, results))

    # Worst status wins. A 'warn' result is surfaced as 'warn' instead of being
    # collapsed into 'fail', matching the three aggregate values the schema allows.
    observed = {result.status for result in results}
    if HealthStatusEnum.failed in observed:
        overall = HealthStatusEnum.failed
    elif HealthStatusEnum.warned in observed:
        overall = HealthStatusEnum.warned
    else:
        overall = HealthStatusEnum.passed

    return HealthStatus(
        status=overall,
        version=settings.version,
        environment=settings.environment,
        serviceName=settings.service_name,
        description=settings.title,
        checks=checks,
    )