Running Hundreds of Commands Concurrently with Python asyncio Subprocesses
Fan out hundreds of shell commands with asyncio.create_subprocess_exec, bounding concurrency with a Semaphore, capturing output safely, and enforcing per-task timeouts.
- #automation
- #ai
- #python
- #asyncio
- #concurrency
The task arrives in some variation every few months: run the same command against five hundred hosts, or collect a status from every container, or kick off a probe per region and gather the results. The naive loop is a for loop with subprocess.run, and it works fine until five hundred sequential commands at two seconds each turns into seventeen minutes of an engineer staring at a terminal. The instinct is to reach for threads, but spawning subprocesses is I/O-bound waiting, and that’s exactly the workload asyncio was built for. With asyncio.create_subprocess_exec and a Semaphore, you can fan out hundreds of commands, bound the concurrency so you don’t melt the box, capture every byte of output, and enforce timeouts, in about forty lines.
This is also a place where I let AI write the first pass and then read it like a hawk, because concurrent subprocess code has a specific set of bugs that look correct on the screen and deadlock in production. AI drafts, human verifies, and the things you’re verifying here are deadlocks, leaked processes, and silently swallowed timeouts.
The core primitive
asyncio.create_subprocess_exec launches a process without going through a shell and takes the program and its arguments as separate strings (no quoting hazards). It is itself a coroutine: awaiting it returns a Process object. The cardinal rule: to capture output, use await proc.communicate() rather than calling proc.wait() with pipes attached or draining one stream at a time. The child blocks as soon as it fills an OS pipe buffer that nobody is reading, and then neither side makes progress.
import asyncio

async def run(cmd: list[str]) -> tuple[int, bytes, bytes]:
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    return proc.returncode, stdout, stderr

async def main():
    rc, out, err = await run(["uname", "-a"])
    print(rc, out.decode().strip())

asyncio.run(main())
communicate() reads both pipes concurrently and waits for the process to exit, which is the simplest deadlock-free way to capture both streams (a hand-rolled alternative has to drain stdout and stderr at the same time to be safe). Using create_subprocess_exec rather than create_subprocess_shell means each argument reaches the program verbatim, with no shell parsing in between, so a hostname containing a semicolon is just a weird hostname, not a command injection.
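To see the pipe-buffer point in action, here is a minimal sketch that spawns a deliberately noisy child and captures both streams with communicate(). The NOISY_CHILD script and the demo() helper are made up for illustration, and the sketch uses sys.executable so it does not depend on any particular binary being installed.

```python
import asyncio
import sys

# Hypothetical child program: writes 1 MiB to stdout and 1 MiB to stderr,
# far more than a typical OS pipe buffer holds.
NOISY_CHILD = (
    "import sys\n"
    "sys.stdout.write('o' * (1 << 20))\n"
    "sys.stderr.write('e' * (1 << 20))\n"
)


async def run(cmd: list[str]) -> tuple[int, bytes, bytes]:
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    # communicate() drains both pipes concurrently, so the child never
    # stalls on a full pipe while we are waiting on the other one.
    stdout, stderr = await proc.communicate()
    return proc.returncode, stdout, stderr


async def demo() -> tuple[int, int, int]:
    rc, out, err = await run([sys.executable, "-c", NOISY_CHILD])
    return rc, len(out), len(err)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

If you swap communicate() for a bare proc.wait() in this sketch, the child has nowhere to put its output and the call is likely to hang, which is exactly the bug worth looking for in a generated draft.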
Bounding concurrency with a Semaphore
If you launch five hundred coroutines at once you will exhaust file descriptors, fork-bomb the host, or trip a rate limit. The fix is a Semaphore that caps how many subprocesses run at the same time. Acquire it before spawning, release it after, and the rest of the tasks queue politely:
import asyncio

async def run_bounded(sem: asyncio.Semaphore, cmd: list[str]):
    async with sem:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await proc.communicate()
        return cmd, proc.returncode, stdout, stderr

async def fan_out(commands: list[list[str]], concurrency: int = 20):
    sem = asyncio.Semaphore(concurrency)
    tasks = [run_bounded(sem, c) for c in commands]
    return await asyncio.gather(*tasks)
The async with sem block is the entire mechanism: at most concurrency coroutines hold the semaphore at once, and gather collects all the results in the order you submitted them. Twenty concurrent subprocesses is a sane default for most hosts; tune it to your file-descriptor limit and the downstream system’s tolerance. This is the same bounded-concurrency shape that the asyncio Semaphore review prompt digs into when you want a second set of eyes on the limits.
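One way to convince yourself the cap actually holds is to count live subprocesses while the batch runs. The sketch below is an illustration, not part of the pattern itself: PeakCounter, run_counted, and measure_peak are hypothetical names, and the child is just a short Python sleep.

```python
import asyncio
import sys


class PeakCounter:
    """Hypothetical helper: tracks how many subprocesses are live at once."""

    def __init__(self) -> None:
        self.current = 0
        self.peak = 0

    def enter(self) -> None:
        self.current += 1
        self.peak = max(self.peak, self.current)

    def leave(self) -> None:
        self.current -= 1


async def run_counted(
    sem: asyncio.Semaphore, counter: PeakCounter, cmd: list[str]
) -> int:
    async with sem:
        counter.enter()
        try:
            # No pipes are attached here, so a plain wait() cannot deadlock.
            proc = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.DEVNULL,
            )
            return await proc.wait()
        finally:
            counter.leave()


async def measure_peak(
    n_commands: int = 12, concurrency: int = 3
) -> tuple[int, list[int]]:
    sem = asyncio.Semaphore(concurrency)
    counter = PeakCounter()
    cmd = [sys.executable, "-c", "import time; time.sleep(0.2)"]
    codes = await asyncio.gather(
        *(run_counted(sem, counter, cmd) for _ in range(n_commands))
    )
    return counter.peak, list(codes)


if __name__ == "__main__":
    peak, codes = asyncio.run(measure_peak())
    print(f"peak concurrency: {peak}, exit codes: {codes}")
```

The counter is bumped inside the async with block, so the peak it reports can never exceed the semaphore's limit, however many commands are queued behind it.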
Per-command timeouts without leaking processes
A command that hangs will hold its slot in the semaphore forever, and a timeout that gives up waiting without killing the underlying process leaves that process running in the background. The robust pattern wraps communicate() in asyncio.wait_for and, critically, kills the process in the except branch:
import asyncio

async def run_with_timeout(sem, cmd, timeout=10.0):
    async with sem:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            stdout, stderr = await asyncio.wait_for(
                proc.communicate(), timeout=timeout
            )
            return cmd, proc.returncode, stdout, stderr
        except asyncio.TimeoutError:
            proc.kill()  # stop the child; wait_for only cancelled our wait
            await proc.communicate()  # reap the killed process
            return cmd, None, b"", b"timeout"
The line people forget is proc.kill() followed by a second await proc.communicate(). Without the kill, the subprocess keeps running after your coroutine gives up on it. Without the second communicate(), you leave a zombie that the event loop will warn about on shutdown. This two-step cleanup is the most common thing an AI draft gets wrong, because the happy path looks complete without it.
What I asked the model, and what I had to fix: “Write an async Python function that runs a shell command with a timeout, captures stdout and stderr, and guarantees the process is terminated if it exceeds the timeout. Use create_subprocess_exec.” The draft used asyncio.wait_for(proc.communicate(), ...) correctly but, in the TimeoutError handler, it returned immediately without calling proc.kill() or reaping the process. On a batch of 300 commands where a dozen timed out, that would have leaked a dozen processes and emitted a wall of “subprocess still running” warnings. Caught it only by reading the except block. The model’s structure was right; the cleanup was missing.
Collecting results without losing failures
asyncio.gather by default propagates the first exception to the caller as soon as one task raises. The sibling tasks are not cancelled and keep running, but you never see their results. For a fan-out you almost always want the opposite: collect everything, including the failures, and report. Pass return_exceptions=True, or, better, design each task to return a result tuple so exceptions never escape in the first place:
async def main(commands):
    results = await fan_out(commands, concurrency=25)
    failures = [(cmd, rc) for cmd, rc, out, err in results if rc != 0]
    print(f"{len(results) - len(failures)} ok, {len(failures)} failed")
    for cmd, rc in failures:
        print(f"  FAILED rc={rc}: {' '.join(cmd)}")
Returning a structured tuple from each task, rather than letting it raise, is what keeps one bad host from sinking the whole batch. It also makes the results trivially sortable, groupable, and dumpable to JSON for the next stage of a pipeline.
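One gap worth closing: a task can still raise before there is any return code at all, for example when the executable does not exist and the spawn itself fails. Here is a sketch of one way to fold that case into the result, assuming a hypothetical Result tuple and hypothetical run_safe and fan_out_safe helpers (none of these names come from asyncio).

```python
import asyncio
import sys
from typing import NamedTuple, Optional


class Result(NamedTuple):
    """Hypothetical result container; returncode is None if spawning failed."""

    cmd: list[str]
    returncode: Optional[int]
    stdout: bytes
    stderr: bytes


async def run_safe(sem: asyncio.Semaphore, cmd: list[str]) -> Result:
    async with sem:
        try:
            proc = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
        except OSError as exc:
            # Covers FileNotFoundError and PermissionError raised at spawn time.
            return Result(cmd, None, b"", str(exc).encode())
        stdout, stderr = await proc.communicate()
        return Result(cmd, proc.returncode, stdout, stderr)


async def fan_out_safe(
    commands: list[list[str]], concurrency: int = 20
) -> list[Result]:
    sem = asyncio.Semaphore(concurrency)
    return list(await asyncio.gather(*(run_safe(sem, c) for c in commands)))


async def demo() -> list[Result]:
    commands = [
        [sys.executable, "-c", "print('ok')"],
        [sys.executable, "-c", "import sys; sys.exit(3)"],
        ["definitely-not-a-real-binary-xyz"],  # made-up name, spawn should fail
    ]
    return await fan_out_safe(commands, concurrency=2)


if __name__ == "__main__":
    for r in asyncio.run(demo()):
        print(r.returncode, r.cmd)
```

With every outcome expressed as a Result, the failure filter stays a one-line comparison and gather never has an exception to propagate.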
When to use this versus xargs
Be honest about whether you need Python at all. If the job is “run this one command across a list with N in parallel and I just want the exit codes,” GNU xargs -P or parallel is fewer moving parts and nothing to maintain:
cat hosts.txt | xargs -P 20 -I{} ssh {} 'uptime'
The xargs parallel batch execution prompt covers that route well, and for a long time it’s all I used.
Reach for the asyncio version when the orchestration logic outgrows a shell one-liner: when you need per-command timeouts with guaranteed cleanup, structured result objects to feed downstream, retries with backoff on specific exit codes, or interleaving subprocess fan-out with async HTTP calls in the same event loop. That last case, mixing process spawns and network requests under one concurrency budget, is where Python pulls decisively ahead; the async HTTP poller prompt shows the networking half of that pattern.
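As one illustration of logic that outgrows a one-liner, here is a rough sketch of retries with exponential backoff on specific exit codes. Everything named here is an assumption for the example: the RETRYABLE set (75 is used as a stand-in for a "temporary failure" code), the run_with_retry helper, and the FLAKY_CHILD script that fails once and then succeeds.

```python
import asyncio
import os
import sys
import tempfile

RETRYABLE = {75}  # assumption: exit codes treated as transient failures

# Hypothetical flaky child: exits 75 on its first run, 0 on later runs.
FLAKY_CHILD = (
    "import os, sys\n"
    "marker = sys.argv[1]\n"
    "if os.path.exists(marker):\n"
    "    sys.exit(0)\n"
    "open(marker, 'w').close()\n"
    "sys.exit(75)\n"
)


async def run_with_retry(sem, cmd, retries=3, base_delay=0.1):
    """Returns (cmd, returncode, attempts). Assumes retries >= 1."""
    for attempt in range(1, retries + 1):
        async with sem:
            proc = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            await proc.communicate()
        if proc.returncode not in RETRYABLE or attempt == retries:
            return cmd, proc.returncode, attempt
        # Back off outside the semaphore so a sleeping retry holds no slot.
        await asyncio.sleep(base_delay * 2 ** (attempt - 1))


async def demo():
    sem = asyncio.Semaphore(2)
    with tempfile.TemporaryDirectory() as tmp:
        marker = os.path.join(tmp, "seen")
        flaky = [sys.executable, "-c", FLAKY_CHILD, marker]
        always_bad = [sys.executable, "-c", "import sys; sys.exit(2)"]
        return await asyncio.gather(
            run_with_retry(sem, flaky),
            run_with_retry(sem, always_bad),
        )


if __name__ == "__main__":
    for cmd, rc, attempts in asyncio.run(demo()):
        print(f"rc={rc} after {attempts} attempt(s)")
```

Releasing the semaphore before sleeping is a design choice: it keeps a backing-off command from occupying a concurrency slot that a healthy command could use.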
The dividing line I use: if a maintainer would understand the xargs line in five seconds, ship the xargs line. The moment you’re tempted to add a second flag to handle timeouts or parse the output, you’ve outgrown it, and the forty lines of asyncio above will age better.
As always, the workflow is the same: let the model produce the scaffold, then read every line of the concurrency and cleanup code, because that’s where the bugs hide and the screen looks correct anyway. More patterns in the same spirit are collected in the Bash & Python automation category.