AI for Bash & Python Automation · By James Joyner IV · 9 min read

Python asyncio for Ops: Checking 500 Endpoints in the Time It Takes to Check One

When your script spends all its time waiting on the network, asyncio turns a 10-minute job into a 5-second one. A practical asyncio guide for DevOps work.

  • #python
  • #bash
  • #asyncio
  • #concurrency
  • #http
  • #automation

Most ops scripts aren’t slow because the CPU is busy — they’re slow because they’re waiting. Waiting on an HTTP response, a DNS lookup, an SSH handshake. A health-check script that hits 500 endpoints one at a time spends 99.9% of its life idle, blocked on the network. asyncio is Python’s answer to that specific problem: while one request waits, run the others. Done right, a job that took ten minutes serially finishes in seconds.

This is the practical subset of asyncio I use for real operational work, plus the traps that bite everyone the first time.

When asyncio helps (and when it’s pointless)

Be clear-eyed about this: asyncio speeds up I/O-bound work — network calls, file reads, anything that waits. It does nothing for CPU-bound work like compression or number-crunching; in fact it’ll be slower because of the overhead. For CPU-bound parallelism you want multiprocessing. The mental test: if your task spends most of its time waiting on something external, asyncio is the right tool.

DevOps work is overwhelmingly I/O-bound — API calls, health checks, log shipping — which is exactly why asyncio pays off so well here.
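To make the I/O-bound point concrete, here is a minimal sketch that fakes the waiting with asyncio.sleep instead of real network calls. The names fake_io, serial, and concurrent are made up for this illustration, and the delays are arbitrary. It uses asyncio.gather to overlap the waits.

```python
import asyncio
import time

async def fake_io(delay: float) -> float:
    # Stand-in for a network call: sleeping hands control back to the event loop.
    await asyncio.sleep(delay)
    return delay

async def serial(n: int, delay: float) -> float:
    start = time.perf_counter()
    for _ in range(n):
        await fake_io(delay)          # each wait finishes before the next starts
    return time.perf_counter() - start

async def concurrent(n: int, delay: float) -> float:
    start = time.perf_counter()
    await asyncio.gather(*(fake_io(delay) for _ in range(n)))  # waits overlap
    return time.perf_counter() - start

if __name__ == "__main__":
    print(f"serial:     {asyncio.run(serial(20, 0.05)):.2f}s")
    print(f"concurrent: {asyncio.run(concurrent(20, 0.05)):.2f}s")
```

The serial version pays for every wait in turn, while the concurrent version pays roughly for one. Swap the sleep for a CPU-heavy loop and the gap disappears, which is the whole I/O-bound versus CPU-bound distinction.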

The shape of an async script

The two keywords are async def (defines a coroutine) and await (yields control while waiting). Here’s a health checker using httpx, an async-capable HTTP client:

import asyncio
import httpx

async def check(client: httpx.AsyncClient, url: str) -> tuple[str, int | str]:
    try:
        resp = await client.get(url, timeout=5.0)
        return url, resp.status_code
    except Exception as e:
        return url, f"ERROR: {type(e).__name__}"

async def main(urls: list[str]) -> None:
    async with httpx.AsyncClient() as client:
        tasks = [check(client, u) for u in urls]
        results = await asyncio.gather(*tasks)
    for url, status in results:
        print(f"{status}\t{url}")

if __name__ == "__main__":
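    with open("endpoints.txt") as f:
        urls = [line.strip() for line in f if line.strip()]  # skip blank lines
    if urls:                      # asyncio.run() needs a coroutine, not an empty list
        asyncio.run(main(urls))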

The engine is asyncio.gather(*tasks): it schedules every coroutine concurrently and waits for all of them. With 500 URLs, all 500 coroutines start at once (httpx’s default pool allows up to 100 open connections, so requests beyond that wait for a free one), and the whole run finishes in roughly the time of the slowest requests, not the sum of all of them.
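One property of gather worth seeing in isolation: results come back in the order the coroutines were passed in, not the order they finished. The sketch below uses simulated delays rather than real requests, and probe and demo are hypothetical names for this example only.

```python
import asyncio

async def demo() -> tuple[list[str], list[str]]:
    finished: list[str] = []          # records actual completion order

    async def probe(name: str, delay: float) -> str:
        await asyncio.sleep(delay)    # simulated latency, not a real request
        finished.append(name)
        return name

    # "slow" is passed first but completes last.
    results = await asyncio.gather(probe("slow", 0.05), probe("fast", 0.01))
    return results, finished

if __name__ == "__main__":
    results, finished = asyncio.run(demo())
    print("gather order:    ", results)
    print("completion order:", finished)
```

That ordering guarantee is what lets you line results up against the original URL list afterwards.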

The trap: unbounded concurrency

That elegant gather over 500 URLs has the same danger as xargs -P 0: it launches all 500 requests with no limit of its own (the only brake is whatever connection cap the HTTP client happens to apply), which can get you rate-limited, exhaust file descriptors, or knock over the target. The fix is a semaphore, asyncio’s concurrency limiter:

async def check(sem, client, url):
    async with sem:                      # at most N inside this block
        resp = await client.get(url, timeout=5.0)
        return url, resp.status_code

async def main(urls):
    sem = asyncio.Semaphore(20)          # cap at 20 concurrent
    async with httpx.AsyncClient() as client:
        tasks = [check(sem, client, u) for u in urls]
        return await asyncio.gather(*tasks)

The async with sem block lets only 20 coroutines through at a time; the rest wait their turn. This is the asyncio equivalent of -j 20 in GNU parallel, and it’s just as non-optional. I never write a gather over a large list without a semaphore — that lesson cost me a 429-storm once.
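If you want to convince yourself the cap holds, a small offline sketch can count how many jobs are inside the semaphore at once. Nothing here touches the network: run_bounded and job are invented for this illustration and the sleep stands in for a request.

```python
import asyncio

async def run_bounded(n_tasks: int, limit: int) -> int:
    """Run n_tasks fake jobs under a semaphore; return the peak number running at once."""
    sem = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def job() -> None:
        nonlocal active, peak
        async with sem:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)   # simulated request
            active -= 1

    await asyncio.gather(*(job() for _ in range(n_tasks)))
    return peak

if __name__ == "__main__":
    print("peak concurrency:", asyncio.run(run_bounded(100, 20)))
```

The counters need no lock because all coroutines run on one thread and only switch at an await.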

Don’t let one failure sink the batch

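By default, if any coroutine in gather raises, that first exception propagates out of the await and you never see the other results (the remaining tasks keep running in the background, but their return values are discarded). For ops work you almost always want the opposite: collect every result, successes and failures alike. Use return_exceptions=True: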

results = await asyncio.gather(*tasks, return_exceptions=True)
for r in results:
    if isinstance(r, BaseException):   # also covers CancelledError
        print(f"failed: {type(r).__name__}")
    else:
        print(r)

Now a single timeout doesn’t blow away the 499 checks that succeeded. Catching exceptions inside the coroutine (as in the first example) is the other valid approach and often cleaner — pick one and be consistent.
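One wrinkle with return_exceptions=True is that a bare exception object does not say which URL it belongs to. Because gather preserves input order, zipping the results against the URL list recovers that. A minimal sketch, where fake_check is a hypothetical stand-in for a real request and the ".bad" suffix is just a way to trigger a failure:

```python
import asyncio

async def fake_check(url: str) -> int:
    # Hypothetical stand-in for a real request: hosts ending in ".bad" fail.
    await asyncio.sleep(0)
    if url.endswith(".bad"):
        raise ConnectionError(f"cannot reach {url}")
    return 200

async def check_all(urls: list[str]) -> dict[str, str]:
    results = await asyncio.gather(*(fake_check(u) for u in urls),
                                   return_exceptions=True)
    report: dict[str, str] = {}
    # Results are in input order, so zip pairs each one with its URL.
    for url, r in zip(urls, results):
        if isinstance(r, BaseException):
            report[url] = f"ERROR: {type(r).__name__}"
        else:
            report[url] = str(r)
    return report

if __name__ == "__main__":
    for url, status in asyncio.run(check_all(["a.ok", "b.bad"])).items():
        print(f"{status}\t{url}")
```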

Timeouts: the thing that will hang your script

Async code without timeouts will hang forever on a black-hole connection, and asyncio.gather waits for all tasks. Always set a timeout. Per-request timeouts (the timeout=5.0 above) handle the common case; for wrapping a whole block, asyncio.timeout (Python 3.11+) is clean:

async with asyncio.timeout(30):
    results = await asyncio.gather(*tasks)

If the whole batch isn’t done in 30 seconds, it raises TimeoutError instead of hanging your cron job indefinitely. A hung script that never exits is worse than a failed one, because nothing alerts on it.
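On interpreters older than 3.11, asyncio.wait_for gives a similar whole-batch deadline. The sketch below is an assumption-laden illustration rather than a drop-in: black_hole and quick are fake coroutines, and the one-hour sleep simulates a connection that never answers.

```python
import asyncio

async def black_hole() -> str:
    await asyncio.sleep(3600)      # simulates a connection that never answers
    return "unreachable"

async def quick() -> str:
    await asyncio.sleep(0.01)
    return "ok"

async def run_with_deadline(seconds: float) -> str:
    try:
        results = await asyncio.wait_for(
            asyncio.gather(quick(), black_hole()), timeout=seconds
        )
    except asyncio.TimeoutError:   # same class as builtin TimeoutError on 3.11+
        return "timed out"
    return f"finished: {results}"

if __name__ == "__main__":
    print(asyncio.run(run_with_deadline(0.1)))
```

Note what the deadline costs: when it fires, the whole gather is cancelled and the result from quick() is lost along with the hung one. That is why the per-request timeout stays the first line of defense and the batch deadline is the backstop.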

Mixing in blocking calls

Sooner or later you’ll need something that isn’t async-aware: a database driver, a subprocess.run call, a legacy SDK. Calling it directly blocks the entire event loop, freezing every other coroutine. The escape hatch is asyncio.to_thread (Python 3.9+), which runs the blocking call in a worker thread:

result = await asyncio.to_thread(legacy_blocking_function, arg)

This is the bridge between the async world and the pile of synchronous libraries you already depend on. If you find yourself wrapping everything in to_thread, that’s a sign a plain thread pool (concurrent.futures.ThreadPoolExecutor) might be simpler than asyncio for your case — no shame in that.
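A small sketch of fanning several blocking calls out through to_thread, assuming Python 3.9 or later. legacy_lookup is a hypothetical function standing in for whatever sync-only SDK call you are stuck with, and time.sleep plays the part of its blocking I/O.

```python
import asyncio
import time

def legacy_lookup(host: str) -> str:
    # Hypothetical blocking function standing in for a sync-only SDK call.
    time.sleep(0.1)
    return f"{host}: ok"

async def lookup_all(hosts: list[str]) -> list[str]:
    # Each call runs in the default thread pool, so the event loop stays free
    # to service other coroutines while the threads block.
    return await asyncio.gather(
        *(asyncio.to_thread(legacy_lookup, h) for h in hosts)
    )

if __name__ == "__main__":
    start = time.perf_counter()
    print(asyncio.run(lookup_all(["db1", "db2", "db3", "db4"])))
    print(f"elapsed: {time.perf_counter() - start:.2f}s")
```

Concurrency here is bounded by the size of the default thread pool rather than by a semaphore, so very large fan-outs will queue behind the pool's worker count.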

A realistic pattern: progress + bounded concurrency

Putting it together, here’s the structure I actually ship — semaphore-bounded, failure-tolerant, with progress so a long run doesn’t look hung:

async def run(urls):
    sem = asyncio.Semaphore(20)
    done = 0
    async with httpx.AsyncClient() as client:
        async def one(u):
            nonlocal done
            async with sem:
                r = await check_one(client, u)  # your per-URL coroutine, e.g. check() from the first example
            done += 1
            print(f"\r{done}/{len(urls)}", end="", flush=True)
            return r
        return await asyncio.gather(*(one(u) for u in urls),
                                    return_exceptions=True)

When to reach for it — and when not to

Use asyncio when you have many independent I/O operations and you control the code end to end. Skip it when the shell already does the job in one line (a simple xargs fan-out beats writing async code), when the work is CPU-bound (use multiprocessing), or when you have exactly one slow call (just call it). Async code is harder to read and debug than synchronous code, so it should earn its place.

For the async patterns I lean on and the prompts I use to generate safe concurrent scripts, see the Bash & Python automation category and our prompt library.

Always cap concurrency with a semaphore and set explicit timeouts before pointing an async script at production endpoints or a rate-limited API.
