Background Tasks Interview Questions & Answers

11 questions. Updated 2026-06-20.

FastAPI background task interview questions — BackgroundTasks, task queues, Celery, ARQ, when to use each and common pitfalls.

BackgroundTasks lets you schedule functions to run after the HTTP response is sent to the client. The handler returns immediately; the task runs in the same process after the response is fully written.

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

def write_log(message: str):
    with open("log.txt", "a") as f:
        f.write(message + "\n")

@app.post("/items")
async def create_item(item: Item, background_tasks: BackgroundTasks):
    result = await db.save(item)
    background_tasks.add_task(write_log, f"Created item {result.id}")
    return result   # response sent; write_log runs after

Rule of thumb: inject BackgroundTasks as a parameter — FastAPI provides it automatically; never instantiate it yourself.

add_task accepts both async def and regular def functions, so a background task can be either.

async def send_welcome_email(email: str):
    await smtp_client.send(email, "Welcome!")

def sync_audit_log(event: str):
    logger.info(event)    # sync — runs in thread pool

@app.post("/signup")
async def signup(user: UserCreate, background_tasks: BackgroundTasks):
    new_user = await db.create(user)
    background_tasks.add_task(send_welcome_email, new_user.email)   # async
    background_tasks.add_task(sync_audit_log, f"signup:{new_user.id}")  # sync
    return new_user

Async tasks run on the event loop; sync tasks run in the thread pool. Tasks execute sequentially in the order they were added.

Rule of thumb: prefer async background tasks in an async app. As long as they only await non-blocking I/O, they don't block the event loop, and they never occupy thread pool slots.
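The dispatch rule above (async tasks on the event loop, sync tasks in a thread pool) can be sketched with the standard library alone. This is an illustration under assumptions, not FastAPI's actual code: Starlette uses its own thread-pool helper rather than asyncio.to_thread, and run_task, async_task and sync_task are hypothetical names.

```python
import asyncio
import threading


async def run_task(func, *args):
    """Await coroutine functions directly; push plain functions to a worker thread."""
    if asyncio.iscoroutinefunction(func):
        return await func(*args)
    return await asyncio.to_thread(func, *args)


async def async_task(seen: dict):
    # Runs on the same thread as the event loop.
    seen["async"] = threading.current_thread().name


def sync_task(seen: dict):
    # Runs on a worker thread, so blocking here does not stall the loop.
    seen["sync"] = threading.current_thread().name


async def main() -> dict:
    seen = {"loop": threading.current_thread().name}
    await run_task(async_task, seen)
    await run_task(sync_task, seen)
    return seen


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The recorded thread names show why a sync task that blocks is safe for the loop but costs a pool slot, while an async task that blocks would stall every other request.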

Multiple background tasks run sequentially, in the order they were added. If task A takes 2 seconds and task B takes 1 second, the total background time is 3 seconds.

background_tasks.add_task(task_a)   # runs first
background_tasks.add_task(task_b)   # runs second, after task_a completes

For parallel background work, use asyncio.gather() inside a single async task:

async def parallel_tasks(email: str, item_id: int):
    await asyncio.gather(
        send_email(email),
        update_analytics(item_id),
    )

background_tasks.add_task(parallel_tasks, user.email, item.id)

Rule of thumb: if background tasks are independent and their combined duration matters, wrap them in an asyncio.gather inside a single background task.
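To make the difference concrete, here is a small asyncio-only sketch that records the order of events for sequential awaits versus asyncio.gather. The step helper and the delays are made up for illustration; asyncio.sleep stands in for real I/O.

```python
import asyncio


async def step(name: str, delay: float, log: list):
    """Pretend to do I/O for `delay` seconds, logging start and end."""
    log.append(f"{name}:start")
    await asyncio.sleep(delay)
    log.append(f"{name}:end")


async def sequential() -> list:
    # Mirrors two add_task calls: b does not start until a has finished.
    log = []
    await step("a", 0.05, log)
    await step("b", 0.01, log)
    return log


async def concurrent() -> list:
    # Mirrors one background task that gathers both: a and b overlap.
    log = []
    await asyncio.gather(step("a", 0.05, log), step("b", 0.01, log))
    return log


if __name__ == "__main__":
    print(asyncio.run(sequential()))
    print(asyncio.run(concurrent()))
```

In the sequential run the total time is the sum of the delays; in the gathered run it is roughly the longest single delay.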

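If a background task raises, the exception is logged (Uvicorn reports it as an unhandled exception) but does not affect the response, which has already been sent. The client never learns that the task failed, and any tasks queued after the failing one do not run.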

def risky_task():
    raise ValueError("Something went wrong")   # logged, response already sent

@app.post("/items")
async def create_item(item: Item, background_tasks: BackgroundTasks):
    background_tasks.add_task(risky_task)
    return item   # this 200 response is already sent before risky_task runs

For reliable task execution (retries, persistence), use a proper task queue (Celery, ARQ, RQ) where tasks can be retried on failure.

Rule of thumb: use BackgroundTasks only for best-effort work (email notification, audit logging); never for work that must succeed (payment processing, data integrity).
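For best-effort work it can still help to log failures with some context instead of relying on the server's generic traceback. A minimal sketch, assuming sync tasks only; log_failures is a hypothetical helper, not part of FastAPI, and it adds no retries.

```python
import functools
import logging

logger = logging.getLogger("background")


def log_failures(func):
    """Wrap a sync task so exceptions are logged with context instead of propagating."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            # logger.exception records the traceback along with the message.
            logger.exception(
                "background task %s failed (args=%r)", func.__name__, args
            )
            return None

    return wrapper


@log_failures
def risky_task(order_id: int):
    raise ValueError("Something went wrong")


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    risky_task(42)  # logs the failure; nothing is raised to the caller
```

A wrapped task would be scheduled the usual way, for example background_tasks.add_task(risky_task, 42). Swallowing the exception also lets later tasks in the same request still run.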

Concern | BackgroundTasks | Task queue (Celery/ARQ)
Must succeed | ❌ no retries | ✅ configurable retries
Survives crash/restart | ❌ lost if process dies | ✅ persisted in broker
Long-running (minutes) | ❌ blocks worker | ✅ separate worker
Rate-limited external API | ❌ no throttling | ✅ rate limiting
Monitor/inspect tasks | ❌ | ✅ Flower, ARQ dashboard
Simple fire-and-forget | ✅ | overkill

# BackgroundTasks — fine for quick, best-effort
background_tasks.add_task(send_analytics_event, event_data)

# Celery — required for reliability
process_payment.delay(order_id)   # persisted in Redis/RabbitMQ

Rule of thumb: if the task must succeed, use a task queue; if losing it occasionally is acceptable, BackgroundTasks is simpler.

# celery_app.py
from celery import Celery

celery = Celery(
    "myapp",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

@celery.task
def send_email_task(to: str, subject: str, body: str):
    smtp.send(to, subject, body)

# FastAPI route
from celery_app import send_email_task

@app.post("/signup")
async def signup(user: UserCreate):
    new_user = await db.create(user)
    send_email_task.delay(new_user.email, "Welcome!", "...")
    return new_user

Start workers separately:

celery -A celery_app worker --loglevel=info

Rule of thumb: keep Celery tasks in a separate module from FastAPI routes — tasks should be importable by Celery workers that don't start the web server.

ARQ (Async Redis Queue) is a task queue built for asyncio — tasks are async def functions, the worker is an event loop, and it uses Redis as the broker.

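# tasks.py
from arq import cron
from arq.connections import RedisSettings

async def send_welcome_email(ctx, email: str):
    # ctx["smtp"] is assumed to be populated in an on_startup hook (not shown)
    await ctx["smtp"].send(email, "Welcome!")

class WorkerSettings:
    functions = [send_welcome_email]
    redis_settings = RedisSettings()

# FastAPI
from contextlib import asynccontextmanager

from arq import create_pool
from arq.connections import RedisSettings
from fastapi import Request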

@asynccontextmanager
async def lifespan(app):
    app.state.arq = await create_pool(RedisSettings())
    yield
    await app.state.arq.aclose()

@app.post("/signup")
async def signup(user: UserCreate, request: Request):
    await request.app.state.arq.enqueue_job("send_welcome_email", user.email)
    return {"status": "created"}

ARQ advantages over Celery for async apps: native async, simpler setup, job arguments are pickled by default so most Python objects can be passed without a custom serialiser, and type hints work naturally.

Rule of thumb: choose ARQ for async-first apps; choose Celery when you need mature monitoring (Flower), richer scheduling than ARQ's cron jobs (Celery Beat), or a non-Python worker ecosystem.

Background tasks run after the request session is closed — you cannot reuse the request's DB session. Create a new session inside the task:

from sqlalchemy.orm import Session
from app.db import SessionLocal

def send_order_confirmation(order_id: int):
    db = SessionLocal()
    try:
        order = db.get(Order, order_id)
        send_email(order.user.email, f"Order {order_id} confirmed")
        db.commit()
    finally:
        db.close()

@app.post("/orders")
async def create_order(data: OrderCreate, background_tasks: BackgroundTasks):
    order = await db.create(data)
    background_tasks.add_task(send_order_confirmation, order.id)
    return order

Pass the ID, not the ORM object. An ORM object is bound to the session that loaded it; once that session closes the object is detached, and lazy-loaded attributes such as order.user can no longer be fetched.

Rule of thumb: always open a fresh DB session inside background tasks; never pass ORM objects from the request session into background tasks.
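The same pattern, reduced to the standard library so it runs without SQLAlchemy: the "handler" returns only an ID, and the task opens and closes its own connection. This is a sketch under assumptions; sqlite3 stands in for the real database, and init_db, create_order and confirm_order are hypothetical names.

```python
import sqlite3
from contextlib import closing
from typing import Optional


def init_db(db_path: str) -> None:
    with closing(sqlite3.connect(db_path)) as conn, conn:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS orders ("
            "id INTEGER PRIMARY KEY, email TEXT, confirmed INTEGER DEFAULT 0)"
        )


def create_order(db_path: str, email: str) -> int:
    """Request side: write the row and hand back only its ID."""
    with closing(sqlite3.connect(db_path)) as conn, conn:
        cur = conn.execute("INSERT INTO orders (email) VALUES (?)", (email,))
        return cur.lastrowid


def confirm_order(db_path: str, order_id: int) -> Optional[str]:
    """Background side: open a fresh connection, reload by ID, then close."""
    with closing(sqlite3.connect(db_path)) as conn, conn:
        row = conn.execute(
            "SELECT email FROM orders WHERE id = ?", (order_id,)
        ).fetchone()
        if row is None:
            return None  # the order vanished between request and task
        conn.execute("UPDATE orders SET confirmed = 1 WHERE id = ?", (order_id,))
        return row[0]
```

Because confirm_order reloads the row itself, it also copes with the order having been changed or deleted between the response and the task running.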

Option A — APScheduler (in-process):

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app):
    scheduler = AsyncIOScheduler()
    scheduler.add_job(cleanup_expired_tokens, "interval", minutes=30)
    scheduler.start()
    yield
    scheduler.shutdown()

app = FastAPI(lifespan=lifespan)

Option B — ARQ cron jobs:

class WorkerSettings:
    cron_jobs = [
        cron(cleanup_expired_tokens, minute={0, 30}),  # every 30 min
    ]

Option C — External scheduler (cron, Kubernetes CronJob, AWS EventBridge): Hits a /internal/cleanup endpoint at the scheduled time.

Rule of thumb: in-process schedulers (APScheduler) are convenient but tasks run on every pod in a scaled deployment; use a single external scheduler or a distributed lock to prevent duplicate execution.
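The distributed-lock idea can be sketched as "first caller in the window wins". The store below is an in-memory stand-in so the example runs anywhere; it is not shared between processes, so a real deployment would need a shared store (Redis's SET with the NX and EX options is a common choice). InMemoryLockStore and run_once are hypothetical names.

```python
import time
from typing import Callable, Optional


class InMemoryLockStore:
    """Stand-in for a shared lock store. NOT shared across processes or pods."""

    def __init__(self):
        self._expires = {}  # key -> time at which the lock expires

    def acquire(self, key: str, ttl: float, now: Optional[float] = None) -> bool:
        now = time.monotonic() if now is None else now
        expiry = self._expires.get(key)
        if expiry is not None and expiry > now:
            return False  # another holder's lock is still live
        self._expires[key] = now + ttl
        return True


def run_once(
    store: InMemoryLockStore,
    key: str,
    ttl: float,
    job: Callable[[], None],
    now: Optional[float] = None,
) -> bool:
    """Run job only if this caller wins the lock for the current window."""
    if store.acquire(key, ttl, now):
        job()
        return True
    return False
```

Each pod's scheduler would call run_once with the same key; setting ttl close to the schedule interval lets exactly one pod run the job per interval.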

The async job pattern: accept the request, enqueue the task, return a task ID; provide a polling endpoint.

import uuid

from arq.jobs import Job, JobStatus
from fastapi import HTTPException, Request

@app.post("/reports", status_code=202)
async def generate_report(request: Request, params: ReportParams):
    task_id = str(uuid.uuid4())
    # _job_id stores the job under our own ID so it can be looked up later
    await request.app.state.arq.enqueue_job(
        "build_report",
        task_id,
        params.model_dump(),  # Pydantic v2; use params.dict() on v1
        _job_id=task_id,
    )
    return {"task_id": task_id, "status": "pending"}

@app.get("/reports/{task_id}")
async def get_report_status(task_id: str, request: Request):
    job = Job(task_id, request.app.state.arq)
    status = await job.status()
    if status == JobStatus.not_found:
        raise HTTPException(404)
    if status == JobStatus.complete:
        result = await job.result()  # re-raises if the job itself failed
        return {"status": "complete", "result": result}
    return {"status": status.value}

Rule of thumb: return 202 Accepted for async tasks — it signals that the request was accepted but processing isn't complete yet.
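The submit-then-poll flow itself is independent of the queue. A minimal in-process sketch, assuming asyncio tasks instead of a broker (so jobs are lost on restart, which is the reason to use a real queue in production); JobStore and build_report are hypothetical names.

```python
import asyncio
import uuid


class JobStore:
    """Tracks in-process jobs by ID. Illustration only: nothing is persisted."""

    def __init__(self):
        self._jobs = {}  # task_id -> asyncio.Task

    def submit(self, coro) -> str:
        task_id = str(uuid.uuid4())
        self._jobs[task_id] = asyncio.create_task(coro)
        return task_id  # what the 202 response would carry

    def status(self, task_id: str) -> dict:
        task = self._jobs.get(task_id)
        if task is None:
            return {"status": "not_found"}  # would map to a 404
        if not task.done():
            return {"status": "pending"}
        if task.cancelled():
            return {"status": "cancelled"}
        if task.exception() is not None:
            return {"status": "failed", "error": str(task.exception())}
        return {"status": "complete", "result": task.result()}


async def build_report(rows: int) -> dict:
    await asyncio.sleep(0.01)  # stands in for slow report generation
    return {"rows": rows}


async def demo():
    store = JobStore()
    task_id = store.submit(build_report(3))
    first = store.status(task_id)      # polled immediately
    await asyncio.sleep(0.05)
    later = store.status(task_id)      # polled after the job finished
    return first, later, store.status("missing")


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The status method covers the same cases the polling endpoint has to handle: unknown ID, still running, failed, and complete.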

Passing large objects to a background task keeps them alive until the task completes, even after the request is done, because add_task holds a reference to its arguments until the task has run.

# BAD: the entire list stays in memory until the task finishes
@app.post("/items")
async def create_item(item: Item, background_tasks: BackgroundTasks):
    all_items = await db.get_all_items()   # large list
    background_tasks.add_task(process_items, all_items)  # held as a task argument
    return item

# GOOD — pass just the IDs; let the task load what it needs
@app.post("/items")
async def create_item(item: Item, background_tasks: BackgroundTasks):
    new_item = await db.save(item)
    background_tasks.add_task(process_new_item, new_item.id)  # just an int
    return new_item

Rule of thumb: pass primitive values (IDs, strings) to background tasks — not large ORM results, response objects, or request streams.
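The lifetime effect can be observed directly with a weak reference. This sketch assumes CPython's reference counting and uses a plain list of (func, args) pairs as a stand-in for the queued tasks; Payload, process and run_all are hypothetical names.

```python
import gc
import weakref


class Payload:
    """Stand-in for a large query result."""

    def __init__(self, size: int):
        self.data = bytearray(size)


def process(payload: Payload) -> int:
    return len(payload.data)


def run_all(pending: list) -> None:
    """Run and drop queued (func, args) pairs, oldest first."""
    while pending:
        func, args = pending.pop(0)
        func(*args)


def demo():
    pending = []
    big = Payload(1_000_000)
    ref = weakref.ref(big)
    pending.append((process, (big,)))   # the queue now references the payload
    del big                             # the "handler" has finished with it
    alive_while_queued = ref() is not None
    run_all(pending)
    gc.collect()
    alive_after_run = ref() is not None
    return alive_while_queued, alive_after_run


if __name__ == "__main__":
    print(demo())
```

The payload outlives the handler's own reference for as long as the task is queued, and becomes collectable only once the task has run and its arguments are dropped.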
