BackgroundTasks lets you schedule functions to run after the HTTP response
is sent to the client. The handler returns immediately; the task runs in the
same process after the response is fully written.

    from fastapi import BackgroundTasks, FastAPI

    app = FastAPI()

    def write_log(message: str):
        with open("log.txt", "a") as f:
            f.write(message + "\n")

    @app.post("/items")
    async def create_item(item: Item, background_tasks: BackgroundTasks):
        result = await db.save(item)
        background_tasks.add_task(write_log, f"Created item {result.id}")
        return result  # response sent; write_log runs after

Rule of thumb: inject BackgroundTasks as a parameter — FastAPI provides it
automatically; never instantiate it yourself.
add_task accepts both async def and regular def functions.

    async def send_welcome_email(email: str):
        await smtp_client.send(email, "Welcome!")

    def sync_audit_log(event: str):
        logger.info(event)  # sync: runs in the thread pool

    @app.post("/signup")
    async def signup(user: UserCreate, background_tasks: BackgroundTasks):
        new_user = await db.create(user)
        background_tasks.add_task(send_welcome_email, new_user.email)  # async
        background_tasks.add_task(sync_audit_log, f"signup:{new_user.id}")  # sync
        return new_user

Async tasks run on the event loop; sync tasks run in the thread pool. Tasks execute sequentially in the order they were added.
Rule of thumb: prefer async background tasks in an async app — they don't block the event loop and don't require thread pool slots.
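The dispatch rule can be modelled with the standard library alone. The sketch below is a simplified stand-in, not Starlette's actual implementation; `run_background_tasks`, `async_task`, and `sync_task` are hypothetical names. Coroutine functions are awaited on the event loop, plain functions are handed to a worker thread, and each task finishes before the next one starts.

```python
import asyncio
import inspect
import threading

LOOP_THREAD = threading.get_ident()  # thread that will run the event loop
calls = []  # (task name, ran on the event-loop thread?)

async def async_task(name: str):
    calls.append((name, threading.get_ident() == LOOP_THREAD))

def sync_task(name: str):
    calls.append((name, threading.get_ident() == LOOP_THREAD))

async def run_background_tasks(tasks):
    # Run tasks one at a time, in the order they were added.
    for func, args in tasks:
        if inspect.iscoroutinefunction(func):
            await func(*args)                     # stays on the event loop
        else:
            await asyncio.to_thread(func, *args)  # runs in a worker thread

asyncio.run(run_background_tasks([
    (async_task, ("email",)),
    (sync_task, ("audit",)),
]))
print(calls)
```

The async task reports that it ran on the loop thread and the sync task reports that it did not, which is why a slow sync task does not stall other requests but does occupy a thread pool slot.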
Background tasks run sequentially, in the order they were added. If task A takes 2 seconds and task B takes 1 second, the total background time is 3 seconds.

    background_tasks.add_task(task_a)  # runs first
    background_tasks.add_task(task_b)  # runs second, after task_a completes

For parallel background work, use asyncio.gather() inside a single async task:

    import asyncio

    async def parallel_tasks(email: str, item_id: int):
        # Both coroutines run concurrently inside one background task
        await asyncio.gather(
            send_email(email),
            update_analytics(item_id),
        )

    background_tasks.add_task(parallel_tasks, user.email, item.id)

Rule of thumb: if background tasks are independent and their combined duration
matters, wrap them in an asyncio.gather inside a single background task.
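The difference in ordering can be seen without FastAPI. This is a minimal sketch using only asyncio; `step`, `sequential`, and `concurrent` are hypothetical names, and the short sleeps stand in for real work.

```python
import asyncio

async def step(name: str, delay: float, log: list):
    log.append(f"{name} start")
    await asyncio.sleep(delay)
    log.append(f"{name} end")

async def sequential(log: list):
    # Mirrors two separate add_task calls: B waits for A to finish.
    await step("A", 0.05, log)
    await step("B", 0.01, log)

async def concurrent(log: list):
    # Mirrors one background task that gathers both coroutines.
    await asyncio.gather(step("A", 0.05, log), step("B", 0.01, log))

seq_log, con_log = [], []
asyncio.run(sequential(seq_log))
asyncio.run(concurrent(con_log))
print(seq_log)  # A start, A end, B start, B end
print(con_log)  # A start, B start, B end, A end
```

In the gathered version B starts before A has finished, so the total time is roughly that of the slower task rather than the sum of both.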
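If a background task raises, the exception is logged (Uvicorn reports it as an unhandled exception) but does not affect the response, which has already been sent. The client never learns that the task failed, and any tasks queued after the failing one do not run.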

    def risky_task():
        raise ValueError("Something went wrong")  # logged, response already sent

    @app.post("/items")
    async def create_item(item: Item, background_tasks: BackgroundTasks):
        background_tasks.add_task(risky_task)
        return item  # this 200 response is already sent before risky_task runs

For reliable task execution (retries, persistence), use a proper task queue (Celery, ARQ, RQ) where tasks can be retried on failure.
Rule of thumb: use BackgroundTasks only for best-effort work (email
notification, audit logging); never for work that must succeed (payment processing,
data integrity).
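For best-effort work it can still help to record failures explicitly rather than rely on the server's default traceback. The following is one possible sketch, not a FastAPI feature: `best_effort` is a hypothetical decorator for sync tasks, and the `failures` list stands in for a metrics counter or error table.

```python
import functools
import logging

logger = logging.getLogger("background")
failures = []  # stand-in for a metrics counter or dead-letter table

def best_effort(func):
    """Wrap a sync task so an exception is recorded instead of propagating."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            logger.exception("background task %s failed", func.__name__)
            failures.append((func.__name__, repr(exc)))
            return None
    return wrapper

@best_effort
def risky_task():
    raise ValueError("Something went wrong")

risky_task()  # does not raise; the failure is recorded
print(failures)
```

A task wrapped this way is still passed to `background_tasks.add_task(risky_task)` as usual. Because the wrapper swallows the exception, tasks queued after it keep running, but the work itself is still lost.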
| Concern | BackgroundTasks | Task queue (Celery/ARQ) |
|---|---|---|
| Must succeed | ❌ no retries | ✅ configurable retries |
| Survives crash/restart | ❌ lost if process dies | ✅ persisted in broker |
| Long-running (minutes) | ❌ blocks worker | ✅ separate worker |
| Rate-limited external API | ❌ no throttling | ✅ rate limiting |
| Monitor/inspect tasks | ❌ | ✅ Flower, ARQ dashboard |
| Simple fire-and-forget | ✅ | overkill |

    # BackgroundTasks: fine for quick, best-effort work
    background_tasks.add_task(send_analytics_event, event_data)

    # Celery: required for reliability
    process_payment.delay(order_id)  # persisted in Redis/RabbitMQ

Rule of thumb: if the task must succeed, use a task queue; if losing it
occasionally is acceptable, BackgroundTasks is simpler.
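To make "configurable retries" concrete, here is a rough sketch of retry with exponential backoff. It illustrates the concept only and is not how Celery or ARQ implement it; `retry` and `flaky_send` are hypothetical. An in-process loop like this is still lost if the process dies, which is the gap a broker-backed queue closes.

```python
import time

def retry(func, *, attempts: int = 3, base_delay: float = 0.01):
    """Call func until it succeeds, doubling the delay after each failure."""
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: surface the error
            time.sleep(base_delay * 2 ** (attempt - 1))

calls = {"count": 0}

def flaky_send():
    # Hypothetical task that fails twice, then succeeds.
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("SMTP unavailable")
    return "sent"

result = retry(flaky_send)
print(result, calls["count"])
```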

    # celery_app.py
    from celery import Celery

    celery = Celery(
        "myapp",
        broker="redis://localhost:6379/0",
        backend="redis://localhost:6379/1",
    )

    @celery.task
    def send_email_task(to: str, subject: str, body: str):
        smtp.send(to, subject, body)

    # FastAPI route
    from celery_app import send_email_task

    @app.post("/signup")
    async def signup(user: UserCreate):
        new_user = await db.create(user)
        send_email_task.delay(new_user.email, "Welcome!", "...")
        return new_user

Start workers separately:

    celery -A celery_app worker --loglevel=info

Rule of thumb: keep Celery tasks in a separate module from FastAPI routes — tasks should be importable by Celery workers that don't start the web server.
ARQ (Async Redis Queue) is a task queue built for asyncio — tasks are async def
functions, the worker is an event loop, and it uses Redis as the broker.

    # tasks.py
    from arq.connections import RedisSettings

    async def send_welcome_email(ctx, email: str):
        await ctx["smtp"].send(email, "Welcome!")

    class WorkerSettings:
        functions = [send_welcome_email]
        redis_settings = RedisSettings()

    # FastAPI
    from contextlib import asynccontextmanager

    from arq import create_pool
    from arq.connections import RedisSettings

    @asynccontextmanager
    async def lifespan(app):
        app.state.arq = await create_pool(RedisSettings())
        yield
        await app.state.arq.aclose()

    @app.post("/signup")
    async def signup(user: UserCreate, request: Request):
        await request.app.state.arq.enqueue_job("send_welcome_email", user.email)
        return {"status": "created"}

ARQ advantages over Celery for async apps: tasks are plain async def functions with ordinary type hints, setup is simpler, and job arguments are pickled by default, so most Python objects can be passed without a custom serialiser.
Rule of thumb: choose ARQ for async-first apps; choose Celery when you need mature monitoring (Flower), scheduled tasks (Celery Beat), or a non-Python worker ecosystem.
Background tasks run after the request session is closed — you cannot reuse the request's DB session. Create a new session inside the task:

    from sqlalchemy.orm import Session
    from app.db import SessionLocal

    def send_order_confirmation(order_id: int):
        db = SessionLocal()
        try:
            order = db.get(Order, order_id)
            send_email(order.user.email, f"Order {order_id} confirmed")
            db.commit()
        finally:
            db.close()

    @app.post("/orders")
    async def create_order(data: OrderCreate, background_tasks: BackgroundTasks):
        order = await db.create(data)
        background_tasks.add_task(send_order_confirmation, order.id)
        return order

Pass the ID, not the ORM object: ORM objects are bound to the session that loaded them, and once that session closes they are detached, so lazy-loaded attributes can no longer be fetched.
Rule of thumb: always open a fresh DB session inside background tasks; never pass ORM objects from the request session into background tasks.
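The same pattern can be tried end to end with the standard library. This sketch swaps SQLAlchemy for `sqlite3` so that it runs anywhere; the table, the `sent` list (a stand-in for an email client), and the temporary database path are all assumptions made for illustration.

```python
import os
import sqlite3
import tempfile

DB_PATH = os.path.join(tempfile.mkdtemp(), "orders.db")  # throwaway database
sent = []  # stand-in for an email client

def create_order(email: str) -> int:
    # "Request" side: its connection is closed before the task runs.
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS orders (id INTEGER PRIMARY KEY, email TEXT)"
        )
        cur = conn.execute("INSERT INTO orders (email) VALUES (?)", (email,))
        conn.commit()
        return cur.lastrowid
    finally:
        conn.close()

def send_order_confirmation(order_id: int):
    # "Background" side: opens its own connection and reloads the row by ID.
    conn = sqlite3.connect(DB_PATH)
    try:
        row = conn.execute(
            "SELECT email FROM orders WHERE id = ?", (order_id,)
        ).fetchone()
        sent.append((row[0], f"Order {order_id} confirmed"))
    finally:
        conn.close()

order_id = create_order("ada@example.com")
send_order_confirmation(order_id)
print(sent)
```

Only the integer ID crosses from the request side to the background side, so nothing depends on a connection that has already been closed.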
Option A — APScheduler (in-process):

    from apscheduler.schedulers.asyncio import AsyncIOScheduler
    from contextlib import asynccontextmanager

    @asynccontextmanager
    async def lifespan(app):
        scheduler = AsyncIOScheduler()
        scheduler.add_job(cleanup_expired_tokens, "interval", minutes=30)
        scheduler.start()
        yield
        scheduler.shutdown()

    app = FastAPI(lifespan=lifespan)

Option B — ARQ cron jobs:

    from arq import cron

    class WorkerSettings:
        cron_jobs = [
            cron(cleanup_expired_tokens, minute={0, 30}),  # at :00 and :30 of every hour
        ]

Option C — External scheduler (cron, Kubernetes CronJob, AWS EventBridge):
The scheduler calls an internal endpoint such as /internal/cleanup at the scheduled time.
Rule of thumb: in-process schedulers (APScheduler) are convenient but tasks run on every pod in a scaled deployment; use a single external scheduler or a distributed lock to prevent duplicate execution.
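The distributed-lock idea can be sketched as follows. `FakeLockStore` is a hypothetical in-memory stand-in for a shared store; with Redis the same effect is usually achieved with `SET` plus the `NX` option and an expiry. `scheduled_cleanup` and the pod names are likewise illustrative.

```python
import time

class FakeLockStore:
    """In-memory stand-in for a shared store with set-if-absent plus expiry."""

    def __init__(self):
        self._locks = {}  # key -> (owner, expires_at)

    def acquire(self, key: str, owner: str, ttl: float) -> bool:
        now = time.monotonic()
        held = self._locks.get(key)
        if held is not None and held[1] > now:
            return False  # someone else holds an unexpired lock
        self._locks[key] = (owner, now + ttl)
        return True

store = FakeLockStore()
runs = []

def scheduled_cleanup(pod: str):
    # Only the pod that wins the lock for this interval does the work.
    if store.acquire("cleanup_expired_tokens", owner=pod, ttl=60):
        runs.append(pod)

# Every pod's scheduler fires at the same time; one of them proceeds.
for pod in ("pod-a", "pod-b", "pod-c"):
    scheduled_cleanup(pod)
print(runs)
```

The lock's time-to-live should be shorter than the schedule interval so the next run can acquire it, and long enough that a slow pod does not start a second copy of the same run.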
The async job pattern: accept the request, enqueue the task, return a task ID; provide a polling endpoint.

    import uuid

    from arq.jobs import Job, JobStatus

    @app.post("/reports", status_code=202)
    async def generate_report(request: Request, params: ReportParams):
        task_id = str(uuid.uuid4())
        # _job_id makes the ARQ job ID match the task_id returned to the client
        await request.app.state.arq.enqueue_job(
            "build_report", task_id, params.dict(), _job_id=task_id
        )
        return {"task_id": task_id, "status": "pending"}

    @app.get("/reports/{task_id}")
    async def get_report_status(task_id: str, request: Request):
        # Look the job up by ID on the shared ARQ Redis pool
        job = Job(task_id, request.app.state.arq)
        status = await job.status()
        if status == JobStatus.not_found:
            raise HTTPException(404)
        if status == JobStatus.complete:
            result = await job.result()
            return {"status": "complete", "result": result}
        return {"status": status.value}

Rule of thumb: return 202 Accepted for async tasks — it signals that the request was accepted but processing isn't complete yet.
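The submit-then-poll flow can be shown without a queue or web framework. In this minimal sketch an in-memory dict replaces the broker, and `submit` and `poll` are hypothetical stand-ins for the POST and GET handlers. Real deployments need a store that survives restarts and is shared between processes.

```python
import asyncio
import uuid

jobs = {}  # task_id -> {"status": ..., "result": ...}

async def build_report(task_id: str, params: dict):
    jobs[task_id]["status"] = "in_progress"
    await asyncio.sleep(0.01)  # placeholder for real work
    jobs[task_id].update(status="complete", result={"rows": params["rows"]})

def submit(params: dict):
    # What the POST handler does: record the job, start it, answer 202.
    task_id = str(uuid.uuid4())
    jobs[task_id] = {"status": "pending", "result": None}
    worker = asyncio.create_task(build_report(task_id, params))
    return 202, task_id, worker

def poll(task_id: str):
    # What the GET handler does: 404 for unknown IDs, else the current state.
    job = jobs.get(task_id)
    return (404, None) if job is None else (200, dict(job))

async def main():
    code, task_id, worker = submit({"rows": 3})
    first = poll(task_id)   # polled before the worker has run
    await worker
    return code, first, poll(task_id), poll("missing")

code, first, final, missing = asyncio.run(main())
print(code, first, final, missing)
```

The first poll sees the job as pending, the poll after completion returns the result, and an unknown ID maps to 404.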
Arguments passed to add_task are stored on the task and stay alive until it completes, even after the response has been sent. Passing large objects therefore keeps them in memory for the whole background run.

    # BAD: the whole list stays in memory until the task finishes
    @app.post("/items")
    async def create_item(item: Item, background_tasks: BackgroundTasks):
        all_items = await db.get_all_items()  # large list
        background_tasks.add_task(process_items, all_items)  # task holds a reference
        return item

    # GOOD: pass just the ID; let the task load what it needs
    @app.post("/items")
    async def create_item(item: Item, background_tasks: BackgroundTasks):
        new_item = await db.save(item)
        background_tasks.add_task(process_new_item, new_item.id)  # just an int
        return new_item

Rule of thumb: pass primitive values (IDs, strings) to background tasks — not large ORM results, response objects, or request streams.
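The retention effect can be observed with a weak reference. This sketch uses a hypothetical `add_task` and `pending` list as a simplified model of the task queue, not FastAPI's own classes, and it relies on CPython's reference counting to free the object promptly.

```python
import weakref

class LargeResult:
    """Stand-in for a big query result."""
    def __init__(self, n: int):
        self.rows = list(range(n))

def process_items(items: LargeResult):
    return len(items.rows)

pending = []  # simplified model of the queued background tasks

def add_task(func, *args):
    pending.append((func, args))  # the queue now references every argument

big = LargeResult(100_000)
ref = weakref.ref(big)
add_task(process_items, big)
del big  # the handler's local goes away when the request finishes

alive_while_queued = ref() is not None
func, args = pending.pop()
processed = func(*args)
del func, args  # task finished: last references dropped
alive_after_task = ref() is not None
print(alive_while_queued, processed, alive_after_task)
```

The object survives the handler's `del` because the queued task still references it, and it is released only once the task has run.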