From 0fb24a8adecd1567f63eb99390bfb977ba059556 Mon Sep 17 00:00:00 2001 From: kianiadee Date: Sat, 23 May 2026 01:20:43 +0300 Subject: [PATCH] listener: use psycopg notifies(timeout=) instead of wait_for(anext()) to fix spurious StopAsyncIteration --- app/workers/listener.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/app/workers/listener.py b/app/workers/listener.py index eeeb627..abce4a1 100644 --- a/app/workers/listener.py +++ b/app/workers/listener.py @@ -3,10 +3,13 @@ The handler is invoked on every notification AND every sweep_interval seconds, so a missed NOTIFY (e.g. connection blip) never wedges the worker. Under normal operation the timer fires on an empty queue and is a no-op. + +Uses psycopg 3's built-in notifies(timeout=...) instead of wrapping anext() +in asyncio.wait_for — the wait_for-cancellation path corrupts the async +generator's state and produces spurious StopAsyncIteration errors. """ import asyncio -import contextlib from collections.abc import Awaitable, Callable from typing import Any @@ -49,12 +52,15 @@ async def _run_once( await cur.execute(f"LISTEN {channel}") log.info("listener.subscribed", channel=channel) + # Drain anything already pending on subscription await _safe_call(handler, channel) - notifies = conn.notifies() + # Stream notifications with a built-in timeout. When the timeout elapses + # without a notification, the inner loop body simply doesn't run; we + # then call the handler (sweep) and re-enter the outer loop. while True: - with contextlib.suppress(TimeoutError): - await asyncio.wait_for(anext(notifies), timeout=sweep_interval) + async for _ in conn.notifies(timeout=sweep_interval, stop_after=1): + pass # notification received — handler runs unconditionally below await _safe_call(handler, channel) finally: await conn.close()