from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase
DATABASE_URL = "postgresql://user:pass@localhost/mydb"
engine = create_engine(DATABASE_URL, pool_size=10, max_overflow=20)
SessionLocal = sessionmaker(bind=engine, autocommit=False, autoflush=False)
class Base(DeclarativeBase):
    pass
Use a yield dependency to provide a session per request:
from fastapi import Depends
from sqlalchemy.orm import Session
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

@app.get("/users")
def list_users(db: Session = Depends(get_db)):
    return db.query(User).all()
Rule of thumb: always use a yield dep for DB sessions — the finally block
guarantees db.close() runs even if the handler raises.
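To see why the finally block matters, the sketch below drives get_db once with a handler that succeeds and once with one that raises. FakeSession and run_handler are hypothetical stand-ins introduced for illustration, and contextlib.contextmanager is used only as an approximation of how a framework resumes a yield dependency after the handler finishes.

```python
from contextlib import contextmanager


class FakeSession:
    """Hypothetical stand-in for a SQLAlchemy Session; it only records close()."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


def get_db():
    db = FakeSession()
    try:
        yield db
    finally:
        db.close()  # runs on normal exit and when the handler raises


# Wrap the generator in a context manager: it resumes the generator after the
# "handler" finishes, or throws the handler's exception into it.
session_scope = contextmanager(get_db)


def run_handler(fail: bool) -> FakeSession:
    """Simulate one request and return the session that was handed out."""
    captured = None
    try:
        with session_scope() as db:
            captured = db
            if fail:
                raise RuntimeError("handler failed")
    except RuntimeError:
        pass  # a real framework would turn this into an error response
    return captured


ok_session = run_handler(fail=False)
failed_session = run_handler(fail=True)
print(ok_session.closed, failed_session.closed)
```

Both sessions end up closed, which is the property the yield dependency is meant to guarantee.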
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import DeclarativeBase
from pydantic import BaseModel, ConfigDict
class Base(DeclarativeBase):
    pass

# ORM model (maps to the DB table)
class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)
    email = Column(String(200), unique=True, nullable=False)

# Pydantic output schema
class UserOut(BaseModel):
    model_config = ConfigDict(from_attributes=True)
    id: int
    name: str
    email: str
from_attributes=True lets Pydantic read ORM attributes instead of dict keys.
Rule of thumb: keep ORM models and Pydantic schemas in separate files — they serve different purposes and change for different reasons.
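A small sketch of what from_attributes changes, assuming Pydantic v2. UserRow is a hypothetical plain class standing in for an ORM instance (it exposes attributes, not dict keys), and UserOutDictOnly is a hypothetical copy of the schema without the setting.

```python
from pydantic import BaseModel, ConfigDict, ValidationError


class UserRow:
    """Hypothetical stand-in for an ORM instance: plain attributes, not a dict."""

    def __init__(self, id: int, name: str, email: str) -> None:
        self.id = id
        self.name = name
        self.email = email


class UserOut(BaseModel):
    model_config = ConfigDict(from_attributes=True)
    id: int
    name: str
    email: str


class UserOutDictOnly(BaseModel):
    # Same fields, but without from_attributes
    id: int
    name: str
    email: str


row = UserRow(id=1, name="Ada", email="ada@example.com")

# Attribute access works because from_attributes=True
user_out = UserOut.model_validate(row)

# Without from_attributes, an arbitrary object is rejected
try:
    UserOutDictOnly.model_validate(row)
    rejected = False
except ValidationError:
    rejected = True
```

The same model_validate call works on a real SQLAlchemy instance, since Pydantic only needs the attributes to be readable.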
Commit after all writes in the handler succeed; rollback on exception:
def get_db():
    db = SessionLocal()
    try:
        yield db
    except Exception:
        db.rollback()
        raise
    finally:
        db.close()

@app.post("/users")
def create_user(user: UserCreate, db: Session = Depends(get_db)):
    db_user = User(name=user.name, email=user.email)
    db.add(db_user)
    db.commit()  # persist to DB
    db.refresh(db_user)  # reload auto-generated fields (id, created_at)
    return UserOut.model_validate(db_user)
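autocommit=False (the default, and the only mode SQLAlchemy 2.x supports) means writes run inside a transaction and are not persisted until commit(); anything left uncommitted is discarded when the session rolls back or closes.
Reads don't need a commit.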
Rule of thumb: commit as late as possible (after all writes succeed); never commit in a loop — batch all changes and commit once.
ORM-style queries (SQLAlchemy 2.x preferred):
from sqlalchemy import select
# Single object
user = db.get(User, user_id) # by primary key
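# Filtered list (assumes User has a boolean is_active column)
stmt = select(User).where(User.is_active.is_(True))
users = db.scalars(stmt).all()
# Pagination (order_by keeps pages stable between requests)
stmt = select(User).order_by(User.id).offset(skip).limit(limit)
users = db.scalars(stmt).all()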
# Count
from sqlalchemy import func
count = db.scalar(select(func.count()).select_from(User))
Legacy query API (still works in 2.x, but is considered legacy):
user = db.query(User).filter(User.id == user_id).first()
Rule of thumb: prefer select() + db.scalars() (SQLAlchemy 2.x style) over
db.query() — it's the modern API and works with both sync and async.
Load related objects eagerly to avoid N+1 queries:
from sqlalchemy.orm import relationship, selectinload
class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    posts = relationship("Post", back_populates="author")
# Eager load with selectinload
stmt = select(User).options(selectinload(User.posts)).where(User.id == user_id)
user = db.scalars(stmt).first()
Pydantic schema with nested relationship:
class PostOut(BaseModel):
    model_config = ConfigDict(from_attributes=True)
    id: int
    title: str

class UserWithPosts(BaseModel):
    model_config = ConfigDict(from_attributes=True)
    id: int
    name: str
    posts: list[PostOut]
Rule of thumb: always eager-load relationships you plan to serialise —
lazy loading in a closed session causes DetachedInstanceError.
The N+1 problem occurs when you load N parent rows and then issue 1 query per parent to load its children — N+1 queries total.
# N+1: 1 query for users + 1 per user for posts
users = db.scalars(select(User)).all()
for user in users:
    print(user.posts)  # lazy load triggers a new query each time!
Fix with eager loading:
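# 2 queries total: one for users, one for all their posts
users = db.scalars(
    select(User).options(selectinload(User.posts))
).all()
# Or joined load (single query with JOIN); needs
# `from sqlalchemy.orm import joinedload`, and .unique() is required
# when joined-loading a collection in 2.x style
users = db.scalars(
    select(User).options(joinedload(User.posts))
).unique().all()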
selectinload is better for one-to-many (avoids duplicated parent rows);
joinedload is better for many-to-one or when the child set is small.
Rule of thumb: any serialised response that includes a relationship must use
selectinload or joinedload — never rely on lazy loading in API handlers.
One session per request. The standard get_db yield dependency provides
exactly this: a session is created at the start of the request and closed
at the end, and close() rolls back any transaction that was never committed.
# ✅ One session per request
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
Anti-patterns:
- Global session: shared across requests → race conditions.
- Session per DB call: loses transaction semantics; can't roll back multiple writes together.
- Thread-local session outside a request context: leaks sessions.
Rule of thumb: bind the session to the HTTP request lifetime via a yield dep — one request = one transaction = one session.
app/
├── db/
│   ├── base.py          # DeclarativeBase
│   ├── session.py       # engine, SessionLocal, get_db
│   └── models/
│       ├── user.py      # SQLAlchemy ORM models
│       └── order.py
├── schemas/             # Pydantic models (input/output DTOs)
│   ├── user.py
│   └── order.py
├── crud/                # DB operations (no FastAPI imports)
│   ├── user.py
│   └── order.py
└── routers/
    ├── users.py         # thin: calls CRUD, returns schema
    └── orders.py
CRUD functions are plain Python:
# crud/user.py
def get_user(db: Session, user_id: int) -> User | None:
    return db.get(User, user_id)
Rule of thumb: keep DB logic in crud/, keep HTTP logic in routers/ —
it makes CRUD functions unit-testable without standing up an ASGI server.
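Each Uvicorn worker process gets its own SQLAlchemy engine and connection pool.
With 4 workers and pool_size=10, you have up to 40 persistent connections to the DB, and max_overflow raises the ceiling to workers × (pool_size + max_overflow) during bursts.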
engine = create_engine(
    DATABASE_URL,
    pool_size=5,        # connections kept open
    max_overflow=10,    # extra burst connections allowed beyond pool_size
    pool_timeout=30,    # seconds to wait for a free connection before raising TimeoutError
    pool_recycle=1800,  # replace connections older than 30 min (avoids stale)
)
For containers / Kubernetes where the worker count varies, use PgBouncer as a connection pooler in front of PostgreSQL to cap total connections regardless of pod count.
Rule of thumb: set pool_size conservatively (2-5 per worker); use PgBouncer
in transaction-pooling mode for high-concurrency deployments.
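The sizing arithmetic is easy to get wrong once overflow and multiple pods are involved. A small helper, hypothetical and not part of SQLAlchemy, makes the worst case explicit; the budget of 100 is an assumption based on PostgreSQL's usual default for max_connections.

```python
def max_db_connections(
    workers: int, pool_size: int, max_overflow: int, replicas: int = 1
) -> int:
    """Upper bound on connections opened by every pool in every process."""
    return replicas * workers * (pool_size + max_overflow)


def fits_budget(total: int, max_connections: int = 100) -> bool:
    """Check the worst case against the database's connection limit."""
    return total <= max_connections


# One pod, 4 workers, pool_size=5, max_overflow=10
single_pod = max_db_connections(workers=4, pool_size=5, max_overflow=10)

# The same configuration scaled to 3 pods
three_pods = max_db_connections(workers=4, pool_size=5, max_overflow=10, replicas=3)

print(single_pod, fits_budget(single_pod))
print(three_pods, fits_budget(three_pods))
```

One pod stays under the limit at 60 connections, but three pods can reach 180, which is the situation where a pooler such as PgBouncer becomes necessary.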
Call Base.metadata.create_all(engine) — typically in the lifespan function
or a startup script:
from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    Base.metadata.create_all(bind=engine)  # creates missing tables; existing tables are left untouched
    yield

app = FastAPI(lifespan=lifespan)
For production, never use create_all at startup — use Alembic migrations
instead. create_all in production risks schema drift and doesn't handle alterations.
Rule of thumb: use create_all only for local dev and test databases; use
Alembic for staging and production schema changes.
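The "doesn't handle alterations" point can be checked directly. In this sketch, which assumes an in-memory SQLite database and two hypothetical versions of a users table, create_all is run a second time after a column has been added to the table definition.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine, inspect

engine = create_engine("sqlite:///:memory:")

# Version 1 of the schema
metadata_v1 = MetaData()
Table(
    "users",
    metadata_v1,
    Column("id", Integer, primary_key=True),
    Column("name", String(100)),
)
metadata_v1.create_all(engine)

# Version 2 adds an email column to the same table
metadata_v2 = MetaData()
Table(
    "users",
    metadata_v2,
    Column("id", Integer, primary_key=True),
    Column("name", String(100)),
    Column("email", String(200)),
)
metadata_v2.create_all(engine)  # "users" already exists, so it is skipped

# Ask the database which columns it actually has
columns = [column["name"] for column in inspect(engine).get_columns("users")]
print(columns)
```

The email column never reaches the database, so the models and the real schema drift apart silently; a migration tool is what applies the ALTER TABLE.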
Add a deleted_at timestamp column and filter it out in queries:
from datetime import datetime, timezone
from sqlalchemy import Column, DateTime, Integer, String, select

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    deleted_at = Column(DateTime(timezone=True), nullable=True, default=None)

    @property
    def is_deleted(self) -> bool:
        return self.deleted_at is not None

# Query only active records
stmt = select(User).where(User.deleted_at.is_(None))
# Soft delete (datetime.utcnow() is deprecated since Python 3.12)
user.deleted_at = datetime.now(timezone.utc)
db.commit()
Rule of thumb: always filter WHERE deleted_at IS NULL in all list queries —
add a SQLAlchemy event listener or a custom query class to enforce this globally.