Synchronous DB drivers block the OS thread during the query. In a FastAPI
async def handler, this freezes the entire event loop — no other requests
can be processed until the query returns.

# BAD: sync driver inside async handler blocks the event loop
@app.get("/users")
async def list_users(db: Session = Depends(get_db)):
    return db.query(User).all()  # blocks event loop for entire query duration

# GOOD: async driver yields control during the query
@app.get("/users")
async def list_users(db: AsyncSession = Depends(get_async_db)):
    result = await db.execute(select(User))
    return result.scalars().all()

Async drivers (asyncpg, aiosqlite, aiomysql) suspend the coroutine during
I/O, letting the event loop handle other requests.
Rule of thumb: always use async DB drivers in async def FastAPI handlers;
use sync drivers only with def handlers (FastAPI runs them in a thread pool).
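
The effect is easy to reproduce without a database. The sketch below is a stdlib-only stand-in, not FastAPI code: `time.sleep` plays the role of a sync driver call, `asyncio.sleep` plays an awaited async driver call, and the hypothetical `heartbeat` task stands in for the other requests the loop should be serving.

```python
import asyncio
import time


async def heartbeat(ticks: list) -> None:
    """Stand-in for other requests: records a tick every 10 ms."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def blocking_query() -> None:
    time.sleep(0.2)  # like a sync driver: holds the thread and never yields


async def yielding_query() -> None:
    await asyncio.sleep(0.2)  # like an async driver: yields while waiting


async def ticks_during(query) -> int:
    """Count heartbeat ticks recorded while `query` is in flight."""
    ticks: list = []
    task = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # give the heartbeat one turn so it is running
    before = len(ticks)
    await query()
    during = len(ticks) - before
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return during


if __name__ == "__main__":
    print("ticks while blocked: ", asyncio.run(ticks_during(blocking_query)))
    print("ticks while yielding:", asyncio.run(ticks_during(yielding_query)))
```

The blocked run records no ticks at all because the coroutine never hands control back to the loop; the yielding run records many.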

from sqlalchemy.ext.asyncio import (
    create_async_engine, AsyncSession, async_sessionmaker
)
from sqlalchemy.orm import DeclarativeBase

# asyncpg driver for PostgreSQL
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/mydb"
engine = create_async_engine(DATABASE_URL, pool_size=10, echo=False)
async_session_factory = async_sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)

class Base(DeclarativeBase):
    pass

expire_on_commit=False prevents SQLAlchemy from expiring (lazily reloading)
attributes after a commit — essential in async code where lazy loading would
trigger a new I/O call without being awaited.
Rule of thumb: always set expire_on_commit=False for async sessions — expired
attributes in async context cause MissingGreenlet or DetachedInstanceError.

from fastapi import Depends
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

async def get_async_db():
    async with async_session_factory() as session:
        yield session
    # the context manager closes the session on exit, discarding any
    # uncommitted work; it does NOT commit, so handlers that write must
    # call await session.commit() themselves

# Or explicit try/except when the dependency should commit on success:
async def get_async_db():
    session = async_session_factory()  # keeps expire_on_commit=False
    try:
        yield session
        await session.commit()
    except Exception:
        await session.rollback()
        raise
    finally:
        await session.close()

@app.get("/users")
async def list_users(db: AsyncSession = Depends(get_async_db)):
    result = await db.execute(select(User))
    return result.scalars().all()

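Rule of thumb: prefer async with async_session_factory() as session; it
guarantees the session is closed, and uncommitted work discarded, on every
exit path, and is harder to get wrong. It does not commit for you: commit in
the handler, or use the try/except variant to commit on success.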
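
To see which lifecycle calls the try/except variant makes on each path, the sketch below drives the same generator shape by hand. `FakeSession` and `run_request` are hypothetical stand-ins (no FastAPI or SQLAlchemy involved), and `run_request` only approximates how a framework resumes a yield dependency after the handler returns or raises.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in for AsyncSession that records lifecycle calls."""

    def __init__(self) -> None:
        self.calls: list = []

    async def commit(self) -> None:
        self.calls.append("commit")

    async def rollback(self) -> None:
        self.calls.append("rollback")

    async def close(self) -> None:
        self.calls.append("close")


async def get_db(session: FakeSession):
    # Same try/except/finally shape as the explicit dependency.
    try:
        yield session
        await session.commit()
    except Exception:
        await session.rollback()
        raise
    finally:
        await session.close()


async def run_request(handler) -> list:
    """Approximate a framework: enter the dependency, run the handler, exit."""
    session = FakeSession()
    try:
        async with asynccontextmanager(get_db)(session) as db:
            await handler(db)
    except ValueError:
        pass  # a real framework would turn this into an error response
    return session.calls


async def ok_handler(db) -> None:
    pass


async def failing_handler(db) -> None:
    raise ValueError("boom")


if __name__ == "__main__":
    print(asyncio.run(run_request(ok_handler)))
    print(asyncio.run(run_request(failing_handler)))
```

A handler that returns normally leads to commit then close; one that raises leads to rollback then close, and the exception still propagates to the caller.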

from sqlalchemy import select, update, func

# Single object by primary key
user = await db.get(User, user_id)

# Filtered query
result = await db.execute(select(User).where(User.is_active == True))
users = result.scalars().all()

# Single result
result = await db.execute(select(User).where(User.email == email))
user = result.scalar_one_or_none()  # None if not found, raises if multiple

# Count
result = await db.execute(select(func.count()).select_from(User))
count = result.scalar()

# Insert
db.add(User(name="Alice", email="alice@example.com"))
await db.commit()

# Update (user_id rather than id, which would shadow the builtin)
await db.execute(update(User).where(User.id == user_id).values(name="Bob"))
await db.commit()

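Rule of thumb: always await db.execute(stmt) for queries; always await db.commit()
after writes. A call without await only creates a coroutine object and never
runs; Python's sole hint is a "coroutine ... was never awaited" RuntimeWarning,
so a forgotten commit silently loses the write.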
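
The missing-await failure mode can be shown with plain asyncio. In this minimal sketch, `commit` is a hypothetical stand-in for an async method such as `session.commit()`; nothing here touches SQLAlchemy.

```python
import asyncio
import inspect


async def main() -> tuple:
    executed: list = []

    async def commit() -> None:
        """Hypothetical stand-in for an async method like session.commit()."""
        executed.append("commit")

    pending = commit()  # BUG: no await, so the body has not run
    forgot = (inspect.iscoroutine(pending), list(executed))
    pending.close()  # discard it explicitly to avoid the RuntimeWarning

    await commit()  # correct: the body runs before we continue
    return forgot, list(executed)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

After the un-awaited call, `executed` is still empty and `pending` is just a coroutine object; only the awaited call records the commit.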
Lazy loading is not available in async SQLAlchemy — accessing an unloaded
relationship raises MissingGreenlet. You must eager-load:

from sqlalchemy.orm import selectinload, joinedload

# selectinload: runs a second SELECT IN query
result = await db.execute(
    select(User).options(selectinload(User.posts)).where(User.id == user_id)
)
user = result.scalar_one()
# user.posts is now loaded; no extra query needed

# joinedload: single JOIN query
result = await db.execute(
    select(Post).options(joinedload(Post.author))
)
posts = result.unique().scalars().all()

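result.unique() de-duplicates rows produced by the JOIN. SQLAlchemy requires it when joinedload targets a collection (one-to-many), where each parent row repeats once per child; for a many-to-one such as Post.author it is harmless but not strictly required.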
Rule of thumb: always use selectinload (one-to-many) or joinedload (many-to-one)
in async; lazy loading raises an error rather than silently blocking.
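
The two strategies differ mainly in the SQL they emit. The sketch below mimics both by hand with the stdlib sqlite3 module (no SQLAlchemy, and a hypothetical two-table schema) to show why a JOIN against a collection repeats the parent row while the SELECT ... IN approach does not.

```python
import sqlite3


def demo() -> tuple:
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);
        CREATE TABLE posts (id INTEGER PRIMARY KEY, user_id INTEGER, title TEXT);
        INSERT INTO users VALUES (1, 'Alice'), (2, 'Bob');
        INSERT INTO posts VALUES (1, 1, 'a1'), (2, 1, 'a2'), (3, 2, 'b1');
        """
    )

    # joinedload-style: one query, but each user row repeats once per post.
    joined = conn.execute(
        "SELECT u.id, u.name, p.title FROM users u "
        "LEFT JOIN posts p ON p.user_id = u.id ORDER BY u.id, p.id"
    ).fetchall()

    # selectinload-style: one query for parents, one IN query for children.
    users = conn.execute("SELECT id, name FROM users ORDER BY id").fetchall()
    ids = [row[0] for row in users]
    marks = ",".join("?" for _ in ids)
    posts = conn.execute(
        f"SELECT user_id, title FROM posts WHERE user_id IN ({marks}) ORDER BY id",
        ids,
    ).fetchall()

    conn.close()
    return joined, users, posts


if __name__ == "__main__":
    joined, users, posts = demo()
    print(len(joined), "joined rows for", len(users), "users")
```

The joined result has one row per post, so a user with two posts appears twice; that repetition is what de-duplication has to undo.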
| Driver | URL prefix | Notes |
|---|---|---|
| asyncpg | postgresql+asyncpg:// | Fastest, pure async; most popular |
| psycopg3 (psycopg) | postgresql+psycopg:// | psycopg 3.x; sync + async |
| aiopg | postgresql+aiopg:// | Older, based on psycopg2 |
pip install asyncpg # for asyncpg
pip install psycopg[binary] # for psycopg3
# asyncpg
engine = create_async_engine("postgresql+asyncpg://user:pass@host/db")
# psycopg3
engine = create_async_engine("postgresql+psycopg://user:pass@host/db")
asyncpg is the most battle-tested; psycopg3 supports binary protocol and
is gaining adoption.
Rule of thumb: use asyncpg for new projects — it's the most performant and
has the widest ecosystem support.
Use the aiosqlite driver:
pip install aiosqlite
from sqlalchemy.ext.asyncio import create_async_engine
# In-memory for tests
engine = create_async_engine("sqlite+aiosqlite:///:memory:", echo=True)
# File-based
engine = create_async_engine("sqlite+aiosqlite:///./test.db")
For tests, create tables in the test fixture (async fixtures need a plugin such as pytest-asyncio):

import pytest_asyncio

@pytest_asyncio.fixture
async def db_engine():
    engine = create_async_engine("sqlite+aiosqlite:///:memory:")
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield engine
    await engine.dispose()

Rule of thumb: use sqlite+aiosqlite:///:memory: in tests for fast, isolated
DB fixtures — no cleanup needed since the DB disappears when the engine is disposed.
async_scoped_session binds sessions to an asyncio.Task (coroutine) context
rather than a thread. It's the async equivalent of scoped_session.

import asyncio
from sqlalchemy.ext.asyncio import async_scoped_session

AsyncScopedSession = async_scoped_session(
    async_session_factory,
    scopefunc=asyncio.current_task,
)

With scopefunc=asyncio.current_task, each asyncio Task gets its own session.
Call await AsyncScopedSession.remove() at the end of a task to close the
session and drop it from the registry; remove() is a coroutine, and skipping
it leaves the session registered for the finished task.
In FastAPI, the per-request get_async_db yield dependency is simpler and preferred.
async_scoped_session is useful in background workers or batch scripts where
you don't have a FastAPI dependency container.
Rule of thumb: use async_scoped_session in standalone async scripts; use the
yield dependency pattern in FastAPI request handlers.
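
The idea of scoping by `asyncio.current_task` can be sketched without SQLAlchemy. `TaskScopedRegistry` below is a simplified, hypothetical illustration, not the library's implementation: it keeps one object per running Task and hands the same object back on repeated calls from that Task.

```python
import asyncio


class TaskScopedRegistry:
    """Simplified illustration of scoping by asyncio.current_task.

    `factory` is any zero-argument callable that builds the scoped object.
    """

    def __init__(self, factory) -> None:
        self._factory = factory
        self._by_task: dict = {}

    def __call__(self):
        key = asyncio.current_task()
        if key not in self._by_task:
            self._by_task[key] = self._factory()
        return self._by_task[key]

    def remove(self) -> None:
        self._by_task.pop(asyncio.current_task(), None)


async def worker(registry: TaskScopedRegistry) -> tuple:
    first = registry()
    await asyncio.sleep(0)  # let the other task run in between
    second = registry()  # same task, so the same object comes back
    registry.remove()  # without this the registry keeps growing
    return first, second


async def main() -> tuple:
    registry = TaskScopedRegistry(object)
    # gather wraps each coroutine in its own Task, so each gets its own scope.
    (a1, a2), (b1, b2) = await asyncio.gather(worker(registry), worker(registry))
    return a1 is a2, b1 is b2, a1 is b1, len(registry._by_task)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Each worker sees a stable object of its own, the two workers never share one, and the registry is empty once both have called `remove()`.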
Use connection.run_sync() to execute sync SQLAlchemy operations on the async
connection without blocking the event loop:
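
async with engine.begin() as conn:
    # run_sync passes a sync Connection to the callable and runs it in a
    # greenlet on the same thread (not a thread pool); the driver's I/O is
    # still awaited underneath
    await conn.run_sync(Base.metadata.create_all)
    await conn.run_sync(Base.metadata.drop_all)
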
For executing sync SQLAlchemy session operations (bulk inserts with the legacy API):
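
async with AsyncSession(engine) as session:
    # data is a list of dicts, one per row, e.g. [{"name": "Alice"}]
    def _sync_bulk_insert(sync_session):
        sync_session.bulk_insert_mappings(User, data)

    await session.run_sync(_sync_bulk_insert)
    await session.commit()  # run_sync does not commit on its own
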
Rule of thumb: use run_sync only for operations that have no async equivalent
(Alembic migrations, legacy bulk operations); prefer the async API everywhere else.
Each Uvicorn worker process has its own event loop and connection pool. With
4 workers and pool_size=10, you have up to 40 pooled connections to the
database, plus up to max_overflow more per worker during bursts.
SQLAlchemy manages the pool itself (an asyncio-adapted queue pool) rather than
using asyncpg's own pool; the underlying connections are not thread-safe and
must be used within the same event loop they were created on.

engine = create_async_engine(
    DATABASE_URL,
    pool_size=5,        # connections per worker
    max_overflow=5,     # burst pool per worker
    pool_timeout=30,
    pool_recycle=3600,  # re-connect periodically
)

Pre-fork (Gunicorn) models must create the engine after forking — creating it before the fork shares connection file descriptors across workers, causing corruption.
Rule of thumb: create the async engine inside the lifespan function so each
worker process creates its own after forking.
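
The sizing arithmetic is worth writing down when tuning these settings. The helper below is a hypothetical convenience, not part of SQLAlchemy; it computes the worst-case connection count across all workers so it can be compared with the server's limit (max_connections on PostgreSQL).

```python
def max_db_connections(workers: int, pool_size: int, max_overflow: int) -> int:
    """Upper bound on connections opened by all worker processes combined."""
    return workers * (pool_size + max_overflow)


if __name__ == "__main__":
    # 4 workers, each with pool_size=5 and max_overflow=5
    print(max_db_connections(workers=4, pool_size=5, max_overflow=5))
```

With 4 workers, pool_size=5 and max_overflow=5, the bound is 4 × (5 + 5) = 40 connections.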
By default, AsyncSession runs in autobegin mode — a transaction starts
automatically on the first operation. Use begin() for explicit transactions:

async with AsyncSession(engine) as session:
    async with session.begin():
        # all operations here are in one transaction
        session.add(User(name="Alice"))
        session.add(Order(user_id=1, total=99.99))
    # commit happens automatically when the begin() block exits

# Or manually:
session = AsyncSession(engine)
try:
    session.add(user)
    session.add(order)
    await session.commit()
except Exception:
    await session.rollback()
    raise
finally:
    await session.close()

Rule of thumb: use async with session.begin() for atomic multi-step operations —
it makes the transaction boundary explicit and ensures rollback on any exception.
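
The all-or-nothing behaviour can be observed with the stdlib alone. The sketch below is an analogy rather than SQLAlchemy code: sqlite3's connection context manager plays the role of `session.begin()`, committing on a clean exit and rolling back if the block raises. The schema and the `create_user_with_order` helper are hypothetical.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL)")
    conn.execute(
        "CREATE TABLE orders (id INTEGER PRIMARY KEY, "
        "user_id INTEGER NOT NULL, total REAL NOT NULL)"
    )
    conn.commit()
    return conn


def create_user_with_order(conn: sqlite3.Connection, name: str, total) -> None:
    with conn:  # commits on clean exit, rolls back if the block raises
        cur = conn.execute("INSERT INTO users (name) VALUES (?)", (name,))
        conn.execute(
            "INSERT INTO orders (user_id, total) VALUES (?, ?)",
            (cur.lastrowid, total),
        )


def count(conn: sqlite3.Connection, table: str) -> int:
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]


if __name__ == "__main__":
    conn = make_db()
    create_user_with_order(conn, "Alice", 99.99)
    try:
        create_user_with_order(conn, "Bob", None)  # violates NOT NULL on total
    except sqlite3.IntegrityError:
        pass
    print(count(conn, "users"), count(conn, "orders"))
```

The second call fails on the order insert, and the user row written just before it is rolled back too, so both tables still hold only the first pair.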