Skip to content
DevOps AI ToolKit
Newsletter
All prompts
AI for Bash & Python Automation Difficulty: Advanced ClaudeChatGPTCursor

Python asyncio.create_subprocess_exec Fan-Out Prompt

Run many external commands concurrently under asyncio with bounded concurrency, captured output, and per-command timeouts.

Target user
Senior Python engineers orchestrating large shell-out workloads
Difficulty
Advanced
Tools
Claude, ChatGPT, Cursor

The prompt

You are a senior Python engineer building an async fan-out runner that executes a large list of external commands concurrently. Use asyncio.create_subprocess_exec — never shell=True equivalents. Follow these steps exactly:

1. Accept a list of command argument vectors from [COMMAND LIST] where each entry is a list like ["rsync", "-a", src, dst] (no shell string), so arguments are passed directly to exec and never interpreted by a shell.
2. Create an asyncio.Semaphore([MAX CONCURRENCY], default 16) and acquire it inside each task's async with block so at most that many subprocesses run at once, bounding file-descriptor and memory pressure regardless of how long the command list is.
3. For each command, await asyncio.create_subprocess_exec(*argv, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) so stdout and stderr are captured separately rather than inherited.
4. Read output with stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=[PER COMMAND TIMEOUT SECONDS]); communicate() drains both pipes concurrently to avoid the classic deadlock where a full pipe buffer blocks a process you are waiting on.
5. On TimeoutError, call proc.kill() and then await proc.communicate() (or await proc.wait()) to reap the killed child and avoid zombie processes, then record the command as timed out.
6. Build the task set with asyncio.gather(*tasks, return_exceptions=True) or a TaskGroup so one failing or timing-out command never cancels the rest; collect per-command exit code, decoded stdout/stderr (with errors="replace"), and a status of ok | nonzero | timeout | spawn_error.
7. Wrap the orchestration in a single asyncio.run(main()) entry point and guard against unbounded log capture by truncating each stream to [MAX CAPTURE BYTES].

Output format: return a single runnable Python module exposing async def run_all(commands) -> list[Result], where Result is a dataclass with fields (argv, returncode, stdout, stderr, status, duration_s), plus a __main__ block that prints a JSON summary of counts by status.

Idempotency/safety guardrail: the runner must never mutate shared state from inside the gather; each task returns its own immutable Result and the only aggregation happens after all tasks settle, so re-running the same command list produces the same per-command outcomes and a killed/timed-out child is always reaped before its slot is released.

Why this prompt works

Shelling out to hundreds of external commands is one of the highest-leverage uses of asyncio, but it is riddled with traps that only show up under load. The first is unbounded concurrency: naively gathering one subprocess per item will happily try to spawn thousands of processes at once, exhausting file descriptors and memory. The prompt makes a bounded asyncio.Semaphore non-negotiable, so concurrency stays flat no matter how long the input list grows — the same bounded-concurrency discipline that distinguishes a production fan-out from a fork bomb.

The second trap is the pipe-buffer deadlock. A child process that writes more than the OS pipe buffer to stdout will block until something reads it; if your parent is await proc.wait()-ing instead of draining the pipes, both sides hang forever. Mandating proc.communicate() (which drains stdout and stderr concurrently) wrapped in asyncio.wait_for solves both the deadlock and the timeout in one move. Crucially, the prompt also requires reaping the child after a timeout kill — a step engineers routinely forget, leaving a trail of zombie processes that eventually exhaust the process table.

Finally, the design isolates failure and enforces determinism. By using return_exceptions=True (or a TaskGroup) and returning an immutable per-command Result, a single bad command can never cancel its siblings, and aggregation happens only after every task settles. That combination — argv-not-shell execution, bounded concurrency, guaranteed reaping, and side-effect-free tasks — is what lets you re-run the whole batch safely and trust the JSON status summary it produces.

Related prompts

Newsletter

Free: the DevOps AI Incident-Triage Cheat Sheet

Subscribe and we’ll send you the one-page cheat sheet — plus weekly AI prompts, automation ideas, and tool reviews for infrastructure engineers. One email a week. No spam, unsubscribe anytime.

  • AI Incident-Triage Cheat Sheet (PDF)
  • Access to 2,104 DevOps AI prompts
  • One practical workflow email per week