listener: use psycopg notifies(timeout=) instead of wait_for(anext()) to fix spurious StopAsyncIteration
Some checks are pending
build / lint-test (push) Waiting to run
build / build-push (push) Blocked by required conditions

This commit is contained in:
kianiadee 2026-05-23 01:20:43 +03:00
parent 1dcfe9b773
commit 0fb24a8ade

View file

@ -3,10 +3,13 @@
The handler is invoked on every notification AND every sweep_interval seconds, The handler is invoked on every notification AND every sweep_interval seconds,
so a missed NOTIFY (e.g. connection blip) never wedges the worker. Under so a missed NOTIFY (e.g. connection blip) never wedges the worker. Under
normal operation the timer fires on an empty queue and is a no-op. normal operation the timer fires on an empty queue and is a no-op.
Uses psycopg 3's built-in notifies(timeout=...) instead of wrapping anext()
in asyncio.wait_for the wait_for-cancellation path corrupts the async
generator's state and produces spurious StopAsyncIteration errors.
""" """
import asyncio import asyncio
import contextlib
from collections.abc import Awaitable, Callable from collections.abc import Awaitable, Callable
from typing import Any from typing import Any
@ -49,12 +52,15 @@ async def _run_once(
await cur.execute(f"LISTEN {channel}") await cur.execute(f"LISTEN {channel}")
log.info("listener.subscribed", channel=channel) log.info("listener.subscribed", channel=channel)
# Drain anything already pending on subscription
await _safe_call(handler, channel) await _safe_call(handler, channel)
notifies = conn.notifies() # Stream notifications with a built-in timeout. When the timeout elapses
# without a notification, the inner loop body simply doesn't run; we
# then call the handler (sweep) and re-enter the outer loop.
while True: while True:
with contextlib.suppress(TimeoutError): async for _ in conn.notifies(timeout=sweep_interval, stop_after=1):
await asyncio.wait_for(anext(notifies), timeout=sweep_interval) pass # notification received — handler runs unconditionally below
await _safe_call(handler, channel) await _safe_call(handler, channel)
finally: finally:
await conn.close() await conn.close()