import asyncio from datetime import datetime, timezone from typing import Callable, Coroutine, Any, List, Dict from core.config import Settings # Assuming schemas now contains the Enum definition and uses it in the models from operations.health.schemas import ComponentCheck, HealthStatus, HealthStatusEnum # Type alias for a function that returns an awaitable ComponentCheck. HealthCheckFunc = Callable[[], Coroutine[Any, Any, ComponentCheck]] async def _run_check_with_timeout( check_coroutine: Coroutine[Any, Any, None], name: str, timeout_ms: int ) -> ComponentCheck: """ A utility wrapper that executes a given async coroutine with a strict timeout constraint. It standardizes the exception handling and timing calculation for health checks. """ start_time = datetime.now(timezone.utc) # Convert milliseconds timeout to a float in seconds for asyncio timeout_seconds = timeout_ms / 1000.0 try: # Enforce the timeout using the modern asyncio.timeout context manager (Python 3.11+) async with asyncio.timeout(timeout_seconds): await check_coroutine # If execution reaches here, the check passed within the time limit. duration = datetime.now(timezone.utc) - start_time observed_value = int(duration.total_seconds() * 1000) # value stored in ms return ComponentCheck( name=name, # Use the Enum value for status status=HealthStatusEnum.passed, time=datetime.now(timezone.utc), observedValue=observed_value, observedUnit="ms", ) except asyncio.TimeoutError: # The operation specifically took too long and the timeout context manager raised an exception. return ComponentCheck( name=name, # Use the Enum value for status status=HealthStatusEnum.failed, time=datetime.now(timezone.utc), output=f"Check timed out after {timeout_seconds:.2f}s", ) except Exception as e: # Catch any other general exceptions (e.g., connection refused, network down) return ComponentCheck( name=name, # Use the Enum value for status status=HealthStatusEnum.failed, time=datetime.now(timezone.utc), output=f"An error occurred: {str(e)}", ) async def check_database_status() -> ComponentCheck: """ Initiates the check for the primary database connection. Calls the generic wrapper with specific logic and timeout for Postgres. """ async def db_logic(): # IMPORTANT: Replace this sleep simulation with the actual async DB client call (e.g., await database.ping()) await asyncio.sleep(0.042) return await _run_check_with_timeout(db_logic(), name="postgres", timeout_ms=100) async def check_media_server_status() -> ComponentCheck: """ Initiates the check for the media server connection. Calls the generic wrapper with specific logic and timeout for LiveKit/Media Server. """ async def media_logic(): # IMPORTANT: Replace this sleep simulation with the actual network I/O call (e.g., await http_client.get('...')) await asyncio.sleep(0.02) return await _run_check_with_timeout(media_logic(), name="livekit", timeout_ms=50) # This dictionary serves as the single source of truth for all critical health checks. CRITICAL_CHECKS: Dict[str, HealthCheckFunc] = { "Database": check_database_status, "Media Server": check_media_server_status } async def readiness_check() -> bool: """ Performs a readiness probe. The service is considered "ready" only if *all* critical checks pass. """ tasks = [check_func() for check_func in CRITICAL_CHECKS.values()] results: List[ComponentCheck] = await asyncio.gather(*tasks) # Check if every result status is equal to the HealthStatusEnum.passed value ('pass') return all(result.status == HealthStatusEnum.passed for result in results) async def get_detailed_health(settings: Settings) -> HealthStatus: """ Builds a detailed health payload that conforms to the health+json specification. Aggregates results from all CRITICAL_CHECKS and includes system metadata. """ tasks = [check_func() for check_func in CRITICAL_CHECKS.values()] results: List[ComponentCheck] = await asyncio.gather(*tasks) # Initialize overall status using the Enum value overall = HealthStatusEnum.passed checks = {} # Iterate through results, mapping them back to their original dictionary keys for key, result in zip(CRITICAL_CHECKS.keys(), results): checks[key] = result # Compare status against the Enum value if result.status != HealthStatusEnum.passed: # If any individual check fails, the overall system status must be 'fail' overall = HealthStatusEnum.failed # Assemble the final, comprehensive health report object using provided settings return HealthStatus( status=overall, version=settings.version, environment=settings.environment, serviceName=settings.service_name, description=settings.title, checks=checks, )