Collapse the backend from 7 Coolify services to 4 app services + the DB. - Merge ingest_movement + ingest_events into a single ingest_worker: split each poller's main() into reusable startup_catchup()/register_jobs() and drive both from one schedule loop in new ingest_worker_rev.py (standalone entrypoints retained for local debug). - docker-compose.yaml: replace the two poller services with ingest_worker; remove the pgbouncer service (dormant; transaction-mode pooling is unsafe for the advisory-lock'd v_trips refresher) and the grafana service + grafana-data volume (redundant with the FleetOps SPA). - Add reporting.v_ingest_health (migration 19) + dashboard_api GET /health/ingest as the pipeline-freshness surface that replaces Grafana's health panels. webhook_receiver stays isolated so a poller fault can't drop inbound pushes. timescale_db and db_backup are unchanged. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
52 lines
2.1 KiB
Python
52 lines
2.1 KiB
Python
"""
|
|
ingest_worker_rev.py — Fireside Communications · Merged Ingest Worker
|
|
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
|
RESPONSIBILITY: Run the movement and events polling pipelines in a single
|
|
process. Consolidates the former `ingest_movement` and `ingest_events`
|
|
containers into one `ingest_worker` service.
|
|
|
|
WHY ONE PROCESS: both pipelines were identical in shape — blocking
|
|
`while True: schedule.run_pending()` daemons that register jobs onto the
|
|
`schedule` library's module-global default scheduler and share the same
|
|
ts_shared_rev ThreadedConnectionPool. Driving every job from one
|
|
run_pending() loop is strictly equivalent to running them separately, with
|
|
one fewer container, one log stream, and one connection pool.
|
|
|
|
The inbound `webhook_receiver` is deliberately NOT merged here: pushed
|
|
device data is unrecoverable, so it stays isolated from poller faults.
|
|
|
|
Standalone entrypoints (`python ingest_movement_rev.py`,
|
|
`python ingest_events_rev.py`) remain intact for local debugging — this
|
|
module only reuses their startup_catchup()/register_jobs() helpers.
|
|
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
|
"""
|
|
|
|
import time
|
|
import schedule
|
|
|
|
from ts_shared_rev import get_logger, setup_shutdown
|
|
import ingest_movement_rev as mv
|
|
import ingest_events_rev as ev
|
|
|
|
log = get_logger("ingest_worker")
|
|
|
|
|
|
def main():
|
|
log.info("Starting INGEST WORKER — merged MOVEMENT + EVENTS pipelines")
|
|
setup_shutdown(log) # one SIGTERM/SIGINT handler for the shared DB pool
|
|
|
|
# Startup catch-up — warm both pipelines immediately.
|
|
mv.startup_catchup()
|
|
ev.startup_catchup()
|
|
|
|
# Register every job onto the shared global `schedule` scheduler.
|
|
mv.register_jobs()
|
|
ev.register_jobs()
|
|
|
|
while True:
|
|
schedule.run_pending()
|
|
time.sleep(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|