diff --git a/app/workers/listener.py b/app/workers/listener.py index eeeb627..abce4a1 100644 --- a/app/workers/listener.py +++ b/app/workers/listener.py @@ -3,10 +3,13 @@ The handler is invoked on every notification AND every sweep_interval seconds, so a missed NOTIFY (e.g. connection blip) never wedges the worker. Under normal operation the timer fires on an empty queue and is a no-op. + +Uses psycopg 3's built-in notifies(timeout=...) instead of wrapping anext() +in asyncio.wait_for — the wait_for-cancellation path corrupts the async +generator's state and produces spurious StopAsyncIteration errors. """ import asyncio -import contextlib from collections.abc import Awaitable, Callable from typing import Any @@ -49,12 +52,15 @@ async def _run_once( await cur.execute(f"LISTEN {channel}") log.info("listener.subscribed", channel=channel) + # Drain anything already pending on subscription await _safe_call(handler, channel) - notifies = conn.notifies() + # Stream notifications with a built-in timeout. When the timeout elapses + # without a notification, the inner loop body simply doesn't run; we + # then call the handler (sweep) and re-enter the outer loop. while True: - with contextlib.suppress(TimeoutError): - await asyncio.wait_for(anext(notifies), timeout=sweep_interval) + async for _ in conn.notifies(timeout=sweep_interval, stop_after=1): + pass # notification received — handler runs unconditionally below await _safe_call(handler, channel) finally: await conn.close()