Skip to content

SQLAlchemy (Sync) Interview Questions & Answers

11 questions Updated 2026-06-20 Share:

FastAPI SQLAlchemy interview questions — sync session setup, get_db dependency, ORM models, querying, relationships and Pydantic integration.

11 of 11
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase

DATABASE_URL = "postgresql://user:pass@localhost/mydb"

engine = create_engine(DATABASE_URL, pool_size=10, max_overflow=20)
SessionLocal = sessionmaker(bind=engine, autocommit=False, autoflush=False)

class Base(DeclarativeBase):
    pass

Use a yield dependency to provide a session per request:

from fastapi import Depends
from sqlalchemy.orm import Session

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

@app.get("/users")
def list_users(db: Session = Depends(get_db)):
    return db.query(User).all()

Rule of thumb: always use a yield dep for DB sessions — the finally block guarantees db.close() runs even if the handler raises.

from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import DeclarativeBase
from pydantic import BaseModel, ConfigDict

class Base(DeclarativeBase):
    pass

# ORM model (maps to the DB table)
class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)
    email = Column(String(200), unique=True, nullable=False)

# Pydantic output schema
class UserOut(BaseModel):
    model_config = ConfigDict(from_attributes=True)
    id: int
    name: str
    email: str

from_attributes=True lets Pydantic read ORM attributes instead of dict keys.

Rule of thumb: keep ORM models and Pydantic schemas in separate files — they serve different purposes and change for different reasons.

Commit after all writes in the handler succeed; rollback on exception:

def get_db():
    db = SessionLocal()
    try:
        yield db
    except Exception:
        db.rollback()
        raise
    finally:
        db.close()

@app.post("/users")
def create_user(user: UserCreate, db: Session = Depends(get_db)):
    db_user = User(name=user.name, email=user.email)
    db.add(db_user)
    db.commit()           # persist to DB
    db.refresh(db_user)   # reload auto-generated fields (id, created_at)
    return UserOut.model_validate(db_user)

autocommit=False (the default) means writes are buffered until commit(). Reads don't need a commit.

Rule of thumb: commit as late as possible (after all writes succeed); never commit in a loop — batch all changes and commit once.

ORM-style queries (SQLAlchemy 2.x preferred):

from sqlalchemy import select

# Single object
user = db.get(User, user_id)                       # by primary key

# Filtered list
stmt = select(User).where(User.is_active == True)
users = db.scalars(stmt).all()

# Pagination
stmt = select(User).offset(skip).limit(limit)
users = db.scalars(stmt).all()

# Count
from sqlalchemy import func
count = db.scalar(select(func.count()).select_from(User))

Legacy query API (still works, being phased out):

user = db.query(User).filter(User.id == user_id).first()

Rule of thumb: prefer select() + db.scalars() (SQLAlchemy 2.x style) over db.query() — it's the modern API and works with both sync and async.

Load related objects eagerly to avoid N+1 queries:

from sqlalchemy.orm import relationship, selectinload

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    posts = relationship("Post", back_populates="author")

# Eager load with selectinload
stmt = select(User).options(selectinload(User.posts)).where(User.id == user_id)
user = db.scalars(stmt).first()

Pydantic schema with nested relationship:

class PostOut(BaseModel):
    model_config = ConfigDict(from_attributes=True)
    id: int
    title: str

class UserWithPosts(BaseModel):
    model_config = ConfigDict(from_attributes=True)
    id: int
    name: str
    posts: list[PostOut]

Rule of thumb: always eager-load relationships you plan to serialise — lazy loading in a closed session causes DetachedInstanceError.

The N+1 problem occurs when you load N parent rows and then issue 1 query per parent to load its children — N+1 queries total.

# N+1 — 1 query for users + 1 per user for posts
users = db.scalars(select(User)).all()
for user in users:
    print(user.posts)   # lazy load triggers a new query each time!

Fix with eager loading:

# 2 queries total: one for users, one for all their posts
users = db.scalars(
    select(User).options(selectinload(User.posts))
).all()

# Or joined load (single query with JOIN)
users = db.scalars(
    select(User).options(joinedload(User.posts))
).all()

selectinload is better for one-to-many (avoids duplicated parent rows); joinedload is better for many-to-one or when the child set is small.

Rule of thumb: any serialised response that includes a relationship must use selectinload or joinedload — never rely on lazy loading in API handlers.

One session per request. The standard get_db yield dependency provides exactly this — a session is created at the start of the request and closed (and rolled back on error) at the end.

# ✅ One session per request
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

Anti-patterns:

  • Global session: shared across requests → race conditions.
  • Session per DB call: loses transaction semantics; can't roll back multiple writes together.
  • Thread-local session outside a request context: leaks sessions.

Rule of thumb: bind the session to the HTTP request lifetime via a yield dep — one request = one transaction = one session.

app/
├── db/
│   ├── base.py          # DeclarativeBase
│   ├── session.py       # engine, SessionLocal, get_db
│   └── models/
│       ├── user.py      # SQLAlchemy ORM models
│       └── order.py
├── schemas/             # Pydantic models (input/output DTOs)
│   ├── user.py
│   └── order.py
├── crud/                # DB operations (no FastAPI imports)
│   ├── user.py
│   └── order.py
└── routers/
    ├── users.py         # thin — calls CRUD, returns schema
    └── orders.py

CRUD functions are plain Python:

# crud/user.py
def get_user(db: Session, user_id: int) -> User | None:
    return db.get(User, user_id)

Rule of thumb: keep DB logic in crud/, keep HTTP logic in routers/ — it makes CRUD functions unit-testable without standing up an ASGI server.

Each Uvicorn worker process gets its own SQLAlchemy engine and connection pool. With 4 workers and pool_size=10, you have 40 total connections to the DB.

engine = create_engine(
    DATABASE_URL,
    pool_size=5,        # connections kept open
    max_overflow=10,    # burst connections allowed
    pool_timeout=30,    # wait time before "too many connections" error
    pool_recycle=1800,  # recycle connections every 30 min (avoids stale)
)

For containers / Kubernetes where the worker count varies, use PgBouncer as a connection pooler in front of PostgreSQL to cap total connections regardless of pod count.

Rule of thumb: set pool_size conservatively (2-5 per worker); use PgBouncer in transaction-pooling mode for high-concurrency deployments.

Call Base.metadata.create_all(engine) — typically in the lifespan function or a startup script:

@asynccontextmanager
async def lifespan(app: FastAPI):
    Base.metadata.create_all(bind=engine)   # creates tables if they don't exist
    yield

app = FastAPI(lifespan=lifespan)

For production, never use create_all at startup — use Alembic migrations instead. create_all in production risks schema drift and doesn't handle alterations.

Rule of thumb: use create_all only for local dev and test databases; use Alembic for staging and production schema changes.

Add a deleted_at timestamp column and filter it out in queries:

from datetime import datetime
from sqlalchemy import Column, DateTime
from sqlalchemy.sql import expression

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    deleted_at = Column(DateTime, nullable=True, default=None)

    @property
    def is_deleted(self) -> bool:
        return self.deleted_at is not None

# Query only active records
stmt = select(User).where(User.deleted_at.is_(None))

# Soft delete
user.deleted_at = datetime.utcnow()
db.commit()

Rule of thumb: always filter WHERE deleted_at IS NULL in all list queries — add a SQLAlchemy event listener or a custom query class to enforce this globally.

More ways to practice

The self-quiz is live. Get notified when mock interviews and new question packs drop.

or
Join our WhatsApp Channel