Python Idempotent ETL Pipeline Prompt
Build a restartable extract-transform-load script with checkpointing, idempotent upserts, batching, and dead-letter handling so a mid-run failure resumes instead of double-loading.
- Target user: Data and platform engineers writing batch sync jobs between systems
- Difficulty: Advanced
- Tools: Claude, ChatGPT
The prompt
You are a staff data engineer who has rescued more than one ETL job that double-loaded a million rows after a crash. Design a small, idempotent, restartable ETL pipeline in Python.
I will provide:
- Source (API, database, CSV drop, S3 prefix) and its paging/cursor model
- Destination (SQL table, warehouse, API) and whether it supports upsert
- The natural key for each record and what "the same record changed" means
- Volume, frequency, and the acceptable runtime/SLA
Your job:
1. **Decompose into pure stages**: `extract()` yields raw records, `transform()` maps raw records to a typed dataclass (validating, and dropping or quarantining bad rows), and `load()` upserts a batch. Keep each stage independently testable; no stage should reach into another stage's concerns.
2. **Make load idempotent**: prefer database-native `INSERT ... ON CONFLICT (key) DO UPDATE` or `MERGE`; for APIs, use the natural key plus an idempotency header. Show how to compute a content hash so unchanged rows are skipped (no needless writes, no churned `updated_at`).
3. **Checkpoint the cursor**: persist the last successfully committed source cursor (offset, timestamp, or page token) to a small state file or an `etl_state` table, written only after the batch's load commits. On restart, resume from that cursor. Walk through why "process then checkpoint" (at-least-once delivery) combined with an idempotent load produces an exactly-once effect.
4. **Batch and bound memory**: stream with generators and never `list()` the whole source. Make the batch size configurable, and commit each batch in its own transaction.
5. **Dead-letter bad records**: route validation failures to a quarantine file or table with the raw payload and the error, then continue. Fail the run only when the dead-letter rate crosses a threshold.
6. **Retry transient failures**: wrap network and database calls in exponential backoff with jitter, and distinguish retryable errors (timeout, 5xx, 429, deadlock) from fatal ones (other 4xx, schema mismatch).
7. **Observability**: emit structured logs and counters: extracted, transformed, loaded, skipped-unchanged, quarantined, retried. Print a one-line summary at the end suitable for a cron email.
8. **Concurrency (optional)**: if loads are independent, parallelize with a bounded `ThreadPoolExecutor` or `asyncio`, while keeping checkpointing serial and correct.
Output:
- (a) the staged module with dataclasses and type hints
- (b) the checkpoint/state implementation
- (c) a pytest suite that kills the run mid-batch and proves there are no duplicates after resume
- (d) a runbook section on backfills and replays
Be opinionated: streaming over buffering, upsert over insert, checkpoint-after-commit, and quarantine over crash.
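For reference, the stage layout from steps 1 and 4 can be sketched as below. This is a minimal illustration, not a prescribed design: it assumes a hypothetical paged source exposed as a `fetch_page(token)` callable returning `(records, next_token)`, a hypothetical `Customer` record whose `id` is the natural key, and a plain dict standing in for the destination.

```python
from dataclasses import dataclass
from itertools import islice
from typing import Any, Callable, Iterable, Iterator, Optional, TypeVar

T = TypeVar("T")

# Hypothetical page shape: (records, next_page_token); the token is None on the last page.
Page = tuple[list[dict[str, Any]], Optional[str]]


@dataclass(frozen=True)
class Customer:
    """Typed record; `id` is the assumed natural key."""
    id: int
    email: str


def extract(fetch_page: Callable[[Optional[str]], Page],
            token: Optional[str] = None) -> Iterator[dict[str, Any]]:
    """Yield raw records lazily, fetching one page at a time from `token` onward."""
    while True:
        records, token = fetch_page(token)
        yield from records
        if token is None:
            return


def transform(raw: Iterable[dict[str, Any]]) -> Iterator[Customer]:
    """Map raw dicts to typed records; invalid rows are dropped in this sketch."""
    for row in raw:
        try:
            rec = Customer(id=int(row["id"]), email=str(row["email"]).strip().lower())
        except (KeyError, TypeError, ValueError):
            continue  # step 5 would quarantine the row instead of dropping it
        yield rec


def batched(items: Iterable[T], size: int) -> Iterator[list[T]]:
    """Group a stream into lists of at most `size` items without materializing it."""
    if size < 1:
        raise ValueError("size must be >= 1")
    it = iter(items)
    while batch := list(islice(it, size)):
        yield batch


def load(batch: list[Customer], sink: dict[int, Customer]) -> None:
    """Stand-in for a transactional upsert: keyed by natural key, so replays are harmless."""
    for rec in batch:
        sink[rec.id] = rec


# Usage with an in-memory, two-page fake source.
PAGES: dict[Optional[str], Page] = {
    None: ([{"id": 1, "email": " A@x.io "}, {"id": "bad"}], "p2"),
    "p2": ([{"id": 2, "email": "b@x.io"}, {"id": 3, "email": "c@x.io"}], None),
}
sink: dict[int, Customer] = {}
sizes: list[int] = []
for batch in batched(transform(extract(lambda tok: PAGES[tok])), size=2):
    load(batch, sink)
    sizes.append(len(batch))
```

Each stage only sees an iterator, so each can be tested with plain lists. On Python 3.12+, `itertools.batched` can replace the `batched` helper, with the difference that it yields tuples rather than lists.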
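Step 2's "skip unchanged rows" idea can be illustrated with SQLite, whose `INSERT ... ON CONFLICT ... DO UPDATE` upsert (SQLite 3.24+) accepts a `WHERE` clause that turns a replay of an unchanged row into a no-op. A minimal sketch under assumptions: the `customers` table, its columns, and passing `run_ts` in from the caller rather than reading the clock are all illustrative choices, not a destination-specific recipe.

```python
import hashlib
import json
import sqlite3
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class Customer:
    id: int  # natural key (assumed)
    email: str
    plan: str


def content_hash(rec: Customer) -> str:
    """Stable SHA-256 of the record's fields, with key order normalized."""
    payload = json.dumps(asdict(rec), sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


DDL = """
CREATE TABLE IF NOT EXISTS customers (
    id           INTEGER PRIMARY KEY,
    email        TEXT NOT NULL,
    plan         TEXT NOT NULL,
    content_hash TEXT NOT NULL,
    updated_at   INTEGER NOT NULL
)
"""

# The WHERE clause skips the update when the hash is unchanged, so
# updated_at only moves when the content actually changed.
UPSERT = """
INSERT INTO customers (id, email, plan, content_hash, updated_at)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
    email        = excluded.email,
    plan         = excluded.plan,
    content_hash = excluded.content_hash,
    updated_at   = excluded.updated_at
WHERE customers.content_hash <> excluded.content_hash
"""


def load(conn: sqlite3.Connection, batch: list[Customer], run_ts: int) -> None:
    """Upsert one batch in a single transaction."""
    rows = [(r.id, r.email, r.plan, content_hash(r), run_ts) for r in batch]
    with conn:  # commits on success, rolls back if any row fails
        conn.executemany(UPSERT, rows)


conn = sqlite3.connect(":memory:")
conn.execute(DDL)
batch = [Customer(1, "a@x.io", "free"), Customer(2, "b@x.io", "pro")]
load(conn, batch, run_ts=100)
load(conn, batch, run_ts=200)                            # replay: nothing changes
load(conn, [Customer(2, "b@x.io", "team")], run_ts=300)  # real change to id 2
rows = conn.execute("SELECT id, plan, updated_at FROM customers ORDER BY id").fetchall()
print(rows)  # [(1, 'free', 100), (2, 'team', 300)]
```

PostgreSQL accepts the same `ON CONFLICT (id) DO UPDATE SET ... WHERE ...` shape; there, `IS DISTINCT FROM` is the safer comparison if the hash column can be NULL.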
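Step 3's checkpoint rule and the kill-and-resume test asked for in output (c) fit together roughly as follows. This sketch assumes a hypothetical source ordered by an integer `offset`, a JSON state file as the checkpoint store, and a dict keyed by natural key as the idempotent destination. The crash is injected after a batch commits but before its checkpoint is written, which is exactly the window in which at-least-once processing replays a batch.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any, Iterable, Iterator, Optional


def read_checkpoint(path: Path) -> Optional[int]:
    """Return the last committed cursor, or None on a first run."""
    if not path.exists():
        return None
    return json.loads(path.read_text())["cursor"]


def write_checkpoint(path: Path, cursor: int) -> None:
    """Write to a temp file, fsync, then os.replace so the state file is never half-written."""
    fd, tmp = tempfile.mkstemp(dir=path.parent, prefix=".state-")
    with os.fdopen(fd, "w") as f:
        json.dump({"cursor": cursor}, f)
        f.flush()
        os.fsync(f.fileno())
    os.replace(tmp, path)


def source_batches(rows: Iterable[dict[str, Any]], after: Optional[int],
                   size: int) -> Iterator[list[dict[str, Any]]]:
    """Hypothetical offset-ordered source: stream rows with offset > after, in batches."""
    batch: list[dict[str, Any]] = []
    for row in rows:
        if after is not None and row["offset"] <= after:
            continue
        batch.append(row)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch


def run(rows: list[dict[str, Any]], state: Path, sink: dict[str, Any], size: int,
        crash_after_commit_of: Optional[int] = None) -> int:
    """Process, then checkpoint. Returns the number of batches loaded by this run."""
    loaded = 0
    for batch in source_batches(rows, read_checkpoint(state), size):
        for row in batch:  # idempotent "commit": upsert by natural key
            sink[row["key"]] = row["value"]
        loaded += 1
        if loaded == crash_after_commit_of:
            raise RuntimeError("simulated crash after commit, before checkpoint")
        write_checkpoint(state, batch[-1]["offset"])  # only after the commit succeeded
    return loaded


rows = [{"offset": i, "key": f"k{i}", "value": i * 10} for i in range(1, 11)]
sink: dict[str, Any] = {}
with tempfile.TemporaryDirectory() as d:
    state = Path(d) / "etl_state.json"
    try:
        run(rows, state, sink, size=3, crash_after_commit_of=2)
    except RuntimeError:
        pass
    cursor_after_crash = read_checkpoint(state)  # 3: batch 2 committed, not checkpointed
    resumed = run(rows, state, sink, size=3)     # replays batch 2, then finishes
    final_cursor = read_checkpoint(state)
```

The resumed run reloads the offsets 4 to 6 batch, and the keyed upsert absorbs it, so the destination ends with exactly ten records. If the state table lives in the same database as the destination, writing the cursor inside the batch's own transaction closes that replay window entirely; the file-based variant relies on the idempotent load instead.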
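Step 5 in miniature: a transform that writes failing rows to a JSON-lines quarantine (raw payload plus error) and raises only when the failure rate crosses a threshold. `Order`, `validate`, `Quarantine`, and the `min_sample` guard (so a few early bad rows cannot abort a large run before the rate is meaningful) are hypothetical names and choices for this sketch.

```python
import io
import json
from dataclasses import dataclass
from typing import Any, Iterable, Iterator, TextIO


@dataclass(frozen=True)
class Order:
    order_id: str
    amount_cents: int


class DeadLetterRateExceeded(RuntimeError):
    """Raised when too large a share of rows fails validation."""


def validate(raw: dict[str, Any]) -> Order:
    amount = int(raw["amount_cents"])
    if amount < 0:
        raise ValueError(f"negative amount: {amount}")
    return Order(order_id=str(raw["order_id"]), amount_cents=amount)


class Quarantine:
    """Routes bad rows to a JSON-lines sink and enforces a maximum failure rate."""

    def __init__(self, sink: TextIO, max_rate: float, min_sample: int = 100) -> None:
        self.sink, self.max_rate, self.min_sample = sink, max_rate, min_sample
        self.seen = 0
        self.bad = 0

    def transform(self, rows: Iterable[dict[str, Any]]) -> Iterator[Order]:
        for raw in rows:
            self.seen += 1
            try:
                rec = validate(raw)
            except (KeyError, TypeError, ValueError) as exc:
                self.bad += 1
                # Keep the raw payload and the error so the row can be replayed later.
                self.sink.write(json.dumps({"raw": raw, "error": repr(exc)}, default=str) + "\n")
                self.check()
                continue
            yield rec

    def check(self, final: bool = False) -> None:
        """Raise if the failure rate exceeds max_rate (mid-run only after min_sample rows)."""
        if self.seen == 0 or not (final or self.seen >= self.min_sample):
            return
        if self.bad / self.seen > self.max_rate:
            raise DeadLetterRateExceeded(f"{self.bad}/{self.seen} rows quarantined")


rows = [
    {"order_id": "a", "amount_cents": "125"},
    {"order_id": "b", "amount_cents": "-5"},  # fails: negative amount
    {"order_id": "c"},                        # fails: missing field
    {"order_id": "d", "amount_cents": 990},
]
dead = io.StringIO()
q = Quarantine(dead, max_rate=0.6, min_sample=10)
good = list(q.transform(rows))
q.check(final=True)  # 2/4 = 0.5 is within 0.6, so this run passes

strict = Quarantine(io.StringIO(), max_rate=0.25, min_sample=10)
list(strict.transform(rows))
try:
    strict.check(final=True)
    strict_failed = False
except DeadLetterRateExceeded:
    strict_failed = True
```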
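For step 6, one common pattern is capped exponential backoff with "full jitter": sleep a uniform random time between zero and the current backoff ceiling. The sketch assumes the extract and load code translates driver or HTTP errors into two hypothetical exception classes, `RetryableError` and `FatalError`; the `sleep` and `rng` parameters exist only so the behavior can be tested without real waits.

```python
import random
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class RetryableError(Exception):
    """Hypothetical marker for transient failures: timeouts, 5xx, 429, deadlocks."""


class FatalError(Exception):
    """Hypothetical marker for failures a retry cannot fix: other 4xx, schema mismatch."""


def with_retries(fn: Callable[[], T], *, attempts: int = 5, base: float = 0.5,
                 cap: float = 30.0, sleep: Callable[[float], None] = time.sleep,
                 rng: Optional[random.Random] = None) -> T:
    """Call fn; retry RetryableError with capped exponential backoff and full jitter.

    FatalError (and any other exception) propagates immediately without a retry.
    """
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    if rng is None:
        rng = random.Random()
    for attempt in range(attempts):
        try:
            return fn()
        except RetryableError:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last transient error
            # Full jitter: uniform in [0, min(cap, base * 2**attempt)].
            sleep(rng.uniform(0, min(cap, base * 2 ** attempt)))
    raise AssertionError("unreachable")


calls = {"flaky": 0, "broken": 0}


def flaky() -> str:
    calls["flaky"] += 1
    if calls["flaky"] < 3:
        raise RetryableError("503 from upstream")
    return "ok"


def broken() -> str:
    calls["broken"] += 1
    raise FatalError("400: unknown column")


delays: list[float] = []
result = with_retries(flaky, sleep=delays.append, rng=random.Random(7))
try:
    with_retries(broken, sleep=delays.append)
except FatalError:
    pass
```

Each retry is also a natural place to increment the `retried` counter from step 7.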
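Step 7 asks for counters plus a one-line summary. A minimal standard-library sketch; the `RunStats` class, the one-JSON-object-per-line log format, and the exact summary layout are assumptions rather than a required format.

```python
import json
import logging
import time
from collections import Counter
from dataclasses import dataclass, field

log = logging.getLogger("etl")

# Counter names mirror step 7; the tuple order fixes the summary layout.
COUNTERS = ("extracted", "transformed", "loaded", "skipped_unchanged", "quarantined", "retried")


@dataclass
class RunStats:
    started: float = field(default_factory=time.monotonic)
    counts: Counter = field(default_factory=Counter)

    def incr(self, name: str, n: int = 1) -> None:
        if name not in COUNTERS:
            raise KeyError(f"unknown counter: {name}")  # catch typos early
        self.counts[name] += n

    def event(self, msg: str, **fields: object) -> None:
        """Emit one JSON object per log line so log tooling can parse it."""
        log.info(json.dumps({"msg": msg, **fields}, default=str, sort_keys=True))

    def summary(self, status: str = "ok") -> str:
        """One line for a cron email: status, every counter, elapsed seconds."""
        counts = " ".join(f"{name}={self.counts[name]}" for name in COUNTERS)
        return f"etl status={status} {counts} elapsed={time.monotonic() - self.started:.1f}s"


logging.basicConfig(level=logging.INFO, format="%(message)s")
stats = RunStats()
stats.incr("extracted", 5)
stats.incr("transformed", 4)
stats.incr("quarantined")
stats.incr("loaded", 3)
stats.incr("skipped_unchanged")
stats.event("batch_committed", batch=1, cursor=42)
line = stats.summary()
print(line)
```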
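Step 8's constraint (parallel loads, serial checkpointing) comes down to a watermark: a batch's cursor may be checkpointed only once every earlier batch has also committed. The sketch assumes batches are numbered in source order and that the caller maps a batch index back to that batch's cursor. `Watermark` and `run_parallel` are hypothetical names, and `max_in_flight` caps how many batches are buffered so the source keeps streaming.

```python
import threading
import time
from concurrent.futures import (ALL_COMPLETED, FIRST_COMPLETED, Future,
                                ThreadPoolExecutor, wait)
from typing import Callable, Iterable, Optional


class Watermark:
    """Tracks finished batch indices; only the contiguous prefix is safe to checkpoint."""

    def __init__(self) -> None:
        self._done: set[int] = set()
        self._next = 0  # lowest batch index not yet known to be finished
        self._lock = threading.Lock()

    def mark_done(self, idx: int) -> Optional[int]:
        """Record idx; return the new highest contiguous index, or None if it did not move."""
        with self._lock:
            self._done.add(idx)
            start = self._next
            while self._next in self._done:
                self._done.remove(self._next)
                self._next += 1
            return self._next - 1 if self._next > start else None


def run_parallel(batches: Iterable[list[int]], load: Callable[[list[int]], None],
                 checkpoint: Callable[[int], None], workers: int = 4,
                 max_in_flight: int = 8) -> None:
    """Load batches concurrently; advance the checkpoint serially, in source order."""
    wm = Watermark()
    in_flight: dict[Future[None], int] = {}

    def drain(return_when: str) -> None:
        done, _ = wait(in_flight, return_when=return_when)
        for fut in done:
            idx = in_flight.pop(fut)
            fut.result()  # re-raises a failed load; the checkpoint stays behind it
            upto = wm.mark_done(idx)
            if upto is not None:
                checkpoint(upto)  # only ever called from this thread

    with ThreadPoolExecutor(max_workers=workers) as pool:
        for idx, batch in enumerate(batches):
            in_flight[pool.submit(load, batch)] = idx
            if len(in_flight) >= max_in_flight:
                drain(FIRST_COMPLETED)  # bound the number of buffered batches
        if in_flight:
            drain(ALL_COMPLETED)


demo = Watermark()
marks = [demo.mark_done(i) for i in (2, 0, 1, 4, 3)]  # [None, 0, 2, None, 4]

loaded: list[int] = []
checkpoints: list[int] = []
lock = threading.Lock()


def fake_load(batch: list[int]) -> None:
    time.sleep(0.001 * (batch[0] % 7))  # uneven durations so completions interleave
    with lock:
        loaded.extend(batch)


run_parallel(([i, i + 1] for i in range(0, 20, 2)), fake_load, checkpoints.append,
             workers=4, max_in_flight=3)
```

If a load fails, `fut.result()` re-raises and the checkpoint stays at the last contiguous batch, so the rerun may replay batches that already committed; the idempotent upsert from step 2 is what makes that safe.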