CloudOps

Python Idempotent ETL Pipeline Prompt

Build a restartable extract-transform-load script with checkpointing, idempotent upserts, batching, and dead-letter handling so a mid-run failure resumes instead of double-loading.

Target user: Data and platform engineers writing batch sync jobs between systems
Difficulty: Advanced
Tools: Claude, ChatGPT

The prompt

You are a staff data engineer who has rescued more than one ETL job that double-loaded a million rows after a crash. Design a small, idempotent, restartable ETL pipeline in Python.

I will provide:
- Source (API, database, CSV drop, S3 prefix) and its paging/cursor model
- Destination (SQL table, warehouse, API) and whether it supports upsert
- The natural key for each record and what "the same record changed" means
- Volume, frequency, and the acceptable runtime/SLA

Your job:

1. **Decompose into pure stages** — `extract()` yields raw records, `transform()` maps raw to a typed dataclass (validating and dropping/quarantining bad rows), `load()` upserts a batch. Keep each stage independently testable; no stage should reach across boundaries.

2. **Make load idempotent** — prefer database-native `INSERT ... ON CONFLICT (key) DO UPDATE` or `MERGE`; for APIs, use the natural key plus an idempotency header. Show how to compute a content hash so unchanged rows are skipped (no needless writes, no churned `updated_at`).

3. **Checkpoint the cursor** — persist the last successfully committed source cursor (offset, timestamp, or page token) to a small state file or an `etl_state` table, written only after the batch's load commits. On restart, resume from that cursor. Walk through why "process then checkpoint" (at-least-once delivery) combined with an idempotent load yields an exactly-once effect.

4. **Batch and bound memory** — stream with generators, never `list()` the whole source. Make batch size configurable; commit per batch inside a transaction.

5. **Dead-letter bad records** — route validation failures to a quarantine file/table with the raw payload and error, then continue. Fail the run only when the dead-letter rate crosses a threshold.

6. **Retry transient failures** — wrap network/db calls in exponential backoff with jitter; distinguish retryable (timeout, 5xx, deadlock) from fatal (most 4xx, schema mismatch), treating 408 and 429 as retryable.

7. **Observability** — emit structured logs and counters: extracted, transformed, loaded, skipped-unchanged, quarantined, retried. Print a one-line summary at the end suitable for a cron email.

8. **Concurrency (optional)** — if loads are independent, parallelize with a bounded `ThreadPoolExecutor`/`asyncio` while keeping checkpointing serial and correct.

Output: (a) the staged module with dataclasses and type hints, (b) the checkpoint/state implementation, (c) a pytest suite that kills the run mid-batch and proves no duplicates after resume, (d) a runbook section on backfills and replays.

Be opinionated: streaming over buffering, upsert over insert, checkpoint-after-commit, and quarantine over crash.
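
For orientation, here is a minimal sketch of the kind of answer steps 2 to 4 are asking for. It is not part of the prompt text. It assumes SQLite (3.24 or newer, for `ON CONFLICT ... DO UPDATE`) as the destination and an in-memory list standing in for a paged source. The `Customer` dataclass, the `customers` table, the `last_offset` column, and the `fail_after_batches` crash hook are all hypothetical names chosen for illustration. Because the state table lives in the destination database here, the cursor is written in the same transaction as the batch, which is one way to satisfy checkpoint-after-commit.

```python
import hashlib
import itertools
import json
import sqlite3
from dataclasses import asdict, dataclass
from typing import Iterable, Iterator, List, Optional


@dataclass(frozen=True)
class Customer:  # hypothetical record type; customer_id is the natural key
    customer_id: str
    email: str
    plan: str


def content_hash(rec: Customer) -> str:
    """Stable hash of the record's fields, used to skip unchanged rows."""
    payload = json.dumps(asdict(rec), sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


SCHEMA = """
CREATE TABLE IF NOT EXISTS customers (
    customer_id TEXT PRIMARY KEY,
    email TEXT NOT NULL,
    plan TEXT NOT NULL,
    row_hash TEXT NOT NULL,
    updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS etl_state (
    job TEXT PRIMARY KEY,
    last_offset INTEGER NOT NULL
);
"""

# The WHERE clause makes the update a no-op when the content hash is unchanged,
# so replays do not touch updated_at.
UPSERT = """
INSERT INTO customers (customer_id, email, plan, row_hash)
VALUES (?, ?, ?, ?)
ON CONFLICT (customer_id) DO UPDATE SET
    email = excluded.email,
    plan = excluded.plan,
    row_hash = excluded.row_hash,
    updated_at = CURRENT_TIMESTAMP
WHERE customers.row_hash <> excluded.row_hash
"""

SAVE_CURSOR = """
INSERT INTO etl_state (job, last_offset) VALUES (?, ?)
ON CONFLICT (job) DO UPDATE SET last_offset = excluded.last_offset
"""


def init_schema(conn: sqlite3.Connection) -> None:
    conn.executescript(SCHEMA)


def read_checkpoint(conn: sqlite3.Connection, job: str) -> int:
    row = conn.execute(
        "SELECT last_offset FROM etl_state WHERE job = ?", (job,)
    ).fetchone()
    return row[0] if row else 0


def batched(records: Iterable[Customer], size: int) -> Iterator[List[Customer]]:
    """Yield fixed-size batches without materializing the whole source."""
    it = iter(records)
    while True:
        batch = list(itertools.islice(it, size))
        if not batch:
            return
        yield batch


def load_batch(conn: sqlite3.Connection, job: str,
               batch: List[Customer], next_offset: int) -> None:
    # One transaction per batch: rows and cursor commit together or not at all.
    with conn:
        conn.executemany(
            UPSERT,
            [(r.customer_id, r.email, r.plan, content_hash(r)) for r in batch],
        )
        conn.execute(SAVE_CURSOR, (job, next_offset))


def run(conn: sqlite3.Connection, job: str, source: Iterable[Customer],
        batch_size: int = 2, fail_after_batches: Optional[int] = None) -> None:
    offset = read_checkpoint(conn, job)  # resume point; 0 on first run
    remaining = itertools.islice(source, offset, None)
    for n, batch in enumerate(batched(remaining, batch_size)):
        if fail_after_batches is not None and n == fail_after_batches:
            raise RuntimeError("simulated crash")  # test hook only
        offset += len(batch)
        load_batch(conn, job, batch, offset)


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    init_schema(conn)
    rows = [Customer(str(i), f"user{i}@example.com", "free") for i in range(5)]
    try:
        run(conn, "customers_sync", rows, fail_after_batches=1)
    except RuntimeError:
        print("crashed at offset", read_checkpoint(conn, "customers_sync"))
    run(conn, "customers_sync", rows)
    print("rows loaded:", conn.execute("SELECT COUNT(*) FROM customers").fetchone()[0])
```

If the checkpoint is kept in a separate state file instead, the cursor write can lag the committed batch after a crash. The resumed run then replays that batch, and the hash-guarded upsert is what keeps the replay harmless.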
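
Step 5 of the prompt (dead-letter bad records) might be answered along the lines of the sketch below, which is not part of the prompt text. The `Order` dataclass, `parse_order`, `DeadLetterRateExceeded`, and the `max_rate` / `min_seen` parameters are hypothetical names. The quarantine target is assumed to be any writable text stream that receives one JSON line per rejected record.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict, Iterable, Iterator, TextIO


@dataclass(frozen=True)
class Order:  # hypothetical typed record
    order_id: str
    amount_cents: int


class DeadLetterRateExceeded(RuntimeError):
    """Raised when too large a share of records fail validation."""


def parse_order(raw: Dict[str, Any]) -> Order:
    """Validate one raw record; raise ValueError with a reason on failure."""
    order_id = raw.get("order_id")
    if not isinstance(order_id, str) or not order_id:
        raise ValueError("order_id missing or not a non-empty string")
    try:
        amount = int(raw["amount_cents"])
    except (KeyError, TypeError, ValueError) as exc:
        raise ValueError(f"bad amount_cents: {exc!r}") from exc
    return Order(order_id, amount)


def transform(raws: Iterable[Dict[str, Any]], dead_letters: TextIO,
              max_rate: float = 0.05, min_seen: int = 20) -> Iterator[Order]:
    """Yield valid orders; quarantine invalid ones and keep going.

    The run is aborted only once at least `min_seen` records have been seen
    and the quarantined share exceeds `max_rate`.
    """
    seen = bad = 0
    for raw in raws:
        seen += 1
        try:
            order = parse_order(raw)
        except ValueError as exc:
            bad += 1
            # Keep the raw payload and the error so the row can be replayed later.
            line = json.dumps({"raw": raw, "error": str(exc)}, default=str)
            dead_letters.write(line + "\n")
            if seen >= min_seen and bad / seen > max_rate:
                raise DeadLetterRateExceeded(
                    f"{bad}/{seen} records quarantined"
                ) from exc
            continue
        yield order
```

The `min_seen` floor is a design choice that stops a single bad record at the start of a run from tripping the threshold. Whether that is appropriate depends on the volume and SLA supplied to the prompt.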
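
For step 6 of the prompt (retry transient failures), a generated answer could resemble the following sketch, which is not part of the prompt text. `RetryableError`, `FatalError`, `is_retryable_status`, and `retry_call` are hypothetical names. The delay scheme shown is the "full jitter" variant of exponential backoff, where each sleep is a random fraction of a doubling cap; other jitter schemes would fit the prompt equally well.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class RetryableError(Exception):
    """Transient failure: timeout, 5xx, deadlock, throttling."""


class FatalError(Exception):
    """Permanent failure: bad request, auth error, schema mismatch."""


def is_retryable_status(status: int) -> bool:
    """Treat 408, 429, and all 5xx responses as transient."""
    return status in (408, 429) or 500 <= status <= 599


def retry_call(fn: Callable[[], T], *, attempts: int = 5,
               base_delay: float = 0.5, max_delay: float = 30.0,
               sleep: Callable[[float], None] = time.sleep,
               rng: Callable[[], float] = random.random) -> T:
    """Call fn, retrying only RetryableError with capped, jittered backoff.

    Any other exception (including FatalError) propagates immediately.
    `sleep` and `rng` are injectable so tests run instantly and deterministically.
    """
    for attempt in range(attempts):
        try:
            return fn()
        except RetryableError:
            if attempt == attempts - 1:
                raise  # budget exhausted: surface the last transient error
            cap = min(max_delay, base_delay * 2 ** attempt)
            sleep(rng() * cap)  # full jitter: uniform in [0, cap)
    raise ValueError("attempts must be at least 1")
```

A caller would typically wrap each network or database call in a small function that translates library-specific errors into `RetryableError` or `FatalError`, then pass that function to `retry_call`.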