OAuth2PasswordBearer is a callable security dependency that:
- Extracts the Bearer token from the Authorization: Bearer <token> header.
- Returns the token string to your handler.
- Registers the OAuth2 password flow in the OpenAPI schema so Swagger UI shows an "Authorize" dialog.
from fastapi import Depends, FastAPI
from fastapi.security import OAuth2PasswordBearer

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")

@app.get("/me")
async def current_user(token: str = Depends(oauth2_scheme)):
    # token is the raw Bearer string; validate it yourself
    payload = decode_jwt(token)  # decode_jwt is your own verification helper
    return payload
OAuth2PasswordBearer only extracts the token — it does NOT validate it.
Validation is your responsibility.
Rule of thumb: use OAuth2PasswordBearer so Swagger UI gets the Authorize button;
do JWT decoding/verification in a separate get_current_user dependency.
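To make the "extracts but does not validate" point concrete, here is a minimal stdlib sketch of the extraction step. It is not FastAPI's implementation; extract_bearer_token is a hypothetical helper that only splits the header value and never inspects the token.

```python
from typing import Optional


def extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    """Return the token from an 'Authorization' header value, or None."""
    if not authorization:
        return None
    scheme, _, token = authorization.partition(" ")
    # The scheme name is compared case-insensitively; the token itself is
    # returned as-is, with no signature or expiry check of any kind.
    if scheme.lower() != "bearer" or not token.strip():
        return None
    return token.strip()


print(extract_bearer_token("Bearer abc.def.ghi"))
print(extract_bearer_token("Basic dXNlcjpwYXNz"))
```

Anything that looks like a Bearer credential passes this step, which is why verification has to happen in a later dependency.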
Accept OAuth2PasswordRequestForm (username + password as form data) and return
a token response:
from datetime import datetime, timedelta, timezone

from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from jose import jwt

@app.post("/token")
async def login(form: OAuth2PasswordRequestForm = Depends()):
    user = await authenticate_user(form.username, form.password)
    if not user:
        raise HTTPException(
            status_code=400, detail="Incorrect username or password"
        )
    # Timezone-aware expiry; datetime.utcnow() is deprecated since Python 3.12
    expires = datetime.now(timezone.utc) + timedelta(minutes=30)
    access_token = jwt.encode(
        {"sub": str(user.id), "exp": expires},
        settings.secret_key, algorithm="HS256"
    )
    return {"access_token": access_token, "token_type": "bearer"}
The response must have access_token and token_type fields — Swagger UI
reads these to store the token.
Rule of thumb: never return the user's password (even hashed) from /token;
return only the token and its type.
Use passlib with bcrypt:
from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def hash_password(plain: str) -> str:
    return pwd_context.hash(plain)

def verify_password(plain: str, hashed: str) -> bool:
    return pwd_context.verify(plain, hashed)

async def authenticate_user(username: str, password: str):
    user = await db.get_user_by_username(username)
    if not user or not verify_password(password, user.password_hash):
        return None
    return user
Never store plain-text passwords. bcrypt automatically includes a salt and
is resistant to GPU-accelerated brute-force attacks.
Rule of thumb: use passlib.CryptContext with bcrypt — it handles salting,
algorithm upgrades (deprecated="auto") and constant-time comparison.
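The two properties named above, per-password salting and constant-time comparison, can be illustrated with the standard library alone. This sketch swaps in PBKDF2-HMAC-SHA256 for bcrypt purely so it runs without third-party packages; it is not what passlib does internally, and the iteration count and storage layout are assumptions.

```python
import hashlib
import hmac
import os

ITERATIONS = 600_000  # assumed work factor; tune for your hardware
SALT_LEN = 16


def hash_password(plain: str) -> bytes:
    # A fresh random salt per password means equal passwords hash differently.
    salt = os.urandom(SALT_LEN)
    digest = hashlib.pbkdf2_hmac("sha256", plain.encode(), salt, ITERATIONS)
    return salt + digest  # store the salt alongside the digest


def verify_password(plain: str, stored: bytes) -> bool:
    salt, expected = stored[:SALT_LEN], stored[SALT_LEN:]
    candidate = hashlib.pbkdf2_hmac("sha256", plain.encode(), salt, ITERATIONS)
    # compare_digest takes the same time wherever the first mismatch occurs.
    return hmac.compare_digest(candidate, expected)


stored = hash_password("correct horse")
print(verify_password("correct horse", stored))
print(verify_password("wrong guess", stored))
```

In a real service, passlib with bcrypt (as shown earlier) remains the approach this answer recommends; the sketch only shows what "salted" and "constant-time" mean in practice.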
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from pydantic import BaseModel, ValidationError

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")

class TokenData(BaseModel):
    user_id: int

async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=401,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, settings.secret_key, algorithms=["HS256"])
        # "sub" is issued as a string; TokenData coerces it back to int
        # and rejects a missing or non-numeric value
        token_data = TokenData(user_id=payload.get("sub"))
    except (JWTError, ValidationError):
        raise credentials_exception
    user = await db.get_user(token_data.user_id)
    if not user:
        raise credentials_exception
    return user

@app.get("/me")
async def me(user: User = Depends(get_current_user)):
    return user
Rule of thumb: always include headers={"WWW-Authenticate": "Bearer"} on 401
responses — RFC 7235 requires it and some clients depend on it.
Define scopes in OAuth2PasswordBearer, encode them in the token, and verify
them with SecurityScopes:
from fastapi import Depends, HTTPException, Security
# SecurityScopes lives in fastapi.security, not in the top-level fastapi package
from fastapi.security import OAuth2PasswordBearer, SecurityScopes
from jose import jwt

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="/token",
    scopes={"read": "Read items", "write": "Create and update items"},
)

async def get_current_user(
    security_scopes: SecurityScopes,
    token: str = Depends(oauth2_scheme),
):
    payload = jwt.decode(token, settings.secret_key, algorithms=["HS256"])
    token_scopes = payload.get("scopes", [])
    # Every scope required by the route must be present in the token
    for scope in security_scopes.scopes:
        if scope not in token_scopes:
            raise HTTPException(
                403,
                detail=f"Scope '{scope}' required",
                headers={"WWW-Authenticate": f'Bearer scope="{security_scopes.scope_str}"'},
            )
    return payload

@app.get("/items")
async def list_items(user = Security(get_current_user, scopes=["read"])):
    ...
Use Security() (not Depends()) to pass scope requirements.
Rule of thumb: encode granted scopes in the JWT at login time; verify required
scopes in get_current_user — keep scope logic in the auth dependency, not handlers.
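The scope check itself is plain set logic and can be tested without FastAPI. The sketch below uses hypothetical helper names (parse_scope_string, missing_scopes) and assumes scopes travel as a space-separated string, as in the OAuth2 scope parameter.

```python
from typing import Iterable, List


def parse_scope_string(scope: str) -> List[str]:
    """Split a space-separated OAuth2 scope string into individual scopes."""
    return scope.split()


def missing_scopes(required: Iterable[str], granted: Iterable[str]) -> List[str]:
    """Return the required scopes that the token does not carry, in order."""
    granted_set = set(granted)
    return [scope for scope in required if scope not in granted_set]


granted = parse_scope_string("read profile")
print(missing_scopes(["read"], granted))
print(missing_scopes(["read", "write"], granted))
```

An empty result means the request may proceed; a non-empty one names the scopes to report in the error response.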
Add an is_active check in get_current_user or in a separate get_current_active_user
dependency:
async def get_current_active_user(
    user: User = Depends(get_current_user),
) -> User:
    if not user.is_active:
        raise HTTPException(status_code=400, detail="Inactive user")
    return user

@app.get("/orders")
async def orders(user: User = Depends(get_current_active_user)):
    ...
Separating "can we trust the token?" (get_current_user) from "is the user
allowed?" (get_current_active_user) makes the dependency graph cleaner and
each function independently testable.
Rule of thumb: chain fine-grained permission checks as separate dependencies —
don't put all guard logic in one giant get_current_user function.
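The testability claim can be sketched with plain functions, with no framework or token involved. Everything here is illustrative: the User dataclass, the is_admin flag, and the Forbidden exception are assumptions, standing in for your own model and for HTTPException.

```python
from dataclasses import dataclass


class Forbidden(Exception):
    """Stand-in for an HTTP error raised by a guard."""


@dataclass
class User:
    id: int
    is_active: bool = True
    is_admin: bool = False


def require_active(user: User) -> User:
    if not user.is_active:
        raise Forbidden("Inactive user")
    return user


def require_admin(user: User) -> User:
    # Builds on the previous guard instead of repeating its check.
    user = require_active(user)
    if not user.is_admin:
        raise Forbidden("Admin only")
    return user


print(require_admin(User(id=1, is_admin=True)).id)
```

Each guard takes an already-authenticated user, so a unit test can pass in a hand-built User and never needs to mint a token.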
Issue two tokens at login: a short-lived access token and a long-lived refresh token.
Store the refresh token hash in the DB. Provide a /refresh endpoint:
from fastapi import Body, HTTPException
from jose import jwt, JWTError

@app.post("/refresh")
# embed=True makes FastAPI expect {"refresh_token": "..."} rather than a bare JSON string
async def refresh_token(refresh_token: str = Body(..., embed=True)):
    # 1. Verify it's a valid JWT
    try:
        payload = jwt.decode(refresh_token, REFRESH_SECRET, algorithms=["HS256"])
    except JWTError:
        raise HTTPException(401, "Invalid refresh token")
    # 2. Check it exists and isn't revoked in the DB
    stored = await db.get_refresh_token(payload["jti"])
    if not stored or stored.revoked:
        raise HTTPException(401, "Refresh token revoked")
    # 3. Issue new access token (optionally rotate refresh token)
    new_access = create_access_token(payload["sub"])
    return {"access_token": new_access, "token_type": "bearer"}
Key design decisions: rotate refresh tokens on each use (reduces replay risk),
store a jti (JWT ID) for revocation, use a different signing secret for refresh tokens.
Rule of thumb: keep access tokens short (15-30 min); keep refresh tokens long (7-30 days) but revocable via a DB lookup.
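Rotation and hash-only storage can be sketched with an in-memory store. Note the simplifications: this version uses opaque random tokens instead of JWTs with a jti, a dict instead of a database, and no expiry; RefreshStore is a hypothetical class.

```python
import hashlib
import secrets
from typing import Dict, Optional


class RefreshStore:
    """Keeps only SHA-256 digests of refresh tokens, never the tokens."""

    def __init__(self) -> None:
        self._records: Dict[str, dict] = {}

    @staticmethod
    def _digest(token: str) -> str:
        return hashlib.sha256(token.encode()).hexdigest()

    def issue(self, user_id: str) -> str:
        token = secrets.token_urlsafe(32)
        self._records[self._digest(token)] = {"sub": user_id, "revoked": False}
        return token

    def rotate(self, token: str) -> Optional[str]:
        """Revoke the presented token and return a replacement, or None."""
        record = self._records.get(self._digest(token))
        if record is None or record["revoked"]:
            return None
        record["revoked"] = True  # a replayed token is rejected from now on
        return self.issue(record["sub"])


store = RefreshStore()
first = store.issue("42")
second = store.rotate(first)
print(second is not None, store.rotate(first))
```

A stricter variant would also revoke every token descended from a replayed one, on the theory that a reuse signals theft; that is left out here for brevity.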
JWTs are stateless — you can't "un-sign" one. Revocation requires a server-side store:
Option A — Revocation list (deny list):
import time

@app.post("/logout")
async def logout(token: str = Depends(oauth2_scheme)):
    payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
    jti = payload["jti"]
    exp = payload["exp"]
    # Keep the deny-list entry only until the token would expire anyway;
    # SETEX rejects a non-positive TTL, so clamp to at least 1 second
    ttl = max(1, exp - int(time.time()))
    await redis.setex(f"revoked:{jti}", ttl, "1")
    return {"detail": "Logged out"}

# In get_current_user, after decoding:
if await redis.exists(f"revoked:{payload['jti']}"):
    raise HTTPException(401, "Token revoked")
Option B — Short expiry + refresh rotation (no revocation list needed): Access token expires in 5 minutes; compromised tokens are useless quickly.
Rule of thumb: for low-security APIs, use short-lived tokens; for high-security apps (banking, health), maintain a revocation list in Redis keyed by JWT ID.
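The deny-list idea from Option A can be tried without Redis. This in-memory sketch mimics the "entry expires when the token would have expired" behavior; DenyList is a hypothetical class, and a single-process dict is an assumption that would not hold across multiple workers.

```python
import time
from typing import Dict, Optional


class DenyList:
    """Tracks revoked token IDs until their original expiry time."""

    def __init__(self) -> None:
        self._expires_at: Dict[str, float] = {}

    def revoke(self, jti: str, exp: float, now: Optional[float] = None) -> None:
        current = time.time() if now is None else now
        # An already-expired token needs no entry: it is rejected anyway.
        if exp > current:
            self._expires_at[jti] = exp

    def is_revoked(self, jti: str, now: Optional[float] = None) -> bool:
        current = time.time() if now is None else now
        expires_at = self._expires_at.get(jti)
        if expires_at is None:
            return False
        if expires_at <= current:
            del self._expires_at[jti]  # lazy cleanup, like a TTL firing
            return False
        return True


deny = DenyList()
deny.revoke("token-1", exp=time.time() + 300)
print(deny.is_revoked("token-1"), deny.is_revoked("token-2"))
```

The store never grows beyond the number of tokens revoked within one access-token lifetime, which is what keeps this approach cheap.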
The OAuth2 specification (RFC 6749) requires the token endpoint to accept
application/x-www-form-urlencoded data — not JSON. This is for historical
compatibility with web browsers and existing OAuth2 clients.
FastAPI's OAuth2PasswordRequestForm reads:
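- username (form field)
- password (form field)
- scope (optional, space-separated; exposed as the list form.scopes)
- grant_type (the spec requires "password"; OAuth2PasswordRequestForm accepts it as optional, while OAuth2PasswordRequestFormStrict enforces it)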
@app.post("/token")
async def login(form: OAuth2PasswordRequestForm = Depends()):
    # form.username, form.password, form.scopes
    ...
Swagger UI's "Authorize" dialog sends this form automatically.
Rule of thumb: never change the /token endpoint to JSON — it breaks OAuth2
library compatibility; keep all other API endpoints as JSON.
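For a feel of what actually goes over the wire, this stdlib sketch builds and parses a form-encoded token request. parse_token_request is a hypothetical helper that mirrors the fields listed above; it is not how FastAPI parses forms internally.

```python
from urllib.parse import parse_qs, urlencode


def parse_token_request(body: str) -> dict:
    """Parse an application/x-www-form-urlencoded token request body."""
    # parse_qs returns a list per key; take the first value of each field.
    fields = {key: values[0] for key, values in parse_qs(body).items()}
    if "username" not in fields or "password" not in fields:
        raise ValueError("username and password are required")
    return {
        "username": fields["username"],
        "password": fields["password"],
        # The scope field is one space-separated string on the wire.
        "scopes": fields.get("scope", "").split(),
        "grant_type": fields.get("grant_type"),
    }


body = urlencode({
    "grant_type": "password",
    "username": "alice",
    "password": "s3cret!",
    "scope": "read write",
})
print(body)
print(parse_token_request(body))
```

A client would send this body with the header Content-Type: application/x-www-form-urlencoded, which is the same request Swagger UI's "Authorize" dialog issues.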
Use the pyotp library to generate and verify TOTP codes:
import pyotp
from fastapi import Body, Depends, HTTPException

@app.post("/2fa/setup")
async def setup_2fa(user: User = Depends(get_current_user)):
    secret = pyotp.random_base32()
    await db.save_totp_secret(user.id, secret)
    totp = pyotp.TOTP(secret)
    return {"provisioning_uri": totp.provisioning_uri(user.email, issuer_name="MyApp")}

@app.post("/2fa/verify")
async def verify_2fa(
    # embed=True makes FastAPI expect {"code": "..."} rather than a bare JSON string
    code: str = Body(..., embed=True),
    user: User = Depends(get_current_user),
):
    secret = await db.get_totp_secret(user.id)
    if not pyotp.TOTP(secret).verify(code):
        raise HTTPException(401, "Invalid 2FA code")
    # issue the final access token
    return {"access_token": create_access_token(user.id), "token_type": "bearer"}
The flow: login → partial token → /2fa/verify with code → full access token.
Rule of thumb: issue a scoped "pending 2FA" token after password verification; only issue a full-access token after TOTP is confirmed.
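For intuition about what a TOTP check involves, here is a stdlib sketch of the RFC 6238 algorithm (HMAC-SHA1, 30-second steps). It is not pyotp's code: the function names are hypothetical, the secret is taken as raw bytes rather than the base32 string pyotp uses, and the window parameter is an assumption for tolerating clock drift.

```python
import hashlib
import hmac
import struct
import time
from typing import Optional


def totp(secret: bytes, at: Optional[float] = None, step: int = 30, digits: int = 6) -> str:
    """Compute the time-based one-time code for the given moment."""
    counter = int((time.time() if at is None else at) // step)
    mac = hmac.new(secret, struct.pack(">Q", counter), hashlib.sha1).digest()
    # Dynamic truncation: the low nibble of the last byte picks a 4-byte slice.
    offset = mac[-1] & 0x0F
    number = struct.unpack(">I", mac[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(number % 10**digits).zfill(digits)


def verify_totp(secret: bytes, code: str, at: Optional[float] = None,
                step: int = 30, window: int = 0) -> bool:
    """Accept the code for the current step, plus `window` steps either side."""
    now = time.time() if at is None else at
    return any(
        hmac.compare_digest(totp(secret, now + drift * step, step).encode(), code.encode())
        for drift in range(-window, window + 1)
    )


secret = b"12345678901234567890"  # the shared secret used in the RFC test vectors
print(totp(secret, at=59, digits=8))  # RFC 6238 test vector: 94287082
```

Both the server and the authenticator app derive the code from the shared secret and the current time, which is why the secret saved in /2fa/setup must be stored securely.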