"""Parser worker — drains events.raw to events.parsed."""

import structlog
from psycopg.types.json import Jsonb

from app.db import get_pool
from app.parsers.jimi import PARSER_VERSION, UnsupportedMsgType, parse_raw

log = structlog.get_logger("worker.parser")

# Maximum number of events.raw rows claimed and parsed per drain() call.
DRAIN_BATCH = 200


async def drain() -> int:
    """Parse one batch of unparsed raw events in a single transaction.

    Claims up to ``DRAIN_BATCH`` rows of ``events.raw`` with
    ``parsed_at IS NULL`` (oldest ``received_at`` first). Each row is then
    handled in one of three ways:

    * Parsed successfully: its events are inserted into ``events.parsed``.
    * ``parse_raw`` raises ``UnsupportedMsgType``: the row is only marked.
    * Any other failure while parsing or converting its events: the failure
      is recorded in ``events.parser_errors``.

    Every claimed row is stamped with ``parsed_at`` and ``parser_version``,
    whatever the outcome, so it is not claimed again.

    Returns:
        The number of raw rows consumed, or 0 when none were pending.

    Raises:
        Database errors propagate. The transaction is rolled back, so none
        of the batch's writes or marks persist.
    """
    pool = await get_pool()
    async with pool.connection() as conn, conn.transaction(), conn.cursor() as cur:
        # FOR UPDATE SKIP LOCKED: rows already locked by another transaction
        # are skipped, so concurrent drains claim disjoint batches.
        await cur.execute(
            """
            SELECT event_id, received_at, source, msg_type, account_id, payload
            FROM events.raw
            WHERE parsed_at IS NULL
            ORDER BY received_at
            FOR UPDATE SKIP LOCKED
            LIMIT %s
            """,
            (DRAIN_BATCH,),
        )
        rows = await cur.fetchall()
        if not rows:
            return 0

        parsed_rows: list[tuple[object, ...]] = []
        error_rows: list[tuple[object, ...]] = []
        mark_rows: list[tuple[object, ...]] = []

        for event_id, received_at, source, msg_type, account_id, payload in rows:
            # Every claimed row is marked, whatever the parse outcome.
            mark_rows.append((PARSER_VERSION, event_id, received_at))
            try:
                # Materialize and convert all events inside the try. If
                # parse_raw is lazy, or an event is malformed, the failure is
                # quarantined for this raw row instead of aborting the whole
                # batch. No partial output for the row reaches events.parsed.
                event_rows = [
                    (
                        event_id,
                        received_at,
                        ev.occurred_at,
                        ev.kind,
                        # NOTE(review): presumably events.parsed.account_id is
                        # NOT NULL, hence the "" fallback — confirm.
                        ev.account_id or "",
                        ev.imei,
                        Jsonb(ev.payload),
                        PARSER_VERSION,
                    )
                    for ev in parse_raw(source, msg_type, payload, account_id)
                ]
            except UnsupportedMsgType:
                # Nothing to emit; the row is simply marked as consumed.
                continue
            except Exception as exc:
                # Deliberate per-row quarantine: keep the failure and original
                # payload for inspection instead of failing the batch.
                error_rows.append((
                    event_id,
                    received_at,
                    PARSER_VERSION,
                    exc.__class__.__name__,
                    str(exc)[:1000],
                    Jsonb(payload),
                ))
                continue
            else:
                parsed_rows.extend(event_rows)

        # NOTE(review): all writes share one transaction. A database error on
        # any row (e.g. a constraint violation) rolls back the whole batch,
        # and the same rows will be claimed again on the next drain.
        if parsed_rows:
            await cur.executemany(
                """
                INSERT INTO events.parsed (
                    raw_event_id, raw_received_at, occurred_at, kind,
                    account_id, imei, payload, parser_version
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                """,
                parsed_rows,
            )
        if error_rows:
            await cur.executemany(
                """
                INSERT INTO events.parser_errors (
                    raw_event_id, raw_received_at, parser_version,
                    error_class, error_message, payload
                ) VALUES (%s, %s, %s, %s, %s, %s)
                """,
                error_rows,
            )
        await cur.executemany(
            """
            UPDATE events.raw
            SET parsed_at = now(), parser_version = %s
            WHERE event_id = %s AND received_at = %s
            """,
            mark_rows,
        )

    # Logged only after the transaction block exits without error (commit).
    processed = len(rows)
    log.info(
        "parser.drained",
        raw=processed,
        parsed=len(parsed_rows),
        errors=len(error_rows),
    )
    return processed