# fleet-platform/app/workers/parser.py (98 lines, 3.2 KiB, Python)

"""Parser worker — drains events.raw to events.parsed."""
import structlog
from psycopg.types.json import Jsonb
from app.db import get_pool
from app.parsers.jimi import PARSER_VERSION, UnsupportedMsgType, parse_raw
# Structured logger for this worker, registered under the name "worker.parser".
log = structlog.get_logger("worker.parser")
# Maximum number of raw rows claimed per drain() call; passed as the LIMIT
# parameter of the select against events.raw.
DRAIN_BATCH = 200
async def drain() -> int:
    """Parse one batch of pending rows from events.raw into events.parsed.

    Claims up to ``DRAIN_BATCH`` rows whose ``parsed_at`` is NULL, oldest
    ``received_at`` first, using ``FOR UPDATE SKIP LOCKED``. Each claimed
    row is passed to ``parse_raw``:

    * every event it returns becomes one row in ``events.parsed``;
    * ``UnsupportedMsgType`` produces no output rows;
    * any other ``Exception`` is recorded in ``events.parser_errors``
      (class name, message truncated to 1000 characters, raw payload).

    In all three cases the raw row is then stamped with ``parsed_at`` and
    the current ``PARSER_VERSION``, so it no longer matches the select.
    The select, inserts and update all run inside one
    ``conn.transaction()`` block.

    Returns:
        The number of raw rows handled, or 0 when nothing was pending.
    """
    select_pending_sql = """
        SELECT event_id, received_at, source, msg_type, account_id, payload
        FROM events.raw
        WHERE parsed_at IS NULL
        ORDER BY received_at
        FOR UPDATE SKIP LOCKED
        LIMIT %s
    """
    insert_parsed_sql = """
        INSERT INTO events.parsed (
        raw_event_id, raw_received_at, occurred_at, kind,
        account_id, imei, payload, parser_version
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
    """
    insert_error_sql = """
        INSERT INTO events.parser_errors (
        raw_event_id, raw_received_at, parser_version,
        error_class, error_message, payload
        ) VALUES (%s, %s, %s, %s, %s, %s)
    """
    mark_done_sql = """
        UPDATE events.raw
        SET parsed_at = now(), parser_version = %s
        WHERE event_id = %s AND received_at = %s
    """

    pool = await get_pool()
    handled = 0  # stays 0 unless a non-empty batch is written below

    async with pool.connection() as conn, conn.transaction(), conn.cursor() as cur:
        await cur.execute(select_pending_sql, (DRAIN_BATCH,))
        claimed = await cur.fetchall()
        if not claimed:
            return 0

        parsed_batch: list[tuple[object, ...]] = []
        error_batch: list[tuple[object, ...]] = []
        mark_batch: list[tuple[object, ...]] = []

        for event_id, received_at, source, msg_type, account_id, payload in claimed:
            try:
                events = parse_raw(source, msg_type, payload, account_id)
            except UnsupportedMsgType:
                # Nothing to emit for this message type; the row is still
                # marked below so it is not selected again.
                pass
            except Exception as exc:
                # Record the parser failure instead of aborting the batch.
                error_batch.append((
                    event_id, received_at, PARSER_VERSION,
                    exc.__class__.__name__, str(exc)[:1000], Jsonb(payload),
                ))
            else:
                parsed_batch.extend(
                    (
                        event_id, received_at, ev.occurred_at, ev.kind,
                        ev.account_id or "", ev.imei, Jsonb(ev.payload),
                        PARSER_VERSION,
                    )
                    for ev in events
                )
            # Every claimed row is marked exactly once, whatever the outcome.
            mark_batch.append((PARSER_VERSION, event_id, received_at))

        if parsed_batch:
            await cur.executemany(insert_parsed_sql, parsed_batch)
        if error_batch:
            await cur.executemany(insert_error_sql, error_batch)
        await cur.executemany(mark_done_sql, mark_batch)
        handled = len(claimed)

    # Logged only after the connection/transaction block has exited.
    if handled:
        log.info(
            "parser.drained",
            raw=handled,
            parsed=len(parsed_batch),
            errors=len(error_batch),
        )
    return handled