Improve health schemas and readiness checks

This commit is contained in:
2025-11-27 14:21:25 +01:00
parent 7acce1da02
commit 3a7208d28d
4 changed files with 246 additions and 100 deletions

View File

@@ -1,12 +1,14 @@
"""Health endpoints."""
"""Health endpoints for FastAPI application health checks (Liveness, Readiness, Detailed Status)."""
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import PlainTextResponse
# Dependencies for application configuration and schema definitions
from core.config import Settings, get_settings
from operations.health.schemas import HealthStatus
from operations.health.schemas import HealthStatus, HealthStatusEnum
from operations.health.service import get_detailed_health, readiness_check
# Initialize the API router for health endpoints, grouping them under the "/health" prefix
router = APIRouter(prefix="/health", tags=["health"])
@@ -25,8 +27,9 @@ def liveness() -> str:
performing no deep checks on external dependencies.
**Success Response:** HTTP 200 OK with "live" body.
**Failure Response:** Endpoint timeout (no response).
**Failure Response:** The orchestrator will interpret a *TCP connection timeout* as a failure.
"""
# Simply returning a string confirms the Python process and FastAPI are functional.
return "live"
@@ -40,14 +43,18 @@ async def readiness() -> str:
"""
**Readiness Probe:** Determines if the application can accept user traffic.
This endpoint is used by load balancers to route traffic. It performs deep checks
on all critical dependencies (e.g., database, message queue).
This endpoint is used by load balancers or service meshes to decide whether
to route traffic to this specific instance. It performs deep checks
on all critical dependencies (e.g., database connection, external services).
**Success Response:** HTTP 200 OK with "ready" body.
**Failure Response:** HTTP 503 Service Unavailable if any critical dependency fails.
"""
# Call the service layer function that runs all critical checks concurrently
ok = await readiness_check()
if not ok:
# If any check fails, signal 'Service Unavailable' so traffic is diverted
raise HTTPException(status_code=503, detail="not ready")
return "ready"
@@ -57,18 +64,21 @@ async def readiness() -> str:
"",
summary="Detailed Health Status Page",
response_model=HealthStatus,
status_code=200,
)
async def detailed_health(settings: Settings = Depends(get_settings)) -> HealthStatus:
"""
**Detailed Status Page:** Provides granular health information for human operators.
**Detailed Status Page:** Provides granular health information for human operators/monitoring tools.
This endpoint runs all readiness checks and returns a structured JSON object.
The top-level HTTP status code reflects the overall application health (200 OK or 503 Service Unavailable).
This endpoint runs all readiness checks and returns a structured JSON object
containing the status of each individual component.
The top-level HTTP status code reflects the overall application health (200 OK for 'pass', 503 for 'fail').
"""
# Retrieve the comprehensive health status model
detailed_health = await get_detailed_health(settings)
if detailed_health.status != "pass":
if detailed_health.status != HealthStatusEnum.passed:
# Align the HTTP status code with the overall health status for easy monitoring
raise HTTPException(status_code=503, detail="not ready")
# Status code is 200
return detailed_health

View File

@@ -1,23 +1,65 @@
"""Pydantic schemas for health responses."""
"""
Pydantic schemas for defining health check responses, following IETF standards.
"""
from datetime import datetime
from enum import Enum
from pydantic import BaseModel
# Import ConfigDict from pydantic to resolve the deprecation warning
from pydantic import BaseModel, Field, ConfigDict
# Define acceptable statuses as an Enum for robust validation
class HealthStatusEnum(str, Enum):
"""Enumeration for standard health check statuses."""
passed = "pass"
warned = "warn"
failed = "fail"
class ComponentCheck(BaseModel):
name: str
# pass | warn | fail
status: str
time: datetime | None = None
output: str | None = None
observedValue: float | int | None = None
observedUnit: str | None = None
"""
Represents the status and metrics for a single internal component or dependency.
"""
# Use ConfigDict instead of the class Config approach
model_config = ConfigDict(
populate_by_name=True # Allows instantiation using either 'observed_value' or 'observedValue'
)
name: str = Field(description="The unique name of the component being checked (e.g., 'postgres', 'redis').")
status: HealthStatusEnum = Field(description="The status of the check: 'pass', 'warn', or 'fail'.")
time: datetime | None = Field(default=None, description="The time at which the check was performed in ISO 8601 format.")
output: str | None = Field(default=None, description="Additional details, error messages, or logs if the status is 'fail' or 'warn'.")
# Python uses snake_case internally, JSON uses camelCase for the alias
observed_value: float | int | None = Field(
default=None,
alias="observedValue",
description="The value observed during the check (e.g., latency in ms)."
)
# Python uses snake_case internally, JSON uses camelCase for the alias
observed_unit: str | None = Field(
default=None,
alias="observedUnit",
description="The unit of the observed value (e.g., 'ms', 'count', 'bytes')."
)
class HealthStatus(BaseModel):
# pass | warn | fail
status: str
version: str | None = None
environment: str | None = None
serviceName: str | None = None
description: str | None = None
checks: dict[str, ComponentCheck]
"""
The overall system health response model, aggregating all individual component checks.
"""
# Use ConfigDict instead of the class Config approach
model_config = ConfigDict(
populate_by_name=True
)
status: HealthStatusEnum = Field(description="The aggregate status of the entire service: 'pass', 'warn', or 'fail'.")
version: str | None = Field(default=None, description="The application version (e.g., Git SHA or semantic version number).")
environment: str | None = Field(default=None, description="The deployment environment (e.g., 'production', 'staging').")
# Python uses snake_case internally, JSON uses camelCase for the alias
service_name: str | None = Field(
default=None,
alias="serviceName",
description="The name of the service."
)
description: str | None = Field(default=None, description="A brief description of the service.")
checks: dict[str, ComponentCheck] = Field(description="A dictionary mapping check keys (e.g., 'Database') to their detailed ComponentCheck results.")

View File

@@ -1,89 +1,129 @@
import asyncio
from datetime import datetime, timezone
from typing import Callable, Coroutine, Any, List, Dict
from core.config import Settings
from operations.health.schemas import ComponentCheck, HealthStatus
# Assuming schemas now contains the Enum definition and uses it in the models
from operations.health.schemas import ComponentCheck, HealthStatus, HealthStatusEnum
# Type alias for a function that returns an awaitable ComponentCheck.
HealthCheckFunc = Callable[[], Coroutine[Any, Any, ComponentCheck]]
async def _run_check_with_timeout(
check_coroutine: Coroutine[Any, Any, None],
name: str,
timeout_ms: int
) -> ComponentCheck:
"""
A utility wrapper that executes a given async coroutine with a strict timeout constraint.
It standardizes the exception handling and timing calculation for health checks.
"""
start_time = datetime.now(timezone.utc)
# Convert milliseconds timeout to a float in seconds for asyncio
timeout_seconds = timeout_ms / 1000.0
try:
# Enforce the timeout using the modern asyncio.timeout context manager (Python 3.11+)
async with asyncio.timeout(timeout_seconds):
await check_coroutine
# If execution reaches here, the check passed within the time limit.
duration = datetime.now(timezone.utc) - start_time
observed_value = int(duration.total_seconds() * 1000) # value stored in ms
return ComponentCheck(
name=name,
# Use the Enum value for status
status=HealthStatusEnum.passed,
time=datetime.now(timezone.utc),
observedValue=observed_value,
observedUnit="ms",
)
except asyncio.TimeoutError:
# The operation specifically took too long and the timeout context manager raised an exception.
return ComponentCheck(
name=name,
# Use the Enum value for status
status=HealthStatusEnum.failed,
time=datetime.now(timezone.utc),
output=f"Check timed out after {timeout_seconds:.2f}s",
)
except Exception as e:
# Catch any other general exceptions (e.g., connection refused, network down)
return ComponentCheck(
name=name,
# Use the Enum value for status
status=HealthStatusEnum.failed,
time=datetime.now(timezone.utc),
output=f"An error occurred: {str(e)}",
)
async def check_database_status() -> ComponentCheck:
"""Checking the primary database connection with a timeout."""
try:
# Simulate an async DB call (replace with actual logic)
await asyncio.wait_for(asyncio.sleep(0.042), timeout=0.1)
"""
Initiates the check for the primary database connection.
Calls the generic wrapper with specific logic and timeout for Postgres.
"""
async def db_logic():
# IMPORTANT: Replace this sleep simulation with the actual async DB client call (e.g., await database.ping())
await asyncio.sleep(0.042)
return ComponentCheck(name = "postgres",
status = "pass",
time = datetime.now(timezone.utc),
observedValue = 42,
observedUnit = "ms")
except asyncio.TimeoutError:
return ComponentCheck(name = "postgres",
status = "fail",
time = datetime.now(timezone.utc),
output = "Check timed out")
except Exception as e:
return ComponentCheck(name = "postgres",
status = "fail",
time = datetime.now(timezone.utc),
output = str(e))
return await _run_check_with_timeout(db_logic(), name="postgres", timeout_ms=100)
async def check_media_server_status() -> ComponentCheck:
"""Checking the media server connection with a timeout."""
try:
# Simulate a successful async queue call
await asyncio.wait_for(asyncio.sleep(0.02), timeout=0.05)
"""
Initiates the check for the media server connection.
Calls the generic wrapper with specific logic and timeout for LiveKit/Media Server.
"""
async def media_logic():
# IMPORTANT: Replace this sleep simulation with the actual network I/O call (e.g., await http_client.get('...'))
await asyncio.sleep(0.02)
return ComponentCheck(name = "livekit",
status = "pass",
time = datetime.now(timezone.utc),
observedValue = 20,
observedUnit = "ms")
except asyncio.TimeoutError:
return ComponentCheck(name = "livekit",
status = "fail",
time = datetime.now(timezone.utc),
output = "Check timed out")
except Exception as e:
return ComponentCheck(name = "livekit",
status = "fail",
time = datetime.now(timezone.utc),
output = str(e))
return await _run_check_with_timeout(media_logic(), name="livekit", timeout_ms=50)
# This dictionary serves as the single source of truth for all critical health checks.
CRITICAL_CHECKS: Dict[str, HealthCheckFunc] = {
"Database": check_database_status,
"Media Server": check_media_server_status
}
async def readiness_check() -> bool:
"""
*Extend readiness_check() as dependencies are added; it must return True/False.*
Performs a readiness probe. The service is considered "ready" only if *all* critical checks pass.
"""
tasks = [check_func() for check_func in CRITICAL_CHECKS.values()]
results: List[ComponentCheck] = await asyncio.gather(*tasks)
# Run all critical checks
db = await check_database_status()
media_server = await check_media_server_status()
# Check if all statuses are 'pass'
if db.status == 'pass' and media_server.status == 'pass':
return True
else:
return False
# Check if every result status is equal to the HealthStatusEnum.passed value ('pass')
return all(result.status == HealthStatusEnum.passed for result in results)
async def get_detailed_health(settings: Settings) -> HealthStatus:
"""
Build detailed health payload aligned with common status formats.
status: pass | warn | fail
Builds a detailed health payload that conforms to the health+json specification.
Aggregates results from all CRITICAL_CHECKS and includes system metadata.
"""
# Run all critical checks
db = await check_database_status()
media_server = await check_media_server_status()
tasks = [check_func() for check_func in CRITICAL_CHECKS.values()]
results: List[ComponentCheck] = await asyncio.gather(*tasks)
# Check if all statuses are 'pass'
overall = "pass"
if db.status != 'pass' or media_server.status != 'pass':
overall = "fail"
# Initialize overall status using the Enum value
overall = HealthStatusEnum.passed
checks = {}
return HealthStatus(status = overall,
version = settings.version,
environment = settings.environment,
serviceName = settings.service_name,
description = settings.title,
checks = {"Database": db,
"Media Server": media_server})
# Iterate through results, mapping them back to their original dictionary keys
for key, result in zip(CRITICAL_CHECKS.keys(), results):
checks[key] = result
# Compare status against the Enum value
if result.status != HealthStatusEnum.passed:
# If any individual check fails, the overall system status must be 'fail'
overall = HealthStatusEnum.failed
# Assemble the final, comprehensive health report object using provided settings
return HealthStatus(
status=overall,
version=settings.version,
environment=settings.environment,
serviceName=settings.service_name,
description=settings.title,
checks=checks,
)

View File

@@ -2,31 +2,85 @@ from fastapi.testclient import TestClient
from hypothesis import given, settings
from hypothesis import strategies as st
# Import the main FastAPI application instance from your source code
from main import app
# Initialize the TestClient to make requests against your FastAPI app instance
client = TestClient(app)
def test_liveness_ok():
"""
Test the basic liveness endpoint.
A liveness probe checks if the container is running and responsive.
It should always return a 200 OK status and the text 'live'.
"""
response = client.get("/health/live")
assert response.status_code == 200
assert response.text == "live"
@given(st.text(min_size=0, max_size=16))
@settings(max_examples=10)
def test_liveness_resilience_to_query_noise(noise: str):
"""
Use Hypothesis for property-based testing.
This test ensures that the liveness endpoint is robust and remains functional
even when unexpected or garbage query parameters ("noise") are provided in the URL.
The `given` decorator generates various string inputs for the 'noise' parameter.
"""
# Pass arbitrary query parameters to the endpoint
response = client.get("/health/live", params={"noise": noise})
assert response.status_code == 200
assert response.text == "live"
def test_readiness_ok():
"""
Test the basic readiness endpoint.
A readiness probe checks if the container is ready to accept traffic (e.g., database connection established).
It should return a 200 OK status and the text 'ready' when healthy.
"""
response = client.get("/health/ready")
assert response.status_code == 200
assert response.text == "ready"
@given(st.text(min_size=0, max_size=16))
@settings(max_examples=10)
def test_readiness_resilience_to_query_noise(noise: str):
"""
Use Hypothesis for property-based testing.
This test ensures that the readiness endpoint is robust and remains functional
even when unexpected or garbage query parameters ("noise") are provided in the URL.
The `given` decorator generates various string inputs for the 'noise' parameter.
"""
# Pass arbitrary query parameters to the endpoint
response = client.get("/health/ready", params={"noise": noise})
assert response.status_code == 200
assert response.text == "ready"
def test_detailed_health_pass():
resp = client.get("/health")
assert resp.status_code == 200
body = resp.json()
"""
Test the detailed health check endpoint, often conforming to the
[IETF health check standard](datatracker.ietf.org).
It should return a 200 OK status, and the JSON body should have a
"status" of "pass" and a dictionary of individual "checks".
"""
response = client.get("/health")
assert response.status_code == 200
body = response.json()
assert body["status"] == "pass"
assert isinstance(body["checks"], dict)
@given(st.text(min_size=0, max_size=16))
@settings(max_examples=10)
def test_health_resilient_to_query_noise(noise: str):
resp = client.get("/health/live", params={"noise": noise})
assert resp.status_code == 200
assert resp.text == "live"
def test_health_resilience_to_query_noise(noise: str):
"""
Use Hypothesis for property-based testing.
This test ensures that the health endpoint is robust and remains functional
even when unexpected or garbage query parameters ("noise") are provided in the URL.
The `given` decorator generates various string inputs for the 'noise' parameter.
"""
# Pass arbitrary query parameters to the endpoint
response = client.get("/health", params={"noise": noise})
assert response.status_code == 200
body = response.json()
assert body["status"] == "pass"
assert isinstance(body["checks"], dict)