async/await is Python's syntax for cooperative concurrency. An async def
function returns a coroutine object; await suspends that coroutine until the
awaited operation completes, letting the event loop run other coroutines in the
meantime. No thread switch happens — the same OS thread handles many requests.
import asyncio
async def fetch_user(user_id: int) -> dict:
await asyncio.sleep(0.1) # yield control while "waiting"
return {"id": user_id}
FastAPI is built on top of Starlette and asyncio. When you declare a route
handler as async def, FastAPI runs it directly on the event loop, giving you
high concurrency without spawning new threads for each request.
Rule of thumb: use async def handlers whenever you await something (DB call,
HTTP request, cache); use def for pure CPU work that has no async alternative.
WSGI (Web Server Gateway Interface, PEP 3333) is the classic Python web
server standard. It assumes a synchronous, request-per-thread model —
a server calls app(environ, start_response) and blocks until the response is
returned. Frameworks like Flask and Django (pre-4.x) use WSGI.
ASGI (Asynchronous Server Gateway Interface) extends WSGI to support
async handlers, WebSockets and long-polling in the same interface. A server
calls await app(scope, receive, send) and the app may await I/O freely.
# Minimal ASGI app (what Starlette/FastAPI is under the hood)
async def app(scope, receive, send):
if scope["type"] == "http":
await send({"type": "http.response.start", "status": 200, "headers": []})
await send({"type": "http.response.body", "body": b"Hello"})
FastAPI is an ASGI framework served by Uvicorn (or Hypercorn). ASGI lets a single process handle thousands of simultaneous connections without one thread per connection.
Rule of thumb: WSGI = sync/threads; ASGI = async/event-loop; FastAPI requires ASGI.
The event loop is a scheduler that runs coroutines, handles I/O callbacks
and timers — all in a single thread. When a coroutine awaits an I/O operation,
the loop suspends it and runs the next ready coroutine.
import asyncio
async def task_a():
print("A start")
await asyncio.sleep(1) # loop runs task_b here
print("A done")
async def task_b():
print("B start")
await asyncio.sleep(0.5)
print("B done")
asyncio.run(asyncio.gather(task_a(), task_b()))
# prints: A start, B start, B done, A done
Uvicorn creates one event loop per worker process and passes every incoming HTTP
request into it. FastAPI schedules your async def handler on that loop. If you
block the loop (e.g., call time.sleep() or a synchronous DB driver), all
requests stall until the block clears.
Rule of thumb: never call blocking code directly from an async def handler —
use await asyncio.to_thread() or a thread pool executor instead.
A coroutine is created by an async def function. Calling it does not
execute the body — it returns a coroutine object. The body only runs when you
await it (or pass it to asyncio.run()).
def regular():
return 42 # runs immediately on call
async def coro():
return 42 # returns <coroutine object> on call
result = regular() # 42
obj = coro() # <coroutine object coro at 0x...> — nothing ran yet
result = await coro() # 42 — body now runs
FastAPI recognises async def route handlers and schedules them with await
internally. If you declare a route as def, FastAPI runs it in a thread pool
to avoid blocking the event loop.
Rule of thumb: a coroutine is a pauseable function; without await (or
asyncio.run), its body never executes.
| Handler | When to use | FastAPI runs it in |
|---|---|---|
async def |
calls await-able I/O (DB, HTTP, cache) |
event loop directly |
def |
CPU work or sync libraries (Pandas, Pillow) | thread-pool executor |
# async — awaits the DB; loop stays free
@app.get("/users/{id}")
async def get_user(id: int, db: AsyncSession = Depends(get_db)):
return await db.get(User, id)
# def — pandas is sync-only; FastAPI offloads to a thread
@app.post("/report")
def generate_report(payload: ReportRequest):
df = pd.read_csv(payload.path) # blocking I/O, but in a thread
return df.describe().to_dict()
The wrong choice either wastes threads (async def + sync library that blocks)
or adds unnecessary thread overhead (def + code that could have been awaited).
Rule of thumb: if you await anything inside the handler, declare it async def;
if the handler is pure CPU/sync, use def.
Blocking the event loop means calling any operation that occupies the thread
without yielding: time.sleep(), synchronous file reads, requests.get(), heavy
computation. While blocked, no other coroutine runs — all concurrent requests
stall until the block clears.
# BAD — blocks the event loop for every request
@app.get("/slow")
async def slow():
time.sleep(2) # freezes ALL requests for 2 s
return {"ok": True}
# GOOD — moves the blocking call to a thread
@app.get("/slow")
async def slow():
await asyncio.to_thread(time.sleep, 2) # only this coroutine pauses
return {"ok": True}
Other fixes:
- Switch to an async library (
httpxinstead ofrequests,asyncpginstead ofpsycopg2). - Use
loop.run_in_executor(None, blocking_fn)(older equivalent ofasyncio.to_thread). - Declare the handler as plain
def— FastAPI automatically runs it in a thread pool.
Rule of thumb: if you can't avoid a blocking call inside async def, use
asyncio.to_thread() to push it off the event loop.
asyncio.gather(*coros) runs multiple coroutines concurrently on the same
event loop and returns their results as a list in the same order as the inputs.
It's the idiomatic way to fan-out I/O work within a single handler.
@app.get("/dashboard/{user_id}")
async def dashboard(user_id: int):
# fire both DB queries at the same time instead of sequentially
user, orders = await asyncio.gather(
fetch_user(user_id),
fetch_orders(user_id),
)
return {"user": user, "orders": orders}
Sequential awaits would take t_user + t_orders; gather takes max(t_user, t_orders).
Rule of thumb: use asyncio.gather() whenever two or more I/O calls are
independent — it cuts latency to the slowest one instead of the sum.
anyio is a compatibility shim that lets library authors write async code once and run it on asyncio or Trio without changes. Starlette (FastAPI's foundation) adopted anyio as its async backend in v0.20+.
import anyio
async def run_parallel():
async with anyio.create_task_group() as tg:
tg.start_soon(task_a)
tg.start_soon(task_b)
In practice for FastAPI users:
- You still write plain
asynciocode — anyio is transparent. pytest-anyio/anyio.from_thread.run_syncbecome relevant when testing.- Starlette's
BackgroundTaskandlifespanuse anyio task groups internally.
Rule of thumb: you rarely call anyio directly in application code; knowing it
exists explains why pytest-anyio is the recommended test runner for Starlette apps.
Use asyncio.to_thread() (Python 3.9+) to run a sync function in the default
thread-pool executor while keeping the event loop free.
import asyncio
def sync_heavy(n: int) -> int:
# CPU or blocking I/O, e.g. reading a large file
return sum(range(n))
@app.get("/compute")
async def compute(n: int):
result = await asyncio.to_thread(sync_heavy, n)
return {"result": result}
For older Python (3.7–3.8) use loop.run_in_executor(None, fn, *args).
For FastAPI specifically, declaring the route as plain def achieves the same
thing automatically — FastAPI calls run_in_executor for you.
Rule of thumb: prefer def route handlers for wholly-sync work; use
asyncio.to_thread() only when you're already in an async def and need
one sync call inside otherwise-async logic.
requests is synchronous — calling requests.get() blocks the OS thread
until the response arrives. In an async def handler that means the event loop
is frozen for the duration of every outbound HTTP call.
httpx provides an identical API plus an AsyncClient that integrates with
asyncio:
import httpx
@app.get("/proxy")
async def proxy(url: str):
async with httpx.AsyncClient() as client:
resp = await client.get(url) # loop stays free
return resp.json()
httpx also supports HTTP/2, connection pooling via a shared client, and
httpx.AsyncClient as a test transport for FastAPI's TestClient.
Rule of thumb: always use httpx.AsyncClient for outbound HTTP inside FastAPI
async handlers; use requests only in plain def routes or CLI scripts.
Starlette is a lightweight ASGI framework/toolkit that provides routing,
middleware, WebSockets, background tasks and static files. FastAPI is built
directly on top of Starlette — every FastAPI() instance is a Starlette app.
FastAPI's additions:
- Type-hint-driven parameter parsing — path, query, body extracted via annotations.
- Pydantic v2 validation — automatic request validation and serialization.
- Dependency injection via
Depends(). - Automatic OpenAPI / JSON Schema generation (Swagger UI, ReDoc).
HTTPExceptionhelpers and response model enforcement.
from fastapi import FastAPI
from starlette.applications import Starlette # same base class
Rule of thumb: think of FastAPI as Starlette + Pydantic + OpenAPI; you can use all Starlette primitives (middleware, WebSockets, mounts) in a FastAPI app.
- Uvicorn (ASGI server) runs one event loop per worker process.
- A new HTTP connection arrives; Uvicorn's event loop accepts it and parses headers.
- FastAPI's router matches the URL and calls the handler.
- If the handler is
async def→ FastAPIawaits it on the event loop. If the handler isdef→ FastAPI submits it toanyio's thread pool andawaits the future, keeping the event loop free. - The handler performs I/O (e.g.,
await db.execute()). The coroutine suspends and the loop picks up the next ready coroutine. - When I/O completes, the handler resumes and returns a response dict.
- FastAPI serializes it (Pydantic
model_dump→ JSON) and sends bytes back through Uvicorn.
request → Uvicorn → FastAPI router → async handler (event loop)
or sync handler (thread pool)
→ Pydantic serialize → response
Rule of thumb: one worker process = one event loop = many concurrent requests, but only one coroutine actually executes CPU instructions at a time.
asyncio.create_task(coro) schedules a coroutine to run concurrently on the
event loop and returns a Task object immediately. The task starts running as
soon as the current coroutine yields. await coro runs the coroutine
sequentially — the caller suspends until it finishes.
async def handler():
# sequential: total time = t_a + t_b
result_a = await slow_query_a()
result_b = await slow_query_b()
# concurrent: total time = max(t_a, t_b)
task_a = asyncio.create_task(slow_query_a())
task_b = asyncio.create_task(slow_query_b())
result_a = await task_a
result_b = await task_b
asyncio.gather(*coros) is usually more convenient than manual create_task
when you want results; create_task is useful when you want to start a task
and do other work before collecting its result.
Rule of thumb: create_task is fire-and-partially-forget; use gather when you
need all results in one call.
FastAPI delegates sync (def) handlers to anyio's default thread limiter,
which caps concurrent threads at 40 by default (per worker process). This
prevents unbounded thread creation from swamping the OS.
import anyio
# Check or raise the cap at startup
@app.on_event("startup")
async def configure_threads():
limiter = anyio.from_thread.current_default_thread_limiter()
limiter.total_tokens = 20 # reduce if each thread uses lots of memory
The 40-thread default is conservative for most workloads. If your sync handlers
do fast CPU work, 40 is plenty. If they block on slow external calls, consider
using async def + asyncio.to_thread() instead so you control the pool.
Rule of thumb: for heavily sync-bound FastAPI apps, tune the anyio thread limit or, better, switch to async libraries so you don't need threads at all.
@app.on_event("startup/shutdown") decorators are the legacy approach —
they work but are deprecated in recent FastAPI. The modern replacement is the
lifespan context manager, which groups startup and shutdown in one function
using yield.
from contextlib import asynccontextmanager
from fastapi import FastAPI
@asynccontextmanager
async def lifespan(app: FastAPI):
# startup
await db_pool.connect()
yield
# shutdown
await db_pool.disconnect()
app = FastAPI(lifespan=lifespan)
Benefits of lifespan:
- Single function instead of two decorators.
- Exception in startup cleanly skips the yield and shutdown block won't be reached.
- Composable — you can call other async context managers inside.
- Testable with
@asynccontextmanagerdirectly.
Rule of thumb: use lifespan= for all new FastAPI apps; avoid @app.on_event.
More Fundamentals interview questions
More ways to practice
The self-quiz is live. Get notified when mock interviews and new question packs drop.