What problem does asyncio solve?
When your program spends time waiting (for a network call, file, timer, etc.), the CPU sits idle. asyncio lets one task yield while it waits so another task can run, giving you concurrency in a single thread.
import asyncio
async def hello():
    """Pause for one second, then return a greeting string."""
    # Stand-in for real I/O: awaiting suspends this coroutine until the sleep ends.
    await asyncio.sleep(1)
    greeting = "Hello, asyncio!"
    return greeting
# Drive hello() to completion and show the value it returned.
result = asyncio.run(hello()) # creates/runs the event loop for you
print(result)
What it’s doing:
- `async def` defines a coroutine (a function that can pause/resume).
- `await` yields control back to the event loop while waiting (here: 1 second).
- `asyncio.run()` starts the loop, runs the coroutine, and closes the loop.
import time, asyncio
# ----- synchronous -----
def sync_job(i, delay=1):
    """Simulate a blocking job and return its label.

    Args:
        i: Job identifier embedded in the returned label.
        delay: Seconds to block for. Defaults to 1, the value that was
            previously hard-coded.

    Returns:
        The string ``"sync <i>"``.
    """
    time.sleep(delay)  # blocks entire thread
    return f"sync {i}"
def sync_main():
    """Run five blocking jobs one after another and print results plus elapsed time."""
    # perf_counter() is monotonic, so the measured interval cannot be skewed
    # by wall-clock adjustments the way time.time() can.
    start = time.perf_counter()
    results = [sync_job(i) for i in range(5)]
    print(results, f"{time.perf_counter()-start:.2f}s")
# ----- asynchronous -----
async def async_job(i, delay=1):
    """Simulate a non-blocking job and return its label.

    Args:
        i: Job identifier embedded in the returned label.
        delay: Seconds to sleep for. Defaults to 1, the value that was
            previously hard-coded.

    Returns:
        The string ``"async <i>"``.
    """
    await asyncio.sleep(delay)  # yields to the loop
    return f"async {i}"
async def async_main():
    """Run five async jobs concurrently and print results plus elapsed time."""
    # perf_counter() is monotonic, so the measured interval cannot be skewed
    # by wall-clock adjustments the way time.time() can.
    start = time.perf_counter()
    # gather() returns the results in the order the awaitables were passed in.
    results = await asyncio.gather(*(async_job(i) for i in range(5)))
    print(results, f"{time.perf_counter()-start:.2f}s")
if __name__ == "__main__":
    # Blocking variant first: each job finishes before the next one starts.
    sync_main()  # ≈ 5 seconds
    # Coroutine variant: all five sleeps are awaited together via gather().
    asyncio.run(async_main())  # ≈ 1 second
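What’s happening: the synchronous version runs the five one-second jobs back to back (≈ 5 seconds), while `asyncio.gather` runs the five coroutines concurrently so their waits overlap (≈ 1 second).
asyncio.gather and limiting concurrency
import asyncio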
import random
async def fetch(i, min_delay=0.2, max_delay=1.0):
    """Simulate a variable-latency I/O call (e.g., an HTTP request).

    Args:
        i: Item identifier embedded in the returned label.
        min_delay: Lower bound of the simulated wait, in seconds.
        max_delay: Upper bound of the simulated wait, in seconds.

    Returns:
        The string ``"item-<i>"``.
    """
    # The defaults reproduce the previously hard-coded 0.2-1.0 s range.
    await asyncio.sleep(random.uniform(min_delay, max_delay))
    return f"item-{i}"
async def worker(i, sem):
    """Fetch item ``i`` while holding ``sem``, print it, and return it."""
    # The semaphore is held for the duration of the block, so only as many
    # workers as the semaphore allows can be inside it at once.
    async with sem:
        item = await fetch(i)
        print(f"done: {item}")
        return item
async def main():
    """Run 30 workers with at most 5 fetching at a time, then print the count."""
    limiter = asyncio.Semaphore(5)  # at most 5 concurrent fetches
    # All 30 coroutines are handed to gather(); the shared semaphore throttles them.
    results = await asyncio.gather(*(worker(i, limiter) for i in range(30)))
    print("Total:", len(results))
# Entry point for this snippet: run main() on a new event loop until it finishes.
asyncio.run(main())
Why this matters: Real APIs will throttle or rate-limit you. A Semaphore caps how many requests are in flight at once, which keeps you respectful and stable.
asyncio.Queue
import asyncio
async def producer(q):
for i in range(10):
await q.put(f"task-{i}") # enqueue work
for _ in range(3): # tell consumers to stop
await q.put(None)
async def consumer(name, q):
    """Process items from ``q`` until a ``None`` sentinel arrives.

    Args:
        name: Label printed with every processed item.
        q: Queue to read from; ``None`` is the shutdown signal.
    """
    while True:
        item = await q.get()
        try:
            if item is None:
                break  # shutdown signal
            await asyncio.sleep(0.3)  # simulate work
            print(f"{name} processed {item}")
        finally:
            # Every get() must be matched by task_done(), sentinel included.
            # Otherwise the queue's unfinished-task count never reaches zero
            # and q.join() blocks forever.
            q.task_done()
async def main():
    """Wire one producer to three consumers through a shared queue."""
    q = asyncio.Queue()
    producer_task = asyncio.create_task(producer(q))
    consumer_tasks = [
        asyncio.create_task(consumer(f"C{i}", q)) for i in range(3)
    ]
    await producer_task
    # join() returns once task_done() has been called for every queued item.
    await q.join()
    # Let every consumer run to its end (after receiving None).
    await asyncio.gather(*consumer_tasks)
# Entry point for this snippet: run main() on a new event loop until it finishes.
asyncio.run(main())
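What it’s doing: the producer enqueues ten work items followed by one `None` sentinel per consumer; each consumer processes items until it receives a sentinel; `q.join()` blocks until `task_done()` has been called for every item that was put on the queue.
Timeouts with wait_for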
import asyncio
async def slow(delay=5):
    """Simulate a slow operation by sleeping.

    Args:
        delay: Seconds to sleep for. Defaults to 5, the value that was
            previously hard-coded.
    """
    await asyncio.sleep(delay)
async def main():
    """Await slow() with a one-second limit and report if the limit is hit."""
    limit = 1.0
    try:
        # wait_for() cancels the awaited coroutine and raises TimeoutError
        # once the limit elapses.
        await asyncio.wait_for(slow(), timeout=limit)
    except asyncio.TimeoutError:
        print("Timed out!")
# Entry point for this snippet: run main() on a new event loop until it finishes.
asyncio.run(main())
Cancelling a task safely
import asyncio
async def long_running():
    """Print a heartbeat every half second until the task is cancelled."""
    try:
        while True:
            await asyncio.sleep(0.5)
            print("working...")
    except asyncio.CancelledError:
        # Cancellation arrives as an exception at the pending await.
        print("cancelling: cleaning up...")
        # close files/sockets here
        # Re-raise so the task is actually marked as cancelled.
        raise
async def main():
    """Start long_running(), cancel it after two seconds, and confirm it stopped."""
    task = asyncio.create_task(long_running())
    await asyncio.sleep(2)
    task.cancel()
    # cancel() only requests cancellation; awaiting the task waits for it to
    # unwind and surfaces the CancelledError it re-raised.
    try:
        await task
    except asyncio.CancelledError:
        print("task cancelled")
# Entry point for this snippet: run main() on a new event loop until it finishes.
asyncio.run(main())
Why it matters: Real systems must handle timeouts and shutdowns gracefully.
Concurrent HTTP requests (aiohttp) — real-world I/O
If you can, install aiohttp (`pip install aiohttp`). This shows true I/O concurrency.
import asyncio, aiohttp
# Endpoints requested concurrently by main().
# NOTE(review): presumably each /delay/N endpoint responds after about N seconds — confirm against httpbin.
URLS = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/3",
]
async def fetch(session, url):
    """GET ``url`` through ``session`` and return ``(url, status)``."""
    # The response is used as an async context manager so it is released on exit.
    async with session.get(url) as response:
        status = response.status
        return url, status
async def main():
    """Fetch every URL in URLS concurrently and print each status with its URL."""
    # One session is shared by all requests.
    async with aiohttp.ClientSession() as session:
        requests = [fetch(session, u) for u in URLS]
        results = await asyncio.gather(*requests)
        for url, status in results:
            print(status, url)
# Entry point for this snippet: run main() on a new event loop until it finishes.
asyncio.run(main())
What it’s doing: all requests share a single `ClientSession` (connection pooling), and `asyncio.gather` runs them concurrently.
If you must call a blocking function (e.g., CPU-bound or a non-async library), offload it:
import asyncio, time
def blocking_io(n, delay=1):
    """Simulate a blocking call and return ``n`` squared.

    Args:
        n: Number to square.
        delay: Seconds to block for. Defaults to 1, the value that was
            previously hard-coded.

    Returns:
        ``n * n``.
    """
    time.sleep(delay)  # blocks the thread
    return n * n
async def main():
    """Run two blocking_io() calls in the default executor and print both results."""
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default executor (a ThreadPoolExecutor).
    jobs = [loop.run_in_executor(None, blocking_io, n) for n in (3, 4)]
    results = await asyncio.gather(*jobs)
    print(results)
# Entry point for this snippet: run main() on a new event loop until it finishes.
asyncio.run(main())
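Rule of thumb: if a call blocks and has no async equivalent, offload it with `run_in_executor`.
Common pitfalls:
- Calling `time.sleep()` in async code → blocks the loop. Use `await asyncio.sleep()`.
- Forgetting `await` on coroutines → you get a coroutine object, not a result. Always `await` or schedule with `create_task`.
- Starting several event loops → keep a single `asyncio.run(main())` entry point.
- Launching unbounded concurrency → cap it with a `Semaphore` or a bounded queue.
Quick reference: structure your program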
import asyncio
async def main():
    """Template: start background tasks, do other awaited work, then wait for the tasks."""
    # 1) create tasks
    first = asyncio.create_task(coro1())
    second = asyncio.create_task(coro2())
    # 2) run other awaits while tasks progress
    result = await some_other_coro()
    # 3) gather results
    await asyncio.gather(first, second)
if __name__ == "__main__":
    # Only start the event loop when this file is executed as a script.
    asyncio.run(main())
Coroutines (`async def`) + `await` let your code pause while it waits. Use `gather`, `Semaphore`, `Queue`, timeouts, and cancellation to make robust async apps.