Async Database Interview Questions & Answers

11 questions, updated 2026-06-20

FastAPI async database interview questions — AsyncSession, async SQLAlchemy, asyncpg, aiosqlite and event loop compatibility.

Synchronous DB drivers block the OS thread during the query. In a FastAPI async def handler, this freezes the entire event loop — no other requests can be processed until the query returns.

# BAD — sync driver inside async handler blocks the event loop
@app.get("/users")
async def list_users(db: Session = Depends(get_db)):
    return db.query(User).all()   # blocks event loop for entire query duration

# GOOD — async driver yields control during the query
@app.get("/users")
async def list_users(db: AsyncSession = Depends(get_async_db)):
    result = await db.execute(select(User))
    return result.scalars().all()

Async drivers (asyncpg, aiosqlite, aiomysql) suspend the coroutine during I/O, letting the event loop handle other requests.

Rule of thumb: always use async DB drivers in async def FastAPI handlers; use sync drivers only with def handlers (FastAPI runs them in a thread pool).
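The effect of a blocking call on the event loop can be reproduced without a database. The sketch below is only an illustration: `blocking_query` and `async_query` are hypothetical stand-ins that use `time.sleep` and `asyncio.sleep` in place of a sync and an async driver call.

```python
import asyncio
import time


async def blocking_query(delay: float) -> None:
    # Stand-in for a sync driver call: time.sleep never yields to the event loop.
    time.sleep(delay)


async def async_query(delay: float) -> None:
    # Stand-in for an async driver call: awaiting suspends only this coroutine.
    await asyncio.sleep(delay)


async def timed(handler, n: int, delay: float) -> float:
    """Run n 'requests' concurrently and return the wall-clock time taken."""
    start = time.perf_counter()
    await asyncio.gather(*(handler(delay) for _ in range(n)))
    return time.perf_counter() - start


async def main() -> tuple[float, float]:
    blocked = await timed(blocking_query, 5, 0.1)   # calls run one after another
    concurrent = await timed(async_query, 5, 0.1)   # calls overlap
    return blocked, concurrent


if __name__ == "__main__":
    blocked, concurrent = asyncio.run(main())
    print(f"blocking: {blocked:.2f}s, async: {concurrent:.2f}s")
```

Five blocking calls take roughly five times the delay because each one holds the loop, while the awaited calls overlap and finish in about one delay.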

from sqlalchemy.ext.asyncio import (
    create_async_engine, AsyncSession, async_sessionmaker
)
from sqlalchemy.orm import DeclarativeBase

# asyncpg driver for PostgreSQL
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/mydb"

engine = create_async_engine(DATABASE_URL, pool_size=10, echo=False)
async_session_factory = async_sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)

class Base(DeclarativeBase):
    pass

expire_on_commit=False prevents SQLAlchemy from expiring (lazily reloading) attributes after a commit — essential in async code where lazy loading would trigger a new I/O call without being awaited.

Rule of thumb: always set expire_on_commit=False for async sessions — expired attributes in async context cause MissingGreenlet or DetachedInstanceError.

from sqlalchemy.ext.asyncio import AsyncSession
from fastapi import Depends

async def get_async_db():
    async with async_session_factory() as session:
        yield session
    # the context manager only closes the session on exit (discarding uncommitted work);
    # it does not commit, so handlers must await session.commit() themselves

# Or commit and roll back inside the dependency:
async def get_async_db():
    session = async_session_factory()
    try:
        yield session
        await session.commit()
    except Exception:
        await session.rollback()
        raise
    finally:
        await session.close()

@app.get("/users")
async def list_users(db: AsyncSession = Depends(get_async_db)):
    result = await db.execute(select(User))
    return result.scalars().all()

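Rule of thumb: prefer async with async_session_factory() as session so the session is always closed. It does not commit for you: either call await session.commit() explicitly or wrap the work in async with session.begin(), which commits on success and rolls back on an exception.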
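The order in which the explicit try/except/finally dependency calls commit, rollback and close can be checked without a database. This is a minimal sketch under stated assumptions: `FakeSession`, `make_dependency` and `run_handler` are hypothetical helpers, and `contextlib.asynccontextmanager` is used to drive the generator in roughly the way FastAPI drives a yield dependency.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in that records which session methods were awaited."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    async def commit(self) -> None:
        self.calls.append("commit")

    async def rollback(self) -> None:
        self.calls.append("rollback")

    async def close(self) -> None:
        self.calls.append("close")


def make_dependency(session: FakeSession):
    async def get_db():
        try:
            yield session
            await session.commit()      # reached only if the handler did not raise
        except Exception:
            await session.rollback()    # the handler's exception is thrown in at the yield
            raise
        finally:
            await session.close()       # always runs
    return get_db


async def run_handler(fail: bool) -> list[str]:
    session = FakeSession()
    dependency = asynccontextmanager(make_dependency(session))
    try:
        async with dependency():
            if fail:
                raise ValueError("handler failed")
    except ValueError:
        pass
    return session.calls


if __name__ == "__main__":
    print(asyncio.run(run_handler(fail=False)))
    print(asyncio.run(run_handler(fail=True)))
```

A successful handler leads to commit followed by close; a failing handler leads to rollback followed by close.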

from sqlalchemy import select, func, update

# Single object by primary key
user = await db.get(User, user_id)

# Filtered query
result = await db.execute(select(User).where(User.is_active == True))
users = result.scalars().all()

# Single result
result = await db.execute(select(User).where(User.email == email))
user = result.scalar_one_or_none()   # None if not found, raises if multiple

# Count
result = await db.execute(select(func.count()).select_from(User))
count = result.scalar()

# Insert
db.add(User(name="Alice", email="alice@example.com"))
await db.commit()

# Update (user_id is the primary key to change; avoid shadowing the built-in id)
await db.execute(update(User).where(User.id == user_id).values(name="Bob"))
await db.commit()

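Rule of thumb: always await db.execute(stmt) for queries and await db.commit() after writes. Without await, the call only creates a coroutine object that never runs, so nothing reaches the database; the only signal is a RuntimeWarning that the coroutine was never awaited.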
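What a missing await actually does can be shown with a few lines of plain asyncio. `FakeSession` here is a hypothetical stand-in for an async session, not SQLAlchemy's class.

```python
import asyncio


class FakeSession:
    """Hypothetical session whose commit() just flips a flag."""

    def __init__(self) -> None:
        self.committed = False

    async def commit(self) -> None:
        self.committed = True


async def demo() -> tuple[bool, bool]:
    session = FakeSession()

    pending = session.commit()            # missing await: only builds a coroutine object
    ran_without_await = session.committed # still False, the body never executed
    pending.close()                       # discard it to avoid the "never awaited" warning

    await session.commit()                # awaited: the body actually runs
    return ran_without_await, session.committed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The first element of the result is False because the un-awaited call did nothing; the second is True after the awaited call.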

Implicit lazy loading does not work with AsyncSession: accessing an unloaded relationship raises MissingGreenlet. You must eager-load:

from sqlalchemy.orm import selectinload, joinedload

# selectinload — runs a second SELECT IN query
result = await db.execute(
    select(User).options(selectinload(User.posts)).where(User.id == user_id)
)
user = result.scalar_one()
# user.posts is now loaded — no extra query needed

# joinedload — single JOIN query
result = await db.execute(
    select(Post).options(joinedload(Post.author))
)
posts = result.unique().scalars().all()

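result.unique() is required when joinedload targets a collection (one-to-many), because the JOIN repeats each parent row once per child and SQLAlchemy raises an error if it is omitted. For a many-to-one relationship such as Post.author it is optional but harmless.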

Rule of thumb: always use selectinload (one-to-many) or joinedload (many-to-one) in async; lazy loading raises an error rather than silently blocking.

Driver | URL prefix | Notes
asyncpg | postgresql+asyncpg:// | Asyncio-only; typically the fastest; most popular
psycopg3 (psycopg) | postgresql+psycopg:// | psycopg 3.x; sync + async
aiopg | (none) | Older, based on psycopg2; has its own aiopg.sa layer but no SQLAlchemy create_async_engine dialect

pip install asyncpg            # for asyncpg
pip install "psycopg[binary]"  # for psycopg3

# asyncpg
engine = create_async_engine("postgresql+asyncpg://user:pass@host/db")

# psycopg3
engine = create_async_engine("postgresql+psycopg://user:pass@host/db")

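asyncpg is the most battle-tested choice with SQLAlchemy's asyncio extension; psycopg 3 offers sync and async APIs from a single driver and is gaining adoption.

Rule of thumb: default to asyncpg for new projects, since it is generally the most performant and the most widely used; choose psycopg 3 if you want one driver for both sync and async code.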

Use the aiosqlite driver:

pip install aiosqlite

from sqlalchemy.ext.asyncio import create_async_engine

# In-memory for tests
engine = create_async_engine("sqlite+aiosqlite:///:memory:", echo=True)

# File-based
engine = create_async_engine("sqlite+aiosqlite:///./test.db")

For tests, create tables in the test fixture:

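import pytest_asyncio  # provided by the pytest-asyncio plugin

# strict mode needs this decorator; plain @pytest.fixture works only with asyncio_mode=auto
@pytest_asyncio.fixture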
async def db_engine():
    engine = create_async_engine("sqlite+aiosqlite:///:memory:")
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield engine
    await engine.dispose()

Rule of thumb: use sqlite+aiosqlite:///:memory: in tests for fast, isolated DB fixtures — no cleanup needed since the DB disappears when the engine is disposed.

async_scoped_session binds sessions to an asyncio.Task (coroutine) context rather than a thread. It's the async equivalent of scoped_session.

import asyncio
from sqlalchemy.ext.asyncio import async_scoped_session

AsyncScopedSession = async_scoped_session(
    async_session_factory,
    scopefunc=asyncio.current_task,
)

With scopefunc=asyncio.current_task, each asyncio Task gets its own session. remove() is a coroutine on the async registry, so call await AsyncScopedSession.remove() at the end of a task to close and discard its session.

In FastAPI, the per-request get_async_db yield dependency is simpler and preferred. async_scoped_session is useful in background workers or batch scripts where there is no FastAPI dependency injection to manage the session.

Rule of thumb: use async_scoped_session in standalone async scripts; use the yield dependency pattern in FastAPI request handlers.
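The idea of scoping by task can be sketched with plain asyncio. `TaskScopedRegistry` below is a hypothetical, simplified registry written for this illustration; it is not SQLAlchemy's implementation and it stores plain objects rather than sessions.

```python
import asyncio


class TaskScopedRegistry:
    """Hypothetical registry: one object per value returned by scopefunc."""

    def __init__(self, factory, scopefunc) -> None:
        self._factory = factory
        self._scopefunc = scopefunc
        self._items: dict = {}

    def __call__(self):
        key = self._scopefunc()
        if key not in self._items:
            self._items[key] = self._factory()   # first use in this scope
        return self._items[key]

    def remove(self) -> None:
        # Forget the current scope's object; skipping this would leak entries.
        self._items.pop(self._scopefunc(), None)

    def __len__(self) -> int:
        return len(self._items)


registry = TaskScopedRegistry(factory=object, scopefunc=asyncio.current_task)


async def worker():
    first = registry()
    await asyncio.sleep(0)       # let the other task run in between
    second = registry()          # same task, so the same object comes back
    registry.remove()
    return first is second, first


async def main():
    # gather wraps each coroutine in its own Task, so each gets its own scope.
    return await asyncio.gather(worker(), worker())


if __name__ == "__main__":
    (same_a, obj_a), (same_b, obj_b) = asyncio.run(main())
    print(same_a, same_b, obj_a is obj_b)
```

Each task sees one stable object for its whole lifetime, two tasks never share one, and the explicit remove() at the end keeps the registry from growing.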

Use connection.run_sync() to execute sync SQLAlchemy operations on the async connection without blocking the event loop:

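async with engine.begin() as conn:
    # run_sync runs the sync callable in a greenlet on the same thread (not a thread pool);
    # its database I/O is still awaited on the event loop
    await conn.run_sync(Base.metadata.create_all)   # create all tables
    await conn.run_sync(Base.metadata.drop_all)     # drop them again (e.g. test teardown)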

For executing sync SQLAlchemy session operations (bulk inserts with the legacy API):

async with AsyncSession(engine) as session:
    def _sync_bulk_insert(sync_session):
        sync_session.bulk_insert_mappings(User, data)
    await session.run_sync(_sync_bulk_insert)

Rule of thumb: use run_sync only for operations that have no async equivalent (Alembic migrations, legacy bulk operations); prefer the async API everywhere else.

Each Uvicorn worker process has its own event loop and its own connection pool. With 4 workers and pool_size=10, that is 40 pooled connections to the database, plus up to max_overflow extra connections per worker under burst load.

SQLAlchemy's async engine pools connections with AsyncAdaptedQueuePool by default. asyncpg connections are tied to the event loop they were created on, so never share an engine or its connections across event loops or threads.

engine = create_async_engine(
    DATABASE_URL,
    pool_size=5,         # connections per worker
    max_overflow=5,      # burst pool per worker
    pool_timeout=30,
    pool_recycle=3600,   # re-connect periodically
)
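The worst-case connection count for a deployment follows directly from these settings. A small sketch of the arithmetic, where `max_db_connections` is a hypothetical helper name:

```python
def max_db_connections(workers: int, pool_size: int, max_overflow: int) -> int:
    """Upper bound on open connections: each worker owns a separate pool."""
    return workers * (pool_size + max_overflow)


if __name__ == "__main__":
    # 4 workers with the settings above: 4 * (5 + 5)
    print(max_db_connections(workers=4, pool_size=5, max_overflow=5))
```

Compare the result with the database's own limit (PostgreSQL's max_connections defaults to 100) and leave headroom for migrations, admin sessions and other services.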

Under pre-fork servers (Gunicorn), create the engine after forking: an engine created before the fork shares connection file descriptors across workers, which corrupts the connections.

Rule of thumb: create the async engine inside the lifespan function so each worker process creates its own after forking.

By default, AsyncSession runs in autobegin mode — a transaction starts automatically on the first operation. Use begin() for explicit transactions:

async with AsyncSession(engine) as session:
    async with session.begin():
        # all operations here are in one transaction
        session.add(User(name="Alice"))
        session.add(Order(user_id=1, total=99.99))
        # commit happens automatically when the context manager exits

# Or manually:
session = AsyncSession(engine)
try:
    session.add(user)
    session.add(order)
    await session.commit()
except Exception:
    await session.rollback()
    raise
finally:
    await session.close()

Rule of thumb: use async with session.begin() for atomic multi-step operations — it makes the transaction boundary explicit and ensures rollback on any exception.
