fleet-platform/app/entrypoints/worker.py

61 lines
1.7 KiB
Python
Raw Permalink Normal View History

"""Worker entrypoint.
Runs as a FastAPI app (so /health/worker is reachable) with two long-lived
background tasks spawned in lifespan:
- parser: LISTEN events_raw_new -> drain events.raw -> events.parsed
- projector: LISTEN events_parsed_new -> drain events.parsed -> state.live_positions
"""
import asyncio
import contextlib
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
import structlog
from fastapi import FastAPI
from app.config import get_settings
from app.db import close_pool, get_pool
from app.health import router as health_router
from app.logging_setup import configure_logging
from app.projectors import live_positions
from app.workers import parser
from app.workers.listener import listen_forever
# Module-level structlog logger used by this entrypoint's lifecycle messages.
log = structlog.get_logger("worker")
@asynccontextmanager
async def lifespan(_: FastAPI) -> AsyncIterator[None]:
    """Run the worker's background listeners for the lifetime of the app.

    Startup: configures logging, awaits ``get_pool()``, logs a
    ``worker.starting`` event, and spawns two long-lived tasks:

    - ``parser-listener``: ``listen_forever("events_raw_new", parser.drain)``
    - ``live-positions-listener``:
      ``listen_forever("events_parsed_new", live_positions.drain)``

    Shutdown: cancels every spawned task, waits for all of them to finish,
    logs any task that ended with a real error (not a cancellation), and
    finally awaits ``close_pool()``.

    Args:
        _: The FastAPI application (unused; required by the lifespan protocol).

    Yields:
        None, while the application is serving.
    """
    configure_logging()
    settings = get_settings()
    await get_pool()

    tasks: list[asyncio.Task] = []
    try:
        log.info("worker.starting", git_sha=settings.app_git_sha, mode=settings.app_mode)
        tasks.append(
            asyncio.create_task(
                listen_forever("events_raw_new", parser.drain),
                name="parser-listener",
            )
        )
        tasks.append(
            asyncio.create_task(
                listen_forever("events_parsed_new", live_positions.drain),
                name="live-positions-listener",
            )
        )
        yield
    finally:
        try:
            # Request cancellation of every task up front so they unwind
            # concurrently instead of one after another.
            for task in tasks:
                task.cancel()
            # return_exceptions=True collects each task's outcome instead of
            # raising the first one, so every task is awaited to completion.
            outcomes = await asyncio.gather(*tasks, return_exceptions=True)
            for task, outcome in zip(tasks, outcomes):
                # CancelledError derives from BaseException, so the expected
                # cancellation outcome is skipped; only genuine failures
                # (e.g. a listener that crashed earlier) are reported.
                if isinstance(outcome, Exception):
                    log.error(
                        "worker.task_failed",
                        task=task.get_name(),
                        error=repr(outcome),
                    )
        finally:
            # Always release the pool, even if waiting on the tasks was
            # itself interrupted.
            await close_pool()
# ASGI application for the worker process. The lifespan hook above owns the
# background listener tasks; the version string is the configured git SHA.
app = FastAPI(
    lifespan=lifespan,
    version=get_settings().app_git_sha,
    title="fleet-platform [worker]",
)

# Mount the health routes (the module docstring refers to /health/worker).
app.include_router(health_router)