fleettickets/n8n-s3-export-workflows.md
david kiania df054c92be feat: INC hourly-CSV ingestion (newest-file, ETag dedup, clean + archive)
Rework import_tickets.py from the retired JSON `latest.json` model to the new
hourly full-snapshot CSV export. Strictly INC (CRQ out of scope).

- Ingest the newest automations/inc/<EAT-timestamp>.csv; skip-if-unchanged by
  comparing S3 ETag to tickets.import_meta.metadata.source_etag.
- Upsert on ticket_id (PK; no dups, never delete -> closure history accrues).
  No truncate. On success, move processed files to automations/inc/processed/.
- Clean at ingest: drop is_alarm=true + the "EXPORT STOPPED..." sentinel; drop
  week_*, source_s3_*/source_snapshot_id, department/source_type; lowercase
  region, uppercase raw_status; keep service_type + bucket.
- Force path-style S3 addressing; --inc-csv for local dev; --from-bucket for cron.
- Add migrations/02 (import_meta + freshness); refresh README/.env.example/docs.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-15 19:33:16 +03:00


# n8n S3 Export Workflows
## Overview
Both workflows run in the `Africa/Nairobi` timezone and use the
`Rahamafresh Tickets S3` credential.
| Workflow | n8n ID | Source | Bucket |
| --- | --- | --- | --- |
| Fuel Records S3 Export | `IP2KNAfFazAjTesh` | `logistics_department.fuel_records` | `fuel` |
| FTTH Automation Ticket S3 Export | `JI3QkcJeHk9eYRsY` | `isp_department_crq.automations` and `isp_department_osp.automations` | `tickets` |
## Schedules
- Hourly delta export: `10` minutes after each hour, from `01:10` through
`23:10` (`0 10 1-23 * * *`; six cron fields, seconds first).
- Daily full export: `00:05` (`0 5 0 * * *`).
- The `00:05` run exports rows up to the end of the previous local day. For
example, Wednesday's run exports the snapshot through Tuesday `23:59:59`.
- The full export has its own state and does not read or advance the hourly
delta pointer.
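The daily cutoff rule can be sketched in Python. This is an illustration only, not the workflow's own code: the function name `full_export_window` is hypothetical, and a fixed UTC+03:00 offset stands in for `Africa/Nairobi`, which is safe only because East Africa Time has no daylight saving.

```python
from datetime import datetime, timedelta, timezone

# Assumption: a fixed +03:00 offset stands in for Africa/Nairobi (no DST).
EAT = timezone(timedelta(hours=3), name="EAT")


def full_export_window(run_at: datetime) -> tuple[str, datetime]:
    """Return (snapshot_date, exclusive cutoff) for a daily full run."""
    local = run_at.astimezone(EAT)
    # Local midnight at the start of the run's day is the exclusive cutoff.
    cutoff = local.replace(hour=0, minute=0, second=0, microsecond=0)
    # The snapshot is labelled with the previous local day.
    snapshot_date = (cutoff - timedelta(days=1)).date().isoformat()
    return snapshot_date, cutoff


# A run on Wednesday 2026-06-17 at 00:05 covers rows through Tuesday 23:59:59.
snapshot, cutoff = full_export_window(datetime(2026, 6, 17, 0, 5, tzinfo=EAT))
print(snapshot, cutoff.isoformat())
```

Treating the cutoff as exclusive (`< cutoff`) avoids depending on the timestamp resolution of the source column.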
## Fuel Records S3 Export
### Source and change tracking
The source is `logistics_department.fuel_records`. The table has an indexed
`updated_at` column and an update trigger that refreshes it whenever a row is
changed.
Hourly runs select rows where:
```sql
updated_at > last_successful_delta_export_at
AND updated_at <= requested_at
```
The delta pointer advances to the maximum exported `updated_at` only after all
S3 uploads complete and the downloaded `latest.json` passes validation. A
failed or empty run never advances the pointer past rows that have not been
exported.
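The window selection and pointer rule can be sketched as follows. The function names and the in-memory row shape are hypothetical, and keeping the pointer unchanged on an empty run is an assumption: the text above only says the pointer moves to the maximum exported `updated_at`.

```python
from datetime import datetime


def select_delta(rows: list[dict], pointer: datetime, requested_at: datetime) -> list[dict]:
    """Rows with pointer < updated_at <= requested_at, oldest first."""
    window = [r for r in rows if pointer < r["updated_at"] <= requested_at]
    return sorted(window, key=lambda r: r["updated_at"])


def next_pointer(pointer: datetime, exported: list[dict], validated: bool) -> datetime:
    """Advance only after a validated, non-empty export."""
    if not validated or not exported:
        # Assumption: failed and empty runs both leave the pointer untouched.
        return pointer
    return max(r["updated_at"] for r in exported)


rows = [
    {"id": 1, "updated_at": datetime(2026, 6, 15, 9, 0)},    # equals the pointer: excluded
    {"id": 2, "updated_at": datetime(2026, 6, 15, 10, 5)},   # inside the window
    {"id": 3, "updated_at": datetime(2026, 6, 15, 10, 30)},  # after requested_at: next run
]
pointer = datetime(2026, 6, 15, 9, 0)
changed = select_delta(rows, pointer, datetime(2026, 6, 15, 10, 10))
pointer = next_pointer(pointer, changed, validated=True)
print([r["id"] for r in changed], pointer.isoformat())
```

Advancing to the maximum exported `updated_at` rather than to `requested_at` means a row committed late with an earlier timestamp is still picked up by the next run.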
The full run selects all rows created before local midnight, independently of
the delta pointer.
### Fuel object keys
Every successful run updates:
- `fuel_records/latest.json`
- `fuel_records/latest.csv`
Delta runs with changed rows also write:
- `fuel_records/changes/YYYY-MM-DDTHH-mm-ss.json`
- `fuel_records/changes/YYYY-MM-DDTHH-mm-ss.csv`
Full runs write:
- `fuel_records/full/YYYY-MM-DD.json`
- `fuel_records/full/YYYY-MM-DD.csv`
Exports larger than 5,000 rows additionally produce numbered JSON and CSV
parts such as `-part-0001`.
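The key layout above can be expressed as a small helper. This is a sketch under two assumptions that the text does not confirm: each part holds up to 5,000 rows, and the `-part-NNNN` suffix sits between the date and the file extension. The function names are hypothetical.

```python
from datetime import date, datetime

PART_SIZE = 5000  # assumption: each part holds up to 5,000 rows


def change_keys(prefix: str, exported_at: datetime) -> list[str]:
    """Keys for one delta run, e.g. <prefix>/changes/2026-06-15T14-10-00.json."""
    stamp = exported_at.strftime("%Y-%m-%dT%H-%M-%S")
    return [f"{prefix}/changes/{stamp}.{ext}" for ext in ("json", "csv")]


def full_keys(prefix: str, snapshot_date: date, row_count: int) -> list[str]:
    """Keys for one full run, plus numbered parts above PART_SIZE rows."""
    base = f"{prefix}/full/{snapshot_date.isoformat()}"
    keys = [f"{base}.{ext}" for ext in ("json", "csv")]
    if row_count > PART_SIZE:
        part_count = -(-row_count // PART_SIZE)  # ceiling division
        for n in range(1, part_count + 1):
            # Assumption: the suffix sits between the date and the extension.
            keys += [f"{base}-part-{n:04d}.{ext}" for ext in ("json", "csv")]
    return keys


print(change_keys("fuel_records", datetime(2026, 6, 15, 14, 10, 0)))
print(full_keys("fuel_records", date(2026, 6, 14), 1965))
```

The hyphenated time component (`HH-mm-ss`) keeps colons out of object keys, which is why the timestamp is formatted by hand instead of with `isoformat()`.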
### Fuel state
Fuel state is stored in n8n workflow static data and is updated only after S3
read-back validation:
- `last_successful_delta_export_at`
- `last_successful_full_export_date`
- `rows_exported`
- `destination_key` and `destination_keys`
- `n8n_execution_id`
- `success`
- `error_message`
- `completed_at`
## FTTH Automation Ticket S3 Export
### Source and change tracking
The workflow requests an export package from the active SCOREBOARD service:
```text
POST /api/v1/ftth/automation-export/package
```
Datasets:
- CRQ: `isp_department_crq.automations`
- INC: `isp_department_osp.automations`
Hourly delta packages select records changed after the last successful delta
export. The SCOREBOARD service creates an export run before returning the
package. n8n uploads every returned object and then calls:
```text
POST /api/v1/ftth/automation-export/mark-sent
```
Only a `SUCCESS` acknowledgement advances the delta pointer. Upload failure
marks the run `FAILED` and preserves the previous pointer.
Full packages use the previous local date as `snapshot_date`, select the
complete current-state dataset through the previous day, and update only the
full-export date after successful upload.
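The package, upload, and acknowledgement sequence described above can be sketched with injected callables so that no real endpoint is contacted. Everything about the payload is an assumption: the field names `run_id`, `objects`, `key`, and `body` are hypothetical, and reporting a failed upload through the same mark-sent call is a guess at how the run ends up `FAILED`.

```python
from typing import Callable


def run_export(
    request_package: Callable[[str], dict],
    upload: Callable[[str, bytes], None],
    mark_sent: Callable[[str, str, str], None],
    export_type: str,
) -> str:
    """Upload every object in a package, then acknowledge the run once."""
    # The service creates the export run before it returns the package.
    package = request_package(export_type)
    run_id = package["run_id"]  # hypothetical field name
    try:
        for obj in package["objects"]:  # hypothetical field name
            upload(obj["key"], obj["body"])
    except Exception as exc:
        # Any upload failure fails the whole run; the pointer stays put.
        mark_sent(run_id, "FAILED", str(exc))
        return "FAILED"
    # Only this acknowledgement lets the service advance the delta pointer.
    mark_sent(run_id, "SUCCESS", "")
    return "SUCCESS"


# Usage with in-memory fakes standing in for SCOREBOARD and S3.
acks: list[tuple[str, str]] = []
package = {"run_id": "run-1", "objects": [{"key": "automations/inc/latest.json", "body": b"{}"}]}
status = run_export(
    lambda export_type: package,
    lambda key, body: None,
    lambda run_id, state, error: acks.append((run_id, state)),
    "delta",
)
print(status, acks)
```

Acknowledging once, after the last upload, is what makes a partially uploaded package safe to retry: the next delta run starts from the same pointer.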
### FTTH object keys
CRQ:
- `automations/crq/latest.json`
- `automations/crq/latest.csv`
- `automations/crq/changes/YYYY-MM-DDTHH-mm-ss.json`
- `automations/crq/changes/YYYY-MM-DDTHH-mm-ss.csv`
- `automations/crq/full/YYYY-MM-DD.json`
- `automations/crq/full/YYYY-MM-DD.csv`
INC:
- `automations/inc/latest.json`
- `automations/inc/latest.csv`
- `automations/inc/changes/YYYY-MM-DDTHH-mm-ss.json`
- `automations/inc/changes/YYYY-MM-DDTHH-mm-ss.csv`
- `automations/inc/full/YYYY-MM-DD.json`
- `automations/inc/full/YYYY-MM-DD.csv`
Exports larger than 5,000 rows additionally produce numbered JSON and CSV
parts.
### FTTH state
State and audit history are stored in
`ftth_automation.automation_export_runs`. Each run records:
- export type and requested timestamp
- last successful delta timestamp
- last successful full export date
- snapshot date
- row count
- destination object keys
- n8n execution ID
- status (`PENDING`, `SUCCESS`, or `FAILED`)
- completion timestamp and error summary
## File Contents
JSON files contain:
```json
{
  "metadata": {
    "exported_at": "...",
    "export_type": "delta or full",
    "source_schema": "...",
    "source_table": "...",
    "dataset": "crq, inc, or omitted for fuel",
    "row_count": 0,
    "last_successful_delta_export_at": "...",
    "last_successful_full_export_date": "...",
    "snapshot_date": "...",
    "n8n_execution_id": "..."
  },
  "records": []
}
```
Fuel metadata names the previous and candidate delta pointers explicitly.
CSV files contain the same exported records as tabular rows with a header
line. CSV files do not contain the JSON metadata envelope.
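A consumer can sanity-check a downloaded JSON file against the envelope shown above. The sketch below is not the workflow's read-back validation; the set of required fields and the rule that `row_count` equals the number of `records` in a single file are assumptions, and `validate_envelope` is a hypothetical name.

```python
import json

# Assumption: these metadata fields are present in every export.
REQUIRED_METADATA = (
    "exported_at",
    "export_type",
    "source_schema",
    "source_table",
    "row_count",
    "n8n_execution_id",
)


def validate_envelope(raw: str) -> list[str]:
    """Return a list of problems; an empty list means the file looks sane."""
    doc = json.loads(raw)
    problems = []
    metadata = doc.get("metadata")
    records = doc.get("records")
    if not isinstance(metadata, dict):
        problems.append("metadata must be an object")
        metadata = {}
    if not isinstance(records, list):
        problems.append("records must be an array")
        records = []
    for field in REQUIRED_METADATA:
        if field not in metadata:
            problems.append(f"missing metadata.{field}")
    if metadata.get("export_type") not in ("delta", "full"):
        problems.append("export_type must be 'delta' or 'full'")
    # Assumption: row_count describes the records held in this one file.
    if metadata.get("row_count") != len(records):
        problems.append("row_count does not match the number of records")
    return problems


sample = json.dumps({
    "metadata": {
        "exported_at": "2026-06-15T14:10:00+03:00",  # illustrative values only
        "export_type": "delta",
        "source_schema": "logistics_department",
        "source_table": "fuel_records",
        "row_count": 1,
        "n8n_execution_id": "example",
    },
    "records": [{"id": 1}],
})
print(validate_envelope(sample))
```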
## Manual Tests
Both workflows expose production POST webhooks.
Fuel:
```bash
# Delta export
curl -X POST \
  -H 'Content-Type: application/json' \
  -d '{"export_type":"delta"}' \
  https://n8n.firesideafrica.cloud/webhook/fuel-records-s3-export

# Full export
curl -X POST \
  -H 'Content-Type: application/json' \
  -d '{"export_type":"full"}' \
  https://n8n.firesideafrica.cloud/webhook/fuel-records-s3-export
```
FTTH:
```bash
# Delta export
curl -X POST \
  -H 'Content-Type: application/json' \
  -d '{"export_type":"delta"}' \
  https://n8n.firesideafrica.cloud/webhook/ftth-automation-s3-export

# Full export, forced
curl -X POST \
  -H 'Content-Type: application/json' \
  -d '{"export_type":"full","force":true}' \
  https://n8n.firesideafrica.cloud/webhook/ftth-automation-s3-export
```
After a test:
1. Confirm the webhook response has `success: true`.
2. Open the execution ID in n8n and confirm every upload succeeded.
3. Confirm the response lists the expected bucket and destination keys.
4. Check the S3 object timestamps and inspect the JSON metadata and row count.
5. For FTTH, confirm the matching export run is `SUCCESS`.
### Production test record
Tests run on June 15, 2026:
| Workflow | Type | Execution | Result |
| --- | --- | --- | --- |
| Fuel Records S3 Export | Delta | `402524` | Success; 0 changed rows; latest JSON and CSV validated |
| Fuel Records S3 Export | Full | `402527` | Success; 1,965 rows; snapshot date `2026-06-14` |
| FTTH Automation Ticket S3 Export | Delta | `402530` | Success; CRQ and INC latest/change JSON and CSV written |
| FTTH Automation Ticket S3 Export | Full | `402536` | Success; 44,114 rows; snapshot date `2026-06-14`; 28 objects including batch parts |
## Troubleshooting
1. Check the n8n execution and identify whether the source query/package,
upload, read-back validation, or mark-sent step failed.
2. Confirm the `Rahamafresh Tickets S3` credential can write to the configured
bucket.
3. For fuel, inspect workflow static data. Do not manually advance
`last_successful_delta_export_at` after a failed run.
4. If fuel changes are missing, verify that `fuel_records.updated_at` is
populated and that its update trigger exists.
5. For FTTH, inspect `ftth_automation.automation_export_runs`, including
`status`, `destination_object_keys`, `n8n_execution_id`, and
`error_summary`.
6. Confirm the SCOREBOARD health endpoint is healthy and that the configured
export token and base URL are correct.
7. Re-run the appropriate manual webhook after fixing the failure. A failed
run leaves the last successful pointer unchanged, so the rows are retried.
## Published Version Check
In n8n, open each workflow and confirm:
- `Active` is enabled.
- The saved `versionId` equals `activeVersionId`.
- The trigger list contains the hourly schedule, daily `00:05` schedule, and
manual webhook.
- A new production webhook execution uses the same active version and returns
the expected destination keys.
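The first two checks can also be run against an exported workflow definition. This is a sketch only: it assumes the definition is available as a dictionary carrying `versionId` and `activeVersionId` as named above, and that the `Active` toggle appears as a boolean `active` key, which is not confirmed here. The function name is hypothetical.

```python
def publish_problems(workflow: dict) -> list[str]:
    """Return reasons a workflow does not look published; empty means OK."""
    problems = []
    if not workflow.get("active"):  # assumed key for the Active toggle
        problems.append("workflow is not active")
    saved = workflow.get("versionId")
    if not saved or saved != workflow.get("activeVersionId"):
        problems.append("saved versionId differs from activeVersionId")
    return problems


fuel = {
    "active": True,
    "versionId": "6833e5e5-97a0-41be-8f82-9ec612de92ce",
    "activeVersionId": "6833e5e5-97a0-41be-8f82-9ec612de92ce",
}
print(publish_problems(fuel))
```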
Current published versions as of June 15, 2026:
- Fuel Records S3 Export: `6833e5e5-97a0-41be-8f82-9ec612de92ce`
- FTTH Automation Ticket S3 Export: `b2171088-eac2-439b-97e8-83dfa8117783`