
What does asyncio solve?

When your program spends time waiting (on a network call, a file, a timer, etc.), the CPU sits idle. asyncio lets one task yield while it waits so another task can run, giving you concurrency in a single thread.
import asyncio
async def hello():
    await asyncio.sleep(1)      # pretend to wait on I/O
    return "Hello, asyncio!"
result = asyncio.run(hello())   # creates/runs the event loop for you
print(result)
What it’s doing:
- async def defines a coroutine (a function that can pause and resume).
- await yields control back to the event loop while waiting (here: 1 second).
- asyncio.run() starts the loop, runs the coroutine, and closes the loop.

Synchronous vs. asynchronous

import time, asyncio
# ----- synchronous -----
def sync_job(i):
    time.sleep(1)      # blocks entire thread
    return f"sync {i}"
def sync_main():
    start = time.time()
    results = [sync_job(i) for i in range(5)]
    print(results, f"{time.time()-start:.2f}s")
# ----- asynchronous -----
async def async_job(i):
    await asyncio.sleep(1)  # yields to the loop
    return f"async {i}"
async def async_main():
    start = time.time()
    results = await asyncio.gather(*(async_job(i) for i in range(5)))
    print(results, f"{time.time()-start:.2f}s")
if __name__ == "__main__":
    sync_main()             # ≈ 5 seconds
    asyncio.run(async_main())  # ≈ 1 second
What’s happening:

- The synchronous version runs its five jobs one after another, so it takes about 5 seconds.
- The asynchronous version awaits all five coroutines together with asyncio.gather, so their waits overlap and the whole batch finishes in about 1 second.
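One subtlety worth making explicit: awaiting coroutines one at a time is still sequential; only gather (or create_task) lets the waits overlap. A minimal sketch (names are illustrative, not from the original):

import asyncio, time

async def job():
    await asyncio.sleep(1)

async def sequential():
    start = time.time()
    for _ in range(3):
        await job()            # each await completes before the next starts
    print(f"sequential: {time.time()-start:.2f}s")    # ~3 s

async def concurrent():
    start = time.time()
    await asyncio.gather(*(job() for _ in range(3)))  # the waits overlap
    print(f"concurrent: {time.time()-start:.2f}s")    # ~1 s

asyncio.run(sequential())
asyncio.run(concurrent())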
asyncio.gather and limiting concurrency

import asyncio
import random
async def fetch(i):
    # Simulate a variable I/O wait (e.g., HTTP call)
    await asyncio.sleep(random.uniform(0.2, 1.0))
    return f"item-{i}"
async def worker(i, sem):
    async with sem:          # limit concurrent operations
        result = await fetch(i)
        print(f"done: {result}")
        return result
async def main():
    sem = asyncio.Semaphore(5)     # at most 5 concurrent fetches
    tasks = [worker(i, sem) for i in range(30)]
    results = await asyncio.gather(*tasks)
    print("Total:", len(results))
asyncio.run(main())
Why this matters: Real APIs will throttle/rate-limit you. A Semaphore keeps you respectful and stable.
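If you want results as they finish rather than all at once at the end, the same semaphore pattern also works with asyncio.as_completed. A sketch reusing the fetch coroutine above (the bounded helper is illustrative):

import asyncio, random

async def fetch(i):
    await asyncio.sleep(random.uniform(0.2, 1.0))
    return f"item-{i}"

async def main():
    sem = asyncio.Semaphore(5)

    async def bounded(i):
        async with sem:            # same cap: at most 5 in flight
            return await fetch(i)

    # as_completed yields awaitables in completion order, not submission order
    for fut in asyncio.as_completed([bounded(i) for i in range(30)]):
        print("done:", await fut)

asyncio.run(main())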
Producer/consumer with asyncio.Queue

import asyncio
async def producer(q):
    for i in range(10):
        await q.put(f"task-{i}")   # enqueue work
    for _ in range(3):             # tell consumers to stop
        await q.put(None)
async def consumer(name, q):
    while True:
        item = await q.get()
        if item is None:
            q.task_done()          # count the sentinel too, or q.join() never returns
            break                  # shutdown signal
        await asyncio.sleep(0.3)   # simulate work
        print(f"{name} processed {item}")
        q.task_done()
async def main():
    q = asyncio.Queue()
    prod = asyncio.create_task(producer(q))
    consumers = [asyncio.create_task(consumer(f"C{i}", q)) for i in range(3)]
    await prod
    await q.join()                 # wait until every item has been task_done()'d
    for c in consumers:
        await c                    # finish (after receiving None)
asyncio.run(main())
What it’s doing:

- The producer enqueues ten work items, then one None sentinel per consumer as a shutdown signal.
- Each consumer loops on q.get(), processes the item, and calls q.task_done() so q.join() can tell when all work is finished.
- main() waits for the producer, then for the queue to drain, then for each consumer to exit.
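A variation you'll also see in the wild (a sketch, assuming the same workload): skip the None sentinels entirely and cancel the idle consumers once q.join() confirms every item was processed:

import asyncio

async def consumer(name, q):
    while True:
        item = await q.get()
        try:
            await asyncio.sleep(0.3)           # simulate work
            print(f"{name} processed {item}")
        finally:
            q.task_done()                      # always mark the item finished

async def main():
    q = asyncio.Queue()
    for i in range(10):
        q.put_nowait(f"task-{i}")
    consumers = [asyncio.create_task(consumer(f"C{i}", q)) for i in range(3)]
    await q.join()                             # all items received and done
    for c in consumers:
        c.cancel()                             # consumers are idle in q.get()
    await asyncio.gather(*consumers, return_exceptions=True)

asyncio.run(main())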
Timeouts with wait_for

import asyncio
async def slow():
    await asyncio.sleep(5)
async def main():
    try:
        await asyncio.wait_for(slow(), timeout=1.0)
    except asyncio.TimeoutError:
        print("Timed out!")
asyncio.run(main())
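On Python 3.11+, the asyncio.timeout() context manager is an alternative spelling of the same idea (a minimal sketch; note it raises the built-in TimeoutError):

import asyncio

async def slow():
    await asyncio.sleep(5)

async def main():
    try:
        async with asyncio.timeout(1.0):   # cancels the body when time is up
            await slow()
    except TimeoutError:
        print("Timed out!")

asyncio.run(main())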
Cancelling a task safely
import asyncio
async def long_running():
    try:
        while True:
            await asyncio.sleep(0.5)
            print("working...")
    except asyncio.CancelledError:
        print("cancelling: cleaning up...")
        # close files/sockets here
        raise
async def main():
    t = asyncio.create_task(long_running())
    await asyncio.sleep(2)
    t.cancel()
    try:
        await t
    except asyncio.CancelledError:
        print("task cancelled")
asyncio.run(main())
Why it matters: Real systems must handle timeouts and shutdowns gracefully.
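Scaling that idea up (a sketch, assuming several workers): cancel every task and gather them with return_exceptions=True so shutdown doesn't stop at the first CancelledError:

import asyncio

async def worker(i):
    try:
        while True:
            await asyncio.sleep(0.5)
            print(f"worker {i} working...")
    except asyncio.CancelledError:
        print(f"worker {i}: cleaning up")
        raise

async def main():
    tasks = [asyncio.create_task(worker(i)) for i in range(3)]
    await asyncio.sleep(1)
    for t in tasks:
        t.cancel()
    # collect the CancelledErrors instead of re-raising the first one
    await asyncio.gather(*tasks, return_exceptions=True)
    print("all workers stopped")

asyncio.run(main())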
Concurrent HTTP requests (aiohttp) — real-world I/O

If you can, install aiohttp (pip install aiohttp). This shows true I/O concurrency.
import asyncio, aiohttp
URLS = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2",
    "https://httpbin.org/delay/3",
]
async def fetch(session, url):
    async with session.get(url) as resp:
        return url, resp.status
async def main():
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*(fetch(session, u) for u in URLS))
        for url, status in results:
            print(status, url)
asyncio.run(main())
What it’s doing:

- All three requests share a single ClientSession (connection pooling).
- gather runs them concurrently, so the total time is roughly the slowest response (~3 s), not the sum (~6 s).

Offloading blocking work

If you must call a blocking function (e.g., CPU-bound or a non-async library), offload it:
import asyncio, time
def blocking_io(n):
    time.sleep(1)         # blocks the thread
    return n * n
async def main():
    loop = asyncio.get_running_loop()
    # Offload to a default ThreadPoolExecutor
    results = await asyncio.gather(
        loop.run_in_executor(None, blocking_io, 3),
        loop.run_in_executor(None, blocking_io, 4),
    )
    print(results)
asyncio.run(main())
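On Python 3.9+, asyncio.to_thread() wraps the same thread-pool mechanism with less ceremony (a minimal sketch):

import asyncio, time

def blocking_io(n):
    time.sleep(1)           # still blocks, but in a worker thread
    return n * n

async def main():
    results = await asyncio.gather(
        asyncio.to_thread(blocking_io, 3),
        asyncio.to_thread(blocking_io, 4),
    )
    print(results)          # [9, 16] in about 1 second

asyncio.run(main())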
Rule of thumb: use asyncio for I/O-bound waiting; push CPU-bound or blocking-library work through run_in_executor.

Common pitfalls:

- time.sleep() in async code → blocks the loop. Use await asyncio.sleep().
- Forgetting to await a coroutine → you get a coroutine object, not a result. Always await it or schedule it with create_task.
- No asyncio.run(main()) entry point → your coroutines never actually run.
- Unbounded fan-out → throttle with a Semaphore or a bounded queue.
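To make the second pitfall concrete, a tiny sketch (names are illustrative) of what a forgotten await looks like:

import asyncio

async def work():
    await asyncio.sleep(0.1)
    return 42

async def main():
    broken = work()        # just creates a coroutine object; nothing runs
    print(type(broken))    # <class 'coroutine'>
    fixed = await work()   # correct: actually runs and returns 42
    print(fixed)
    broken.close()         # silence the "never awaited" RuntimeWarning

asyncio.run(main())

Quick reference: structure your program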
import asyncio
async def main():
    # 1) create tasks
    t1 = asyncio.create_task(coro1())
    t2 = asyncio.create_task(coro2())
    # 2) run other awaits while tasks progress
    result = await some_other_coro()
    # 3) gather results
    await asyncio.gather(t1, t2)
if __name__ == "__main__":
    asyncio.run(main())
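On Python 3.11+, asyncio.TaskGroup gives the same structure with built-in cleanup: if one task fails, the others are cancelled. A sketch with stand-in coroutines:

import asyncio

async def coro1():
    await asyncio.sleep(0.5)
    return "one"

async def coro2():
    await asyncio.sleep(0.5)
    return "two"

async def main():
    async with asyncio.TaskGroup() as tg:   # awaits both tasks on exit
        t1 = tg.create_task(coro1())
        t2 = tg.create_task(coro2())
    print(t1.result(), t2.result())

asyncio.run(main())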
Takeaways

- Coroutines (async def) + await let your code pause while it waits.
- Use gather, Semaphore, Queue, timeouts, and cancellation to make robust async apps.