# File: playground/app/backend/operations/health/service.py
#
# (130 lines, 5.1 KiB, Python)

import asyncio
import time
from datetime import datetime, timezone
from typing import Callable, Coroutine, Any, List, Dict

from core.config import Settings
# Assuming schemas now contains the Enum definition and uses it in the models
from operations.health.schemas import ComponentCheck, HealthStatus, HealthStatusEnum
# Type alias for a health-check entry point: a zero-argument callable whose call
# produces a coroutine that resolves to a ComponentCheck.
HealthCheckFunc = Callable[[], Coroutine[Any, Any, ComponentCheck]]
async def _run_check_with_timeout(
    check_coroutine: Coroutine[Any, Any, None],
    name: str,
    timeout_ms: int
) -> ComponentCheck:
    """
    Await a health-check coroutine under a deadline and report the outcome.

    Args:
        check_coroutine: The already-created coroutine that performs the check.
            Its return value is ignored; finishing without raising means "pass".
        name: Component name copied into the resulting ComponentCheck.
        timeout_ms: Maximum time the check may take, in milliseconds.

    Returns:
        A ComponentCheck with status ``passed`` and the elapsed time in
        milliseconds when the coroutine completes in time, or status ``failed``
        with an explanatory ``output`` when the deadline fires or the coroutine
        raises.

    Exceptions derived from ``Exception`` raised by the check are converted
    into a failed result instead of being propagated. Requires Python 3.11+
    (``asyncio.timeout``).
    """
    # asyncio works in float seconds; the limit is configured in milliseconds.
    timeout_seconds = timeout_ms / 1000.0
    # Keep a handle on the deadline so a TimeoutError can be attributed to the
    # right source in the handler below.
    deadline = asyncio.timeout(timeout_seconds)
    # perf_counter is monotonic: unlike wall-clock time it cannot jump because
    # of a clock adjustment and yield a negative or inflated duration.
    started = time.perf_counter()
    try:
        async with deadline:
            await check_coroutine
    except TimeoutError as exc:
        if deadline.expired():
            # Our own deadline fired and cancelled the check.
            failure = f"Check timed out after {timeout_seconds:.2f}s"
        else:
            # The check itself raised TimeoutError (e.g. a client-side timeout)
            # before our deadline fired; report it as an ordinary error.
            failure = f"An error occurred: {str(exc) or type(exc).__name__}"
    except Exception as exc:
        # Any other failure (e.g., connection refused, network down). Fall back
        # to the exception class name when the exception carries no message.
        failure = f"An error occurred: {str(exc) or type(exc).__name__}"
    else:
        # The check finished within the time limit.
        elapsed_ms = int((time.perf_counter() - started) * 1000)
        return ComponentCheck(
            name=name,
            status=HealthStatusEnum.passed,
            time=datetime.now(timezone.utc),
            observedValue=elapsed_ms,
            observedUnit="ms",
        )
    return ComponentCheck(
        name=name,
        status=HealthStatusEnum.failed,
        time=datetime.now(timezone.utc),
        output=failure,
    )
async def check_database_status() -> ComponentCheck:
    """
    Probe the primary database and return the outcome as a ComponentCheck.

    The probe is handed to _run_check_with_timeout under the component name
    "postgres" with a 50 ms time limit.
    """

    async def _probe_database():
        # IMPORTANT: placeholder only. Swap this simulated delay for the actual
        # async DB client call (e.g., await database.ping()).
        await asyncio.sleep(0.045)

    probe = _probe_database()
    return await _run_check_with_timeout(probe, name="postgres", timeout_ms=50)
async def check_media_server_status() -> ComponentCheck:
    """
    Probe the media server and return the outcome as a ComponentCheck.

    The probe is handed to _run_check_with_timeout under the component name
    "livekit" with a 50 ms time limit.
    """

    async def _probe_media_server():
        # IMPORTANT: placeholder only. Swap this simulated delay for the actual
        # network I/O call (e.g., await http_client.get('...')).
        await asyncio.sleep(0.02)

    probe = _probe_media_server()
    return await _run_check_with_timeout(probe, name="livekit", timeout_ms=50)
# Registry of every critical health check, keyed by the label used in the
# detailed health report. Both the readiness probe and the detailed report
# iterate this mapping, so a check registered here is picked up by both.
CRITICAL_CHECKS: Dict[str, HealthCheckFunc] = {
    "Database": check_database_status,
    "Media Server": check_media_server_status,
}
async def readiness_check() -> bool:
    """
    Run every check in CRITICAL_CHECKS concurrently and report readiness.

    Returns True only when each check's status equals HealthStatusEnum.passed;
    the first non-passing result makes the probe return False.
    """
    pending = [run_check() for run_check in CRITICAL_CHECKS.values()]
    outcomes: List[ComponentCheck] = await asyncio.gather(*pending)
    for outcome in outcomes:
        if outcome.status == HealthStatusEnum.passed:
            continue
        # A single non-passing component is enough to be "not ready".
        return False
    return True
async def get_detailed_health(settings: Settings) -> HealthStatus:
    """
    Assemble the detailed health report for the service.

    Runs every check in CRITICAL_CHECKS concurrently, stores each result under
    the key it is registered with, and derives the overall status: ``passed``
    when all checks passed, ``failed`` otherwise. Version, environment, service
    name and description are taken from ``settings``.
    """
    pending = [run_check() for run_check in CRITICAL_CHECKS.values()]
    outcomes: List[ComponentCheck] = await asyncio.gather(*pending)

    # gather() preserves submission order, so outcomes line up with the
    # mapping's keys.
    checks = dict(zip(CRITICAL_CHECKS, outcomes))

    # One non-passing component is enough to mark the whole service as failed.
    any_failed = any(
        outcome.status != HealthStatusEnum.passed for outcome in checks.values()
    )
    overall = HealthStatusEnum.failed if any_failed else HealthStatusEnum.passed

    return HealthStatus(
        status=overall,
        version=settings.version,
        environment=settings.environment,
        serviceName=settings.service_name,
        description=settings.title,
        checks=checks,
    )