lifespan is an async context manager function that runs startup code
before yield and shutdown code after it. It was introduced to replace
the older @app.on_event("startup") / @app.on_event("shutdown") decorators,
which are deprecated.
from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # startup
    await db_pool.connect()
    print("DB connected")
    yield
    # shutdown
    await db_pool.disconnect()
    print("DB disconnected")

app = FastAPI(lifespan=lifespan)
Rule of thumb: always use lifespan= for new apps — it's the modern approach
and groups startup/shutdown logic in one readable function.
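To see the ordering without starting a server, the lifespan can be entered by hand. This is only a sketch: the SimpleNamespace object is a hypothetical stand-in for a FastAPI instance, and the manual async with approximates what the ASGI server does around the serving phase.

```python
import asyncio
from contextlib import asynccontextmanager
from types import SimpleNamespace

events: list[str] = []

@asynccontextmanager
async def lifespan(app):
    events.append("startup")       # runs once, before any request is served
    yield
    events.append("shutdown")      # runs once, after the server stops

async def main() -> None:
    app = SimpleNamespace()        # hypothetical stand-in for a FastAPI instance
    async with lifespan(app):      # roughly what the ASGI server does for you
        events.append("serving requests")

asyncio.run(main())
print(events)  # ['startup', 'serving requests', 'shutdown']
```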
The decorator approach had two problems:
- Startup and shutdown logic is split across two separate functions — harder to read the resource lifecycle.
- You can't use async with or yield-based context managers in event handlers.
The lifespan context manager fixes both:
# Old (deprecated)
@app.on_event("startup")
async def startup():
    await cache.connect()

@app.on_event("shutdown")
async def shutdown():
    await cache.disconnect()

# New (preferred)
@asynccontextmanager
async def lifespan(app: FastAPI):
    await cache.connect()
    yield
    await cache.disconnect()

app = FastAPI(lifespan=lifespan)
Rule of thumb: migrate @app.on_event to lifespan in any new or refactored
codebase — it's just moving the two functions into a context manager.
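The second limitation of the decorators is easier to see with a resource that is itself an async context manager. The sketch below assumes a hypothetical open_cache() helper and uses a SimpleNamespace as a stand-in for the app. The point is only that the async with block stays open across the yield, which two separate handlers cannot express.

```python
import asyncio
from contextlib import asynccontextmanager
from types import SimpleNamespace

@asynccontextmanager
async def open_cache():
    """Hypothetical resource that is itself an async context manager."""
    cache = {"connected": True}
    try:
        yield cache
    finally:
        cache["connected"] = False  # cleanup runs even if the body raises

@asynccontextmanager
async def lifespan(app):
    # The resource stays open for the whole serving phase.
    async with open_cache() as cache:
        app.state.cache = cache
        yield

async def main():
    app = SimpleNamespace(state=SimpleNamespace())  # stand-in for FastAPI
    async with lifespan(app):
        during = app.state.cache["connected"]
    after = app.state.cache["connected"]
    return during, after

during, after = asyncio.run(main())
```

Here during is True while the lifespan is active and after is False once it has exited.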
To share resources created at startup with request handlers, store them on
app.state inside the lifespan function. app.state is a Starlette State
object: a simple namespace you can attach anything to.
import httpx
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request

@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.http_client = httpx.AsyncClient()
    yield
    await app.state.http_client.aclose()

app = FastAPI(lifespan=lifespan)

@app.get("/proxy")
async def proxy(request: Request):
    client = request.app.state.http_client
    resp = await client.get("https://api.example.com/data")
    return resp.json()
Rule of thumb: use app.state for truly shared singletons (connection pools,
loaded ML models, config); use Depends() for per-request resources.
from contextlib import asynccontextmanager
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from fastapi import FastAPI, Request

@asynccontextmanager
async def lifespan(app: FastAPI):
    # settings is the application's config object, defined elsewhere
    engine = create_async_engine(settings.database_url, pool_size=10)
    app.state.db_factory = async_sessionmaker(engine, expire_on_commit=False)
    yield
    await engine.dispose()

app = FastAPI(lifespan=lifespan)

# Dependency that uses the shared engine
async def get_db(request: Request):
    async with request.app.state.db_factory() as session:
        yield session
The engine (and its connection pool) is created once at startup and disposed gracefully at shutdown.
Rule of thumb: create the engine in lifespan, not at module level. Tests can
then build the app with a lifespan that creates a different engine, or
replace get_db through app.dependency_overrides.
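One way to make the engine swappable is a small lifespan factory. This is a sketch under assumptions, not a FastAPI feature: make_lifespan and FakeEngine are hypothetical names, and the SimpleNamespace object stands in for the app so the snippet runs without a database.

```python
import asyncio
from contextlib import asynccontextmanager
from types import SimpleNamespace

def make_lifespan(create_engine):
    """Build a lifespan around any zero-argument engine factory."""
    @asynccontextmanager
    async def lifespan(app):
        engine = create_engine()
        app.state.engine = engine
        try:
            yield
        finally:
            await engine.dispose()  # runs on normal and on failed shutdown
    return lifespan

class FakeEngine:
    """Hypothetical test double with the one method the lifespan needs."""
    def __init__(self):
        self.disposed = False

    async def dispose(self):
        self.disposed = True

async def main():
    app = SimpleNamespace(state=SimpleNamespace())  # stand-in for FastAPI
    async with make_lifespan(FakeEngine)(app):
        assert not app.state.engine.disposed
    return app.state.engine

engine = asyncio.run(main())
```

In production code the same factory could be given a function that calls create_async_engine with the real database URL, while tests pass the fake.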
To manage several resources, nest asynccontextmanager functions inside the main lifespan:
from contextlib import asynccontextmanager
from fastapi import FastAPI
from redis.asyncio import Redis
from sqlalchemy.ext.asyncio import create_async_engine

@asynccontextmanager
async def db_lifespan(app):
    engine = create_async_engine(settings.db_url)
    app.state.engine = engine
    yield
    await engine.dispose()

@asynccontextmanager
async def cache_lifespan(app):
    app.state.redis = Redis.from_url(settings.redis_url)
    yield
    await app.state.redis.aclose()

@asynccontextmanager
async def lifespan(app: FastAPI):
    async with db_lifespan(app):
        async with cache_lifespan(app):
            yield

app = FastAPI(lifespan=lifespan)
Each sub-lifespan owns its resource; teardown runs in reverse nesting order.
Rule of thumb: one context manager per resource — compose them in the main
lifespan rather than putting everything into one giant function.
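The reverse teardown order can be checked with two recording sub-lifespans. In this sketch recording_lifespan is a hypothetical helper and the app is a bare SimpleNamespace. The nesting mirrors the db_lifespan and cache_lifespan pattern above.

```python
import asyncio
from contextlib import asynccontextmanager
from types import SimpleNamespace

log = []

def recording_lifespan(name):
    """Hypothetical factory: a sub-lifespan that logs its start and stop."""
    @asynccontextmanager
    async def sub_lifespan(app):
        log.append(f"start {name}")
        try:
            yield
        finally:
            log.append(f"stop {name}")
    return sub_lifespan

db_lifespan = recording_lifespan("db")
cache_lifespan = recording_lifespan("cache")

@asynccontextmanager
async def lifespan(app):
    async with db_lifespan(app):          # entered first, exited last
        async with cache_lifespan(app):   # entered last, exited first
            yield

async def main():
    async with lifespan(SimpleNamespace()):
        log.append("serving")

asyncio.run(main())
print(log)
# ['start db', 'start cache', 'serving', 'stop cache', 'stop db']
```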
There are three ways to reach app.state, depending on context:
In a dependency, inject Request:
def get_client(request: Request) -> httpx.AsyncClient:
    return request.app.state.http_client
In middleware, request.app.state is available the same way:
async def dispatch(self, request: Request, call_next):
    client = request.app.state.http_client
    ...
Outside a request, use the app object directly (only if it lives in the
same module). Do this inside code that runs after lifespan startup, not at
import time:
client = app.state.http_client  # raises AttributeError before startup has run
Rule of thumb: always access app.state through request.app in deps and
middleware — it avoids circular imports and works correctly in tests with
TestClient.
TestClient runs the full lifespan when used as a context manager:
from fastapi.testclient import TestClient
from app.main import app

def test_lifespan_sets_state():
    with TestClient(app) as client:
        # startup has run; app.state is populated
        resp = client.get("/health")
        assert resp.status_code == 200
    # shutdown has run; resources are closed
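For async tests with httpx.AsyncClient, note that ASGITransport does not send
lifespan events, so startup and shutdown do not run on their own. If the test
needs app.state populated, wrap the app in a lifespan manager such as
LifespanManager from the asgi-lifespan package:
import pytest
from asgi_lifespan import LifespanManager
from httpx import AsyncClient, ASGITransport
from app.main import app

@pytest.mark.anyio
async def test_endpoint():
    async with LifespanManager(app) as manager:  # runs startup and shutdown
        async with AsyncClient(
            transport=ASGITransport(app=manager.app), base_url="http://test"
        ) as client:
            resp = await client.get("/items")
            assert resp.status_code == 200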
Rule of thumb: always use TestClient as a context manager (with TestClient(app))
so startup and shutdown run — a non-context-manager TestClient skips them.
If an exception propagates out of the code before yield, FastAPI never starts
accepting requests — the application exits. The teardown code (after yield)
does not run because yield was never reached.
@asynccontextmanager
async def lifespan(app: FastAPI):
    try:
        await db.connect()  # if this raises, app won't start
    except ConnectionError as e:
        log.critical("DB unavailable: %s", e)
        raise  # startup fails, process exits
    yield
    await db.disconnect()  # never reached if startup failed
For resources that you want to clean up even if a later startup step fails,
nest the try/finally around each step individually.
Rule of thumb: let startup exceptions propagate — a crashed startup is better than a running app with a broken DB connection.
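The per-step try/finally idea can be sketched with two sub-lifespans where the second one fails during startup. The names and the simulated ConnectionError are assumptions for illustration, and a SimpleNamespace stands in for the app. The first resource is still closed, and the serving phase is never reached.

```python
import asyncio
from contextlib import asynccontextmanager
from types import SimpleNamespace

log = []

@asynccontextmanager
async def db_lifespan(app):
    log.append("db connected")
    try:
        yield
    finally:
        log.append("db closed")  # runs even when a later step fails

@asynccontextmanager
async def cache_lifespan(app):
    raise ConnectionError("cache unavailable")  # simulated startup failure
    yield  # unreachable; keeps this function an async generator

@asynccontextmanager
async def lifespan(app):
    async with db_lifespan(app):
        async with cache_lifespan(app):  # raises before yield is reached
            yield

async def main():
    try:
        async with lifespan(SimpleNamespace()):
            log.append("serving")
    except ConnectionError as exc:
        log.append(f"startup failed: {exc}")

asyncio.run(main())
print(log)
# ['db connected', 'db closed', 'startup failed: cache unavailable']
```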
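Not reliably. APIRouter accepts a lifespan argument, but a lifespan attached
to a router that is added with include_router() may not run, depending on the
FastAPI version. The lifespan passed to the root FastAPI instance is the one
you can count on. To organise per-module startup logic, use helper context
managers that the root lifespan composes: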
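# routers/users.py
from contextlib import asynccontextmanager

@asynccontextmanager
async def users_lifespan(app):
    app.state.user_cache = {}
    yield
    app.state.user_cache.clear()

# main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from routers.users import users_lifespan
from routers.orders import orders_lifespan  # assumed: defined like users_lifespan

@asynccontextmanager
async def lifespan(app: FastAPI):
    async with users_lifespan(app):
        async with orders_lifespan(app):
            yield

app = FastAPI(lifespan=lifespan)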
Rule of thumb: keep the root lifespan as a compositor; define per-module
startup as asynccontextmanager functions and import them into main.
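When the number of modules grows, the nesting in the root lifespan can be replaced by a small helper built on contextlib.AsyncExitStack. This is a sketch: combine_lifespans and orders_lifespan are hypothetical names, and a SimpleNamespace stands in for the app.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager
from types import SimpleNamespace

def combine_lifespans(*lifespans):
    """Return one lifespan that enters each given lifespan in order."""
    @asynccontextmanager
    async def combined(app):
        async with AsyncExitStack() as stack:
            for lifespan in lifespans:
                await stack.enter_async_context(lifespan(app))
            yield  # the stack unwinds in reverse order on shutdown
    return combined

@asynccontextmanager
async def users_lifespan(app):
    app.state.user_cache = {}
    yield
    app.state.user_cache.clear()

@asynccontextmanager
async def orders_lifespan(app):  # hypothetical second module
    app.state.order_queue = []
    yield
    app.state.order_queue.clear()

lifespan = combine_lifespans(users_lifespan, orders_lifespan)

async def main():
    app = SimpleNamespace(state=SimpleNamespace())  # stand-in for FastAPI
    async with lifespan(app):
        app.state.user_cache["alice"] = 1
        app.state.order_queue.append("order-1")
        snapshot = (dict(app.state.user_cache), list(app.state.order_queue))
    return snapshot, app.state

snapshot, state = asyncio.run(main())
```

Adding a module then means adding one argument to combine_lifespans rather than another level of indentation.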
To run a background task for the lifetime of the app, use anyio.create_task_group() or asyncio.create_task() inside the lifespan:
import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI

async def poll_metrics():
    while True:
        await push_metrics_to_monitoring()
        await asyncio.sleep(60)

@asynccontextmanager
async def lifespan(app: FastAPI):
    task = asyncio.create_task(poll_metrics())
    yield
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
Always cancel and await the task on shutdown — leaving it running causes "Task was destroyed but it is pending!" warnings in tests.
Rule of thumb: cancel background tasks explicitly in the shutdown phase;
store the Task handle on app.state if other parts of the app need to check it.
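The cancel-and-await pattern can be exercised in isolation with a short polling interval. In this sketch the metrics push is replaced by appending to a list, the interval values are arbitrary, and a SimpleNamespace stands in for the app. The try/finally ensures the task is cancelled even if the serving phase raises.

```python
import asyncio
from contextlib import asynccontextmanager, suppress
from types import SimpleNamespace

ticks = []

async def poll_metrics(interval: float) -> None:
    while True:
        ticks.append("tick")           # stand-in for pushing metrics
        await asyncio.sleep(interval)

@asynccontextmanager
async def lifespan(app):
    task = asyncio.create_task(poll_metrics(0.01))
    app.state.metrics_task = task      # lets other code inspect the task
    try:
        yield
    finally:
        task.cancel()
        with suppress(asyncio.CancelledError):
            await task                 # wait until cancellation completes

async def main():
    app = SimpleNamespace(state=SimpleNamespace())  # stand-in for FastAPI
    async with lifespan(app):
        await asyncio.sleep(0.05)      # pretend the app is serving requests
    return app.state.metrics_task

task = asyncio.run(main())
```

After the lifespan exits, task.cancelled() is true and no pending task is left behind.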
request.state is a per-request State object — separate for every HTTP
request, discarded after the response is sent. app.state lives for the
entire application lifetime.
from uuid import uuid4
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware

app = FastAPI()

# Middleware sets per-request data
class RequestIDMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        request.state.request_id = str(uuid4())
        return await call_next(request)

app.add_middleware(RequestIDMiddleware)

# Handler or dependency reads it
@app.get("/items")
async def list_items(request: Request):
    rid = request.state.request_id
    return {"request_id": rid}
Rule of thumb: use request.state for per-request metadata (trace IDs, auth
context set by middleware); use app.state for shared singletons.