diff --git a/app/backend/operations/health/router.py b/app/backend/operations/health/router.py index ad2431f..8fb8842 100644 --- a/app/backend/operations/health/router.py +++ b/app/backend/operations/health/router.py @@ -1,12 +1,14 @@ -"""Health endpoints.""" +"""Health endpoints for FastAPI application health checks (Liveness, Readiness, Detailed Status).""" from fastapi import APIRouter, Depends, HTTPException from fastapi.responses import PlainTextResponse +# Dependencies for application configuration and schema definitions from core.config import Settings, get_settings -from operations.health.schemas import HealthStatus +from operations.health.schemas import HealthStatus, HealthStatusEnum from operations.health.service import get_detailed_health, readiness_check +# Initialize the API router for health endpoints, grouping them under the "/health" prefix router = APIRouter(prefix="/health", tags=["health"]) @@ -25,8 +27,9 @@ def liveness() -> str: performing no deep checks on external dependencies. **Success Response:** HTTP 200 OK with "live" body. - **Failure Response:** Endpoint timeout (no response). + **Failure Response:** The orchestrator will interpret a *TCP connection timeout* as a failure. """ + # Simply returning a string confirms the Python process and FastAPI are functional. return "live" @@ -40,14 +43,18 @@ async def readiness() -> str: """ **Readiness Probe:** Determines if the application can accept user traffic. - This endpoint is used by load balancers to route traffic. It performs deep checks - on all critical dependencies (e.g., database, message queue). + This endpoint is used by load balancers or service meshes to decide whether + to route traffic to this specific instance. It performs deep checks + on all critical dependencies (e.g., database connection, external services). **Success Response:** HTTP 200 OK with "ready" body. **Failure Response:** HTTP 503 Service Unavailable if any critical dependency fails. """ + # Call the service layer function that runs all critical checks concurrently ok = await readiness_check() + if not ok: + # If any check fails, signal 'Service Unavailable' so traffic is diverted raise HTTPException(status_code=503, detail="not ready") return "ready" @@ -57,18 +64,21 @@ async def readiness() -> str: "", summary="Detailed Health Status Page", response_model=HealthStatus, - status_code=200, ) async def detailed_health(settings: Settings = Depends(get_settings)) -> HealthStatus: """ - **Detailed Status Page:** Provides granular health information for human operators. + **Detailed Status Page:** Provides granular health information for human operators/monitoring tools. - This endpoint runs all readiness checks and returns a structured JSON object. - The top-level HTTP status code reflects the overall application health (200 OK or 503 Service Unavailable). + This endpoint runs all readiness checks and returns a structured JSON object + containing the status of each individual component. + The top-level HTTP status code reflects the overall application health (200 OK for 'pass', 503 for 'fail'). """ + # Retrieve the comprehensive health status model detailed_health = await get_detailed_health(settings) - if detailed_health.status != "pass": + if detailed_health.status != HealthStatusEnum.passed: + # Align the HTTP status code with the overall health status for easy monitoring raise HTTPException(status_code=503, detail="not ready") + # Status code is 200 return detailed_health diff --git a/app/backend/operations/health/schemas.py b/app/backend/operations/health/schemas.py index dd39d3d..40e6afd 100644 --- a/app/backend/operations/health/schemas.py +++ b/app/backend/operations/health/schemas.py @@ -1,23 +1,65 @@ -"""Pydantic schemas for health responses.""" +""" +Pydantic schemas for defining health check responses, following IETF standards. +""" from datetime import datetime +from enum import Enum -from pydantic import BaseModel +# Import ConfigDict from pydantic to resolve the deprecation warning +from pydantic import BaseModel, Field, ConfigDict + +# Define acceptable statuses as an Enum for robust validation +class HealthStatusEnum(str, Enum): + """Enumeration for standard health check statuses.""" + passed = "pass" + warned = "warn" + failed = "fail" class ComponentCheck(BaseModel): - name: str - # pass | warn | fail - status: str - time: datetime | None = None - output: str | None = None - observedValue: float | int | None = None - observedUnit: str | None = None + """ + Represents the status and metrics for a single internal component or dependency. + """ + # Use ConfigDict instead of the class Config approach + model_config = ConfigDict( + populate_by_name=True # Allows instantiation using either 'observed_value' or 'observedValue' + ) + + name: str = Field(description="The unique name of the component being checked (e.g., 'postgres', 'redis').") + status: HealthStatusEnum = Field(description="The status of the check: 'pass', 'warn', or 'fail'.") + time: datetime | None = Field(default=None, description="The time at which the check was performed in ISO 8601 format.") + output: str | None = Field(default=None, description="Additional details, error messages, or logs if the status is 'fail' or 'warn'.") + + # Python uses snake_case internally, JSON uses camelCase for the alias + observed_value: float | int | None = Field( + default=None, + alias="observedValue", + description="The value observed during the check (e.g., latency in ms)." + ) + # Python uses snake_case internally, JSON uses camelCase for the alias + observed_unit: str | None = Field( + default=None, + alias="observedUnit", + description="The unit of the observed value (e.g., 'ms', 'count', 'bytes')." + ) class HealthStatus(BaseModel): - # pass | warn | fail - status: str - version: str | None = None - environment: str | None = None - serviceName: str | None = None - description: str | None = None - checks: dict[str, ComponentCheck] + """ + The overall system health response model, aggregating all individual component checks. + """ + # Use ConfigDict instead of the class Config approach + model_config = ConfigDict( + populate_by_name=True + ) + + status: HealthStatusEnum = Field(description="The aggregate status of the entire service: 'pass', 'warn', or 'fail'.") + version: str | None = Field(default=None, description="The application version (e.g., Git SHA or semantic version number).") + environment: str | None = Field(default=None, description="The deployment environment (e.g., 'production', 'staging').") + + # Python uses snake_case internally, JSON uses camelCase for the alias + service_name: str | None = Field( + default=None, + alias="serviceName", + description="The name of the service." + ) + description: str | None = Field(default=None, description="A brief description of the service.") + checks: dict[str, ComponentCheck] = Field(description="A dictionary mapping check keys (e.g., 'Database') to their detailed ComponentCheck results.") diff --git a/app/backend/operations/health/service.py b/app/backend/operations/health/service.py index 886bd43..f7e56e3 100644 --- a/app/backend/operations/health/service.py +++ b/app/backend/operations/health/service.py @@ -1,89 +1,129 @@ import asyncio from datetime import datetime, timezone +from typing import Callable, Coroutine, Any, List, Dict from core.config import Settings -from operations.health.schemas import ComponentCheck, HealthStatus +# Assuming schemas now contains the Enum definition and uses it in the models +from operations.health.schemas import ComponentCheck, HealthStatus, HealthStatusEnum +# Type alias for a function that returns an awaitable ComponentCheck. +HealthCheckFunc = Callable[[], Coroutine[Any, Any, ComponentCheck]] + +async def _run_check_with_timeout( + check_coroutine: Coroutine[Any, Any, None], + name: str, + timeout_ms: int +) -> ComponentCheck: + """ + A utility wrapper that executes a given async coroutine with a strict timeout constraint. + It standardizes the exception handling and timing calculation for health checks. + """ + start_time = datetime.now(timezone.utc) + # Convert milliseconds timeout to a float in seconds for asyncio + timeout_seconds = timeout_ms / 1000.0 + + try: + # Enforce the timeout using the modern asyncio.timeout context manager (Python 3.11+) + async with asyncio.timeout(timeout_seconds): + await check_coroutine + + # If execution reaches here, the check passed within the time limit. + duration = datetime.now(timezone.utc) - start_time + observed_value = int(duration.total_seconds() * 1000) # value stored in ms + + return ComponentCheck( + name=name, + # Use the Enum value for status + status=HealthStatusEnum.passed, + time=datetime.now(timezone.utc), + observedValue=observed_value, + observedUnit="ms", + ) + + except asyncio.TimeoutError: + # The operation specifically took too long and the timeout context manager raised an exception. + return ComponentCheck( + name=name, + # Use the Enum value for status + status=HealthStatusEnum.failed, + time=datetime.now(timezone.utc), + output=f"Check timed out after {timeout_seconds:.2f}s", + ) + except Exception as e: + # Catch any other general exceptions (e.g., connection refused, network down) + return ComponentCheck( + name=name, + # Use the Enum value for status + status=HealthStatusEnum.failed, + time=datetime.now(timezone.utc), + output=f"An error occurred: {str(e)}", + ) async def check_database_status() -> ComponentCheck: - """Checking the primary database connection with a timeout.""" - try: - # Simulate an async DB call (replace with actual logic) - await asyncio.wait_for(asyncio.sleep(0.042), timeout=0.1) - - return ComponentCheck(name = "postgres", - status = "pass", - time = datetime.now(timezone.utc), - observedValue = 42, - observedUnit = "ms") - except asyncio.TimeoutError: - return ComponentCheck(name = "postgres", - status = "fail", - time = datetime.now(timezone.utc), - output = "Check timed out") - except Exception as e: - return ComponentCheck(name = "postgres", - status = "fail", - time = datetime.now(timezone.utc), - output = str(e)) + """ + Initiates the check for the primary database connection. + Calls the generic wrapper with specific logic and timeout for Postgres. + """ + async def db_logic(): + # IMPORTANT: Replace this sleep simulation with the actual async DB client call (e.g., await database.ping()) + await asyncio.sleep(0.042) + + return await _run_check_with_timeout(db_logic(), name="postgres", timeout_ms=100) async def check_media_server_status() -> ComponentCheck: - """Checking the media server connection with a timeout.""" - try: - # Simulate a successful async queue call - await asyncio.wait_for(asyncio.sleep(0.02), timeout=0.05) + """ + Initiates the check for the media server connection. + Calls the generic wrapper with specific logic and timeout for LiveKit/Media Server. + """ + async def media_logic(): + # IMPORTANT: Replace this sleep simulation with the actual network I/O call (e.g., await http_client.get('...')) + await asyncio.sleep(0.02) - return ComponentCheck(name = "livekit", - status = "pass", - time = datetime.now(timezone.utc), - observedValue = 20, - observedUnit = "ms") - except asyncio.TimeoutError: - return ComponentCheck(name = "livekit", - status = "fail", - time = datetime.now(timezone.utc), - output = "Check timed out") - except Exception as e: - return ComponentCheck(name = "livekit", - status = "fail", - time = datetime.now(timezone.utc), - output = str(e)) + return await _run_check_with_timeout(media_logic(), name="livekit", timeout_ms=50) + +# This dictionary serves as the single source of truth for all critical health checks. +CRITICAL_CHECKS: Dict[str, HealthCheckFunc] = { + "Database": check_database_status, + "Media Server": check_media_server_status +} async def readiness_check() -> bool: """ - *Extend readiness_check() as dependencies are added; it must return True/False.* + Performs a readiness probe. The service is considered "ready" only if *all* critical checks pass. """ + tasks = [check_func() for check_func in CRITICAL_CHECKS.values()] + results: List[ComponentCheck] = await asyncio.gather(*tasks) - # Run all critical checks - db = await check_database_status() - media_server = await check_media_server_status() - - # Check if all statuses are 'pass' - if db.status == 'pass' and media_server.status == 'pass': - return True - else: - return False + # Check if every result status is equal to the HealthStatusEnum.passed value ('pass') + return all(result.status == HealthStatusEnum.passed for result in results) async def get_detailed_health(settings: Settings) -> HealthStatus: """ - Build detailed health payload aligned with common status formats. - - status: pass | warn | fail + Builds a detailed health payload that conforms to the health+json specification. + Aggregates results from all CRITICAL_CHECKS and includes system metadata. """ - # Run all critical checks - db = await check_database_status() - media_server = await check_media_server_status() + tasks = [check_func() for check_func in CRITICAL_CHECKS.values()] + results: List[ComponentCheck] = await asyncio.gather(*tasks) - # Check if all statuses are 'pass' - overall = "pass" - if db.status != 'pass' or media_server.status != 'pass': - overall = "fail" + # Initialize overall status using the Enum value + overall = HealthStatusEnum.passed + checks = {} + + # Iterate through results, mapping them back to their original dictionary keys + for key, result in zip(CRITICAL_CHECKS.keys(), results): + checks[key] = result + # Compare status against the Enum value + if result.status != HealthStatusEnum.passed: + # If any individual check fails, the overall system status must be 'fail' + overall = HealthStatusEnum.failed - return HealthStatus(status = overall, - version = settings.version, - environment = settings.environment, - serviceName = settings.service_name, - description = settings.title, - checks = {"Database": db, - "Media Server": media_server}) + # Assemble the final, comprehensive health report object using provided settings + return HealthStatus( + status=overall, + version=settings.version, + environment=settings.environment, + serviceName=settings.service_name, + description=settings.title, + checks=checks, + ) diff --git a/app/backend/tests/test_health.py b/app/backend/tests/test_health.py index 7adc081..c73712b 100644 --- a/app/backend/tests/test_health.py +++ b/app/backend/tests/test_health.py @@ -2,31 +2,85 @@ from fastapi.testclient import TestClient from hypothesis import given, settings from hypothesis import strategies as st +# Import the main FastAPI application instance from your source code from main import app +# Initialize the TestClient to make requests against your FastAPI app instance client = TestClient(app) def test_liveness_ok(): + """ + Test the basic liveness endpoint. + A liveness probe checks if the container is running and responsive. + It should always return a 200 OK status and the text 'live'. + """ response = client.get("/health/live") assert response.status_code == 200 assert response.text == "live" +@given(st.text(min_size=0, max_size=16)) +@settings(max_examples=10) +def test_liveness_resilience_to_query_noise(noise: str): + """ + Use Hypothesis for property-based testing. + This test ensures that the liveness endpoint is robust and remains functional + even when unexpected or garbage query parameters ("noise") are provided in the URL. + The `given` decorator generates various string inputs for the 'noise' parameter. + """ + # Pass arbitrary query parameters to the endpoint + response = client.get("/health/live", params={"noise": noise}) + assert response.status_code == 200 + assert response.text == "live" def test_readiness_ok(): + """ + Test the basic readiness endpoint. + A readiness probe checks if the container is ready to accept traffic (e.g., database connection established). + It should return a 200 OK status and the text 'ready' when healthy. + """ response = client.get("/health/ready") assert response.status_code == 200 assert response.text == "ready" +@given(st.text(min_size=0, max_size=16)) +@settings(max_examples=10) +def test_readiness_resilience_to_query_noise(noise: str): + """ + Use Hypothesis for property-based testing. + This test ensures that the readiness endpoint is robust and remains functional + even when unexpected or garbage query parameters ("noise") are provided in the URL. + The `given` decorator generates various string inputs for the 'noise' parameter. + """ + # Pass arbitrary query parameters to the endpoint + response = client.get("/health/ready", params={"noise": noise}) + assert response.status_code == 200 + assert response.text == "ready" + def test_detailed_health_pass(): - resp = client.get("/health") - assert resp.status_code == 200 - body = resp.json() + """ + Test the detailed health check endpoint, often conforming to the + [IETF health check standard](datatracker.ietf.org). + It should return a 200 OK status, and the JSON body should have a + "status" of "pass" and a dictionary of individual "checks". + """ + response = client.get("/health") + assert response.status_code == 200 + body = response.json() assert body["status"] == "pass" assert isinstance(body["checks"], dict) @given(st.text(min_size=0, max_size=16)) @settings(max_examples=10) -def test_health_resilient_to_query_noise(noise: str): - resp = client.get("/health/live", params={"noise": noise}) - assert resp.status_code == 200 - assert resp.text == "live" +def test_health_resilience_to_query_noise(noise: str): + """ + Use Hypothesis for property-based testing. + This test ensures that the health endpoint is robust and remains functional + even when unexpected or garbage query parameters ("noise") are provided in the URL. + The `given` decorator generates various string inputs for the 'noise' parameter. + """ + # Pass arbitrary query parameters to the endpoint + response = client.get("/health", params={"noise": noise}) + assert response.status_code == 200 + body = response.json() + assert body["status"] == "pass" + assert isinstance(body["checks"], dict)