Python asyncio.create_subprocess_exec Fan-Out Prompt
Run many external commands concurrently under asyncio with bounded concurrency, captured output, and per-command timeouts.
- Target user: Senior Python engineers orchestrating large shell-out workloads
- Difficulty: Advanced
- Tools: Claude, ChatGPT, Cursor
The prompt
You are a senior Python engineer building an async fan-out runner that executes a large list of external commands concurrently. Use asyncio.create_subprocess_exec; never use asyncio.create_subprocess_shell or any other shell=True equivalent. Follow these steps exactly:
1. Accept a list of command argument vectors from [COMMAND LIST], where each entry is a list like ["rsync", "-a", src, dst] (no shell string), so arguments are passed directly to exec and never interpreted by a shell.
2. Create an asyncio.Semaphore([MAX CONCURRENCY]) (default 16) and hold it with async with inside each task for the full lifetime of the child, so at most that many subprocesses run at once. This bounds file-descriptor and memory pressure regardless of how long the command list is.
3. For each command, await asyncio.create_subprocess_exec(*argv, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) so stdout and stderr are captured separately rather than inherited.
4. Read output with stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=[PER COMMAND TIMEOUT SECONDS]); communicate() drains both pipes concurrently to avoid the classic deadlock where a full pipe buffer blocks a process you are waiting on.
5. On asyncio.TimeoutError (an alias of the built-in TimeoutError since Python 3.11), call proc.kill() and then await proc.communicate() (or await proc.wait()) to reap the killed child and avoid zombie processes, then record the command as timed out.
6. Build the task set with asyncio.gather(*tasks, return_exceptions=True) so one failing or timing-out command never cancels the rest. If you use a TaskGroup instead, catch every exception inside the task, because an unhandled exception in one TaskGroup task cancels its siblings. Collect the per-command exit code, decoded stdout/stderr (with errors="replace"), and a status of ok | nonzero | timeout | spawn_error.
7. Wrap the orchestration in a single asyncio.run(main()) entry point and guard against unbounded log capture by truncating each stream to [MAX CAPTURE BYTES].
Output format: return a single runnable Python module exposing async def run_all(commands) -> list[Result], where Result is a frozen dataclass with fields (argv, returncode, stdout, stderr, status, duration_s), plus a __main__ block that prints a JSON summary of counts by status.
Idempotency/safety guardrail: the runner must never mutate shared state from inside the gather. Each task returns its own immutable Result and the only aggregation happens after all tasks settle, so the runner itself adds no ordering or shared-state effects between runs of the same command list (the commands themselves must still be idempotent for outcomes to repeat), and a killed or timed-out child is always reaped before its slot is released.
Why this prompt works
Shelling out to hundreds of external commands is one of the highest-leverage uses of asyncio, but it is riddled with traps that only show up under load. The first is unbounded concurrency: naively gathering one subprocess per item will happily try to spawn thousands of processes at once, exhausting file descriptors and memory. The prompt makes a bounded asyncio.Semaphore non-negotiable, so concurrency stays flat no matter how long the input list grows — the same bounded-concurrency discipline that distinguishes a production fan-out from a fork bomb.
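The bounded-concurrency idea can be sketched as follows. This is a minimal illustration, not the module the prompt produces: the name run_bounded and the running/peak counters are hypothetical, and the counters exist only to make the bound observable (a real runner following the prompt's guardrail would not keep shared counters inside its tasks). Output is sent to DEVNULL here so that waiting on the child cannot stall on a full pipe.

```python
import asyncio
import sys


async def run_bounded(commands, max_concurrency=16):
    """Run every argv in `commands` with at most `max_concurrency` children alive.

    Returns (return_codes, peak), where `peak` is the highest number of
    children observed running at once. `peak` is instrumentation for this
    sketch only.
    """
    sem = asyncio.Semaphore(max_concurrency)
    running = 0  # children currently alive (illustration only)
    peak = 0     # highest value `running` reached

    async def run_one(argv):
        nonlocal running, peak
        async with sem:  # the slot is held for the child's whole lifetime
            proc = await asyncio.create_subprocess_exec(
                *argv,
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.DEVNULL,
            )
            running += 1
            peak = max(peak, running)
            try:
                return await proc.wait()
            finally:
                running -= 1

    codes = await asyncio.gather(*(run_one(argv) for argv in commands))
    return codes, peak


if __name__ == "__main__":
    # Six short-lived children, but never more than two at a time.
    cmds = [[sys.executable, "-c", "import time; time.sleep(0.05)"]] * 6
    print(asyncio.run(run_bounded(cmds, max_concurrency=2)))
```

All tasks are created up front, which is cheap; only the subprocess spawn waits for a free semaphore slot.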
The second trap is the pipe-buffer deadlock. A child process that writes more to stdout than the OS pipe buffer can hold will block until something reads it; if the parent awaits proc.wait() without draining the pipes, both sides hang forever. Mandating proc.communicate() (which drains stdout and stderr concurrently) wrapped in asyncio.wait_for handles both the deadlock and the timeout in one move. Crucially, the prompt also requires reaping the child after a timeout kill, a step engineers routinely forget, leaving a trail of zombie processes that can eventually exhaust the process table.
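A minimal sketch of the timeout-and-reap pattern described above, assuming a hypothetical helper named run_with_timeout that returns a plain tuple rather than the prompt's Result dataclass. After the kill it calls communicate() again rather than wait(), on the assumption that draining any leftover output is the safer way to reap; output read before the timeout may be lost in this simple version.

```python
import asyncio
import sys


async def run_with_timeout(argv, timeout_s):
    """Run one command; return (status, returncode, stdout, stderr)."""
    proc = await asyncio.create_subprocess_exec(
        *argv,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        # communicate() reads both pipes while waiting for exit, so a
        # chatty child cannot block on a full pipe buffer.
        stdout, stderr = await asyncio.wait_for(
            proc.communicate(), timeout=timeout_s
        )
    except asyncio.TimeoutError:
        proc.kill()
        # Reap the killed child so it does not linger as a zombie.
        # communicate() also drains whatever is still in the pipes.
        stdout, stderr = await proc.communicate()
        return "timeout", proc.returncode, stdout, stderr
    status = "ok" if proc.returncode == 0 else "nonzero"
    return status, proc.returncode, stdout, stderr


if __name__ == "__main__":
    slow = [sys.executable, "-c", "import time; time.sleep(30)"]
    print(asyncio.run(run_with_timeout(slow, timeout_s=0.5)))
```

The return code of a killed child is platform-dependent (negative signal number on POSIX), so callers should key off the status string rather than a specific code.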
Finally, the design isolates failure and keeps tasks free of side effects. With asyncio.gather(..., return_exceptions=True), a single bad command cannot cancel its siblings; a TaskGroup gives the same isolation only if each task catches its own exceptions, because an unhandled exception in one TaskGroup task cancels the rest. Each task returns an immutable per-command Result, and aggregation happens only after every task settles. That combination (argv-not-shell execution, bounded concurrency, guaranteed reaping, and side-effect-free tasks) is what lets you re-run the whole batch safely, provided the commands themselves are idempotent, and trust the JSON status summary it produces.
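The failure-isolation and aggregate-after-settle ideas might look like the sketch below. It is deliberately reduced: Result carries only three of the fields the prompt asks for, output goes to DEVNULL, there is no semaphore or timeout, and summarize is a hypothetical helper name. Mapping any unexpected exception to spawn_error is an assumption made here to stay within the prompt's four status values.

```python
import asyncio
import sys
from collections import Counter
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)  # frozen: a Result cannot be mutated after creation
class Result:
    argv: tuple
    returncode: Optional[int]
    status: str  # "ok" | "nonzero" | "spawn_error" in this reduced sketch


async def run_one(argv):
    try:
        proc = await asyncio.create_subprocess_exec(
            *argv,
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.DEVNULL,
        )
    except OSError:  # e.g. FileNotFoundError for a missing binary
        return Result(tuple(argv), None, "spawn_error")
    returncode = await proc.wait()
    status = "ok" if returncode == 0 else "nonzero"
    return Result(tuple(argv), returncode, status)


async def run_all(commands):
    # return_exceptions=True: an exception in one task is returned as a
    # value instead of propagating, so sibling tasks keep running.
    settled = await asyncio.gather(
        *(run_one(argv) for argv in commands), return_exceptions=True
    )
    results = []
    for argv, item in zip(commands, settled):
        if isinstance(item, BaseException):
            # Assumption: fold unexpected errors into spawn_error.
            item = Result(tuple(argv), None, "spawn_error")
        results.append(item)
    return results


def summarize(results):
    """Aggregate only after every task has settled."""
    return dict(Counter(r.status for r in results))


if __name__ == "__main__":
    cmds = [
        [sys.executable, "-c", "pass"],
        [sys.executable, "-c", "import sys; sys.exit(3)"],
        ["definitely-not-a-real-binary-xyz"],
    ]
    print(summarize(asyncio.run(run_all(cmds))))
```

Because gather preserves input order, results[i] always corresponds to commands[i], which keeps the summary stable across runs.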
Related prompts
- Async Concurrent HTTP Poller with asyncio and httpx Prompt: Build a fast, bounded-concurrency async poller/fetcher that hits many endpoints with asyncio and httpx, with rate limiting, retries, timeouts, and structured results.
- Python asyncio Semaphore Bounded-Concurrency Review Prompt: Review an asyncio script that fans out work to find unbounded concurrency, then redesign it with a semaphore-bounded task pool, proper cancellation, backpressure, and clean shutdown so it can't overwhelm downstreams.
- Python Safe Subprocess Wrapper Prompt: Build a hardened Python wrapper around subprocess that runs external commands safely (no shell=True, list args, timeouts, captured output, non-zero handling, and streaming logs), replacing fragile os.system and shell-string calls.