98 lines
3.2 KiB
Python
98 lines
3.2 KiB
Python
"""Parser worker — drains events.raw to events.parsed."""
|
|
|
|
import structlog
|
|
from psycopg.types.json import Jsonb
|
|
|
|
from app.db import get_pool
|
|
from app.parsers.jimi import PARSER_VERSION, UnsupportedMsgType, parse_raw
|
|
|
|
# Module-level structlog logger, created under the name "worker.parser".
log = structlog.get_logger("worker.parser")
|
|
|
|
# Upper bound on the events.raw rows selected by one drain() call; it is
# bound as the LIMIT parameter of the selecting query.
DRAIN_BATCH = 200
|
|
|
|
|
|
async def drain() -> int:
    """Parse one batch of unparsed ``events.raw`` rows and store the outcome.

    At most ``DRAIN_BATCH`` rows with ``parsed_at IS NULL`` are selected,
    ordered by ``received_at``, using ``FOR UPDATE SKIP LOCKED``. Each row is
    handed to ``parse_raw`` and ends up in exactly one of three buckets:

    * parsed successfully -- one ``events.parsed`` row per returned event;
    * ``UnsupportedMsgType`` -- nothing is inserted;
    * any other exception -- one ``events.parser_errors`` row holding the
      exception class name, its message (truncated to 1000 characters) and
      the original payload.

    Whatever the bucket, the raw row is then stamped with ``parsed_at`` and
    ``parser_version``. The select, inserts and updates all run inside a
    single ``conn.transaction()`` block.

    Returns:
        The number of raw rows handled by this call; ``0`` when no unparsed
        row was selected.
    """
    claim_sql = """
        SELECT event_id, received_at, source, msg_type, account_id, payload
        FROM events.raw
        WHERE parsed_at IS NULL
        ORDER BY received_at
        FOR UPDATE SKIP LOCKED
        LIMIT %s
        """
    insert_parsed_sql = """
        INSERT INTO events.parsed (
        raw_event_id, raw_received_at, occurred_at, kind,
        account_id, imei, payload, parser_version
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        """
    insert_error_sql = """
        INSERT INTO events.parser_errors (
        raw_event_id, raw_received_at, parser_version,
        error_class, error_message, payload
        ) VALUES (%s, %s, %s, %s, %s, %s)
        """
    stamp_sql = """
        UPDATE events.raw
        SET parsed_at = now(), parser_version = %s
        WHERE event_id = %s AND received_at = %s
        """

    handled = 0
    pool = await get_pool()
    async with pool.connection() as conn, conn.transaction(), conn.cursor() as cursor:
        await cursor.execute(claim_sql, (DRAIN_BATCH,))
        batch = await cursor.fetchall()
        if not batch:
            return 0

        # Parameter tuples for the three executemany() calls below.
        parsed_params: list[tuple[object, ...]] = []
        error_params: list[tuple[object, ...]] = []
        stamp_params: list[tuple[object, ...]] = []

        for event_id, received_at, source, msg_type, account_id, payload in batch:
            try:
                events = parse_raw(source, msg_type, payload, account_id)
            except UnsupportedMsgType:
                # Nothing to store for this row; it is still stamped below.
                pass
            except Exception as err:
                # Record the failure instead of aborting the whole batch.
                error_params.append((
                    event_id, received_at, PARSER_VERSION,
                    err.__class__.__name__, str(err)[:1000], Jsonb(payload),
                ))
            else:
                # One output row per parsed event; a falsy account_id is
                # stored as the empty string.
                parsed_params.extend(
                    (
                        event_id, received_at, event.occurred_at, event.kind,
                        event.account_id or "", event.imei, Jsonb(event.payload),
                        PARSER_VERSION,
                    )
                    for event in events
                )
            # Every selected row is stamped exactly once, whatever the outcome.
            stamp_params.append((PARSER_VERSION, event_id, received_at))

        if parsed_params:
            await cursor.executemany(insert_parsed_sql, parsed_params)
        if error_params:
            await cursor.executemany(insert_error_sql, error_params)
        await cursor.executemany(stamp_sql, stamp_params)
        handled = len(batch)

    if not handled:
        return handled

    log.info(
        "parser.drained",
        raw=handled,
        parsed=len(parsed_params),
        errors=len(error_params),
    )
    return handled
|