Building a Production-Ready Agent Stack: Part 3 - Authentication & Security
Implementing Auth0 authentication with JWTs for API requests and signed cookies for SSE streaming. Learn the hybrid approach that combines the best of both worlds.
In Parts 1 and 2, we built a solid foundation: Docker containers, FastAPI backend, database with migrations, and clean architecture with repositories and services. But right now, anyone can access anyone’s sessions and messages. Time to fix that.
We have a problem. Actually, we have two different authentication needs:
Need 1: API authentication. We need to protect our REST endpoints (/api/sessions, /api/messages, etc.). Standard bearer token authentication works perfectly here.
Need 2: SSE authentication. We need to authenticate Server-Sent Events for streaming agent responses. But here’s the problem: EventSource (the browser API for SSE) can’t send custom headers. No Authorization: Bearer <token> header allowed.
Most tutorials either punt on the SSE problem (put tokens in query strings—terrible idea) or overcomplicate things with dual auth systems. We’re going to do it right with a hybrid approach:
The Solution:
JWTs for API requests - Standard Authorization: Bearer header
Signed cookies for SSE - Short-lived (5-10 min), stateless, secure
// Frontend makes API request with JWT
const token = await getAccessToken();

fetch('https://api.example.com/sessions', {
  headers: { 'Authorization': `Bearer ${token}` }
})
Backend extracts token from header, verifies signature and claims, accepts or rejects. Simple, standard, secure.
When SSE was standardized in 2009, it was designed for public event streams (news feeds, stock tickers, sports scores). The assumption was: if you need authentication, handle it at the application level.
That made sense in 2009. In 2025, streaming personalized agent responses? We need auth.
This secret signs cookies. If leaked, attackers can forge cookies. Keep it secret, rotate it periodically.
SESSION_COOKIE_NAME and SESSION_COOKIE_MAX_AGE control the cookie name and expiration time. Set them to whatever suits your app: ten minutes (600 seconds) is a good balance between security and convenience, and the name can be anything descriptive, such as stream_session.
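If you need a strong value for COOKIE_SECRET, one simple way to generate it (a sketch using only the Python standard library; store the output in your .env) is:

# Generate a random, URL-safe secret once and store it in your .env as COOKIE_SECRET
import secrets

print(secrets.token_urlsafe(32))  # 32 random bytes -> roughly 43 URL-safe characters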
PKCE (Proof Key for Code Exchange, pronounced “pixie”) prevents authorization code interception in public clients.
Why we need it:
Traditional OAuth2 used client secrets to prevent code interception. But SPAs can’t keep secrets — JavaScript runs in the browser, visible to users. An attacker could:
Intercept the authorization code during the redirect (network sniffer, compromised proxy)
Exchange it for tokens at the token endpoint
Impersonate the user
PKCE solves this without secrets using one-way cryptographic hashing.
1. Frontend generates random code_verifier (128 chars of entropy)
Example: "dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk"
2. Frontend hashes it: code_challenge = BASE64URL(SHA256(code_verifier))
SHA256 is one-way: you can't reverse the hash to get the original
Example challenge: "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM"
3. Frontend -> Auth0: authorize request with code_challenge
URL: /authorize?code_challenge=E9Melhoa...&code_challenge_method=S256
4. User logs in on Auth0's page
Auth0 stores the code_challenge tied to the authorization code
5. Auth0 -> Frontend: redirect with authorization code
URL: yourapp.com/callback?code=abc123
Attacker could intercept this code!
6. Frontend -> Auth0: exchange code + original code_verifier for tokens
POST /oauth/token
{
"code": "abc123",
"code_verifier": "dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk"
}
7. Auth0 verifies: SHA256(code_verifier) == stored code_challenge
Hashes the provided verifier and compares to stored challenge
If match: the request came from the same client that started auth
8. Auth0 -> Frontend: issues access_token (JWT)
Why this is secure:
Cryptographic one-way property:
SHA256 is a one-way hash function (also called a “cryptographic hash”)
Easy to compute: hash = SHA256(input) takes microseconds
Impossible to reverse: Given the hash, you can’t compute the original input
Even knowing a billion hashes won’t help you reverse a single one
Attack scenario:
Let’s say an attacker intercepts the authorization code (step 5):
Could they reverse the hash to get the original code_verifier?
No. This is the core security property of SHA256. Even with:
The hash value
Unlimited computing power for years
Knowledge of the algorithm
You cannot reverse a SHA256 hash. The mathematical function discards information (many inputs map to each output — called “collisions” in theory, but finding one is computationally infeasible).
Why 128 characters of randomness?
The code_verifier must be high-entropy (unpredictable). With 128 random characters from the set [A-Za-z0-9-._~]:
66^128 possible values ≈ 2^774
Even if you tried a trillion guesses per second for the age of the universe, you'd have effectively zero chance of guessing it
This is why SPAs don’t need client secrets — PKCE provides equivalent security through cryptographic one-way functions.
Note
Auth0 React SDK handles PKCE automatically. You’ll never see code_verifier or code_challenge in your code — the SDK manages it. But understanding the cryptographic primitives (one-way hashing, entropy) helps you reason about the security of your auth flow.
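To make the one-way hashing concrete, here is a minimal Python sketch of the verifier and challenge generation described in steps 1-2. It is purely illustrative; the variable names are ours, not part of any SDK.

import base64
import hashlib
import secrets

# 1. Generate a high-entropy code_verifier (43-128 chars from [A-Za-z0-9-._~])
code_verifier = secrets.token_urlsafe(64)  # 64 random bytes -> ~86 URL-safe characters

# 2. Derive the code_challenge: BASE64URL(SHA256(code_verifier)), padding stripped
digest = hashlib.sha256(code_verifier.encode("ascii")).digest()
code_challenge = base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")

# The challenge goes in the /authorize request; the verifier is only sent later
# to /oauth/token, where the server recomputes SHA256 and compares.
print(code_verifier)
print(code_challenge)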
{"alg":"RS256",// RSA signature with SHA-256 hashing
"typ":"JWT",// This is a JSON Web Token
"kid":"abc123"// Key ID: which public key to use for verification
}
{"iss":"https://your-tenant.us.auth0.com/",// Issuer: who created this token
"sub":"auth0|507f1f77bcf86cd799439011",// Subject: the user's ID
"aud":"https://api.agent-stack.com",// Audience: who should accept this token
"exp":1735574400,// Expiration: Unix timestamp
"iat":1735570800,// Issued at: Unix timestamp
"email":"user@example.com"// Custom claims (any data)
}
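You can see these two parts for yourself by base64url-decoding a token without verifying it. A small illustrative snippet (it builds a toy, unsigned token from the example header and payload above):

import base64
import json

def b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")

def b64url_decode(segment: str) -> bytes:
    # Restore the padding that JWTs strip off
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))

header = {"alg": "RS256", "typ": "JWT", "kid": "abc123"}
payload = {"iss": "https://your-tenant.us.auth0.com/", "sub": "auth0|507f1f77bcf86cd799439011"}
token = b64url(json.dumps(header).encode()) + "." + b64url(json.dumps(payload).encode()) + ".signature"

header_b64, payload_b64, _signature = token.split(".")
print(json.loads(b64url_decode(header_b64)))   # readable, but NOT verified
print(json.loads(b64url_decode(payload_b64)))  # anyone can read the claims; only the signature proves authenticity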
# Step 1: Create the message to sign
message = base64url(header) + "." + base64url(payload)
# "eyJhbGci...J9.eyJpc3Mi...fQ"

# Step 2: Hash the message with SHA-256
hash = SHA256(message)
# SHA-256 reduces any input to a fixed 256-bit (32-byte) digest
# Example: "3a7b8c9d..." (64 hex characters)

# Step 3: Sign the hash with RSA private key (only Auth0 has this)
signature = RSA_SIGN(hash, auth0_private_key)
# RSA operation on the hash using Auth0's secret private key
# Creates ~256 bytes of signature data

# Step 4: Encode signature as base64url
token = message + "." + base64url(signature)
# Step 1: Split token into parts
header, payload, signature = token.split(".")

# Step 2: Fetch Auth0's public key (from JWKS)
public_key = get_auth0_public_key(kid="abc123")
# Public key is mathematically related to private key
# Can verify signatures but CANNOT create them

# Step 3: Recreate the message
message = header + "." + payload

# Step 4: Hash the message (same as Auth0 did)
hash = SHA256(message)

# Step 5: Verify signature with RSA public key
is_valid = RSA_VERIFY(hash, signature, public_key)
# RSA recovers the signed value using the public key
# Compares it to our computed hash
# If they match: signature is valid, token is authentic
Why RSA (asymmetric cryptography)?
RSA uses a key pair:
Private key: Can create signatures (only Auth0 has this)
Public key: Can verify signatures (anyone can have this)
This is different from symmetric algorithms like HS256 (HMAC-SHA256):
HS256: same secret for signing AND verifying
signature = HMAC-SHA256(message, shared_secret)
Problem: if we verify tokens, we can also create fake ones
RS256: different keys for signing vs verifying
signature = RSA_SIGN(hash, private_key) // Auth0 only
verify = RSA_VERIFY(hash, signature, public_key) // Anyone
Benefit: we can verify but can't forge
The mathematics: RSA is based on the difficulty of factoring the product of two large primes:
Private key: two large primes (p, q) and the derived exponent d
Public key: the product n = p × q and an exponent e
Signing: signature = hash^d mod n, using the private exponent d
Verifying: check that signature^e mod n equals hash, using the public exponent e
Security: knowing (n, e) doesn't let you compute d without factoring n into p × q (computationally infeasible for 2048-bit keys)
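To make the modular arithmetic concrete, here is a toy sign-and-verify example with tiny textbook primes (illustration only; real keys are 2048+ bits and use padding schemes):

# Toy RSA signature with textbook-sized numbers -- never use numbers this small
p, q = 61, 53
n = p * q                    # 3233 (public modulus)
phi = (p - 1) * (q - 1)      # 3120
e = 17                       # public exponent
d = pow(e, -1, phi)          # 2753, private exponent (computable only if you know p and q)

h = 123                      # pretend this is SHA256(message) reduced mod n
signature = pow(h, d, n)     # sign with the private exponent
assert pow(signature, e, n) == h   # anyone holding (n, e) can verify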
Why base64url encoding?
JWTs must be URL-safe (passed in headers, query strings). Base64url uses:
A-Z, a-z, 0-9, -, _ (URL-safe characters)
No padding = characters
Standard base64 uses +, /, = which break URLs
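A quick standard-library illustration of the difference (the byte values are chosen just to trigger the special characters):

import base64

raw = b"\xfb\xef\xbe\xff\xff\xff"

print(base64.b64encode(raw))          # b'++++////'  -- '+' and '/' break URLs
print(base64.urlsafe_b64encode(raw))  # b'----____'  -- URL-safe alphabet

# JWTs additionally strip the trailing '=' padding:
print(base64.urlsafe_b64encode(b"JWT payload").rstrip(b"="))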
Why hash before signing?
RSA can only operate on messages smaller than the key size (typically 2048 bits = 256 bytes), but tokens can be larger. SHA-256 always produces a fixed 32-byte digest, regardless of input size. We sign the hash, not the full message.
Can’t someone modify the payload and re-sign it?
No. They’d need Auth0’s private key to create a valid signature. Without it:
// Attacker modifies the payload:
{
  "sub": "auth0|attacker",          // Changed user ID
  "email": "attacker@example.com"
}

// Attacker tries to sign with their own key:
signature = RSA_SIGN(hash, attacker_private_key)

// Our verification fails:
RSA_VERIFY(hash, signature, auth0_public_key)  // FALSE
// The public key doesn't match the private key used for signing
The mathematical relationship between Auth0’s public and private keys ensures this.
What we verify:
When jwt.decode() succeeds, we know:
Authenticity: Token was created by Auth0 (signature valid with Auth0’s public key)
Integrity: Payload hasn’t been modified (signature covers entire message)
Authorization: Token is for our API (audience claim matches)
Not expired: Current time < expiration timestamp
From correct issuer: Issuer claim matches Auth0 tenant
We trust the payload claims because the signature proves no one tampered with them.
Why RS256 instead of HS256?
| Feature | RS256 (RSA) | HS256 (HMAC) |
| --- | --- | --- |
| Keys | Public/private pair | Single shared secret |
| Who can verify? | Anyone with public key | Only those with secret |
| Who can sign? | Only holder of private key | Anyone with secret |
| Use case | Auth0 signs, multiple services verify | Single service signs and verifies |
| Security | Can't forge signatures without private key | If secret leaks, attacker can forge |
| Key rotation | Publish new public key, no secret sharing | Must securely share new secret |
For our architecture (Auth0 signs, we verify), RS256 is the right choice.
# backend/app/core/auth.py
from datetime import datetime, timedelta
from typing import Any, Optional

from fastapi import HTTPException, Security, Request, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from jose.exceptions import ExpiredSignatureError, JWTClaimsError
from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired
import httpx

from app.core.settings import settings

# Security scheme for Swagger UI
security = HTTPBearer()

# JWKS cache (JSON Web Key Set from Auth0)
_jwks_cache: Optional[dict[str, Any]] = None
_jwks_cache_time: Optional[datetime] = None
JWKS_CACHE_TTL = timedelta(hours=1)


async def get_jwks() -> dict[str, Any]:
    """
    Fetch Auth0's public keys for JWT signature verification.

    Auth0 rotates keys periodically, so we cache for 1 hour.
    """
    global _jwks_cache, _jwks_cache_time

    # Return cached JWKS if still valid
    if _jwks_cache and _jwks_cache_time:
        if datetime.utcnow() - _jwks_cache_time < JWKS_CACHE_TTL:
            return _jwks_cache

    # Fetch fresh JWKS
    jwks_url = f"https://{settings.auth0_domain}/.well-known/jwks.json"
    async with httpx.AsyncClient() as client:
        response = await client.get(jwks_url)
        response.raise_for_status()
        _jwks_cache = response.json()
        _jwks_cache_time = datetime.utcnow()

    return _jwks_cache


async def verify_jwt(token: str) -> dict[str, Any]:
    """
    Verify JWT signature and claims.

    Returns payload if valid, raises HTTPException if invalid.
    """
    try:
        # Fetch JWKS (cached)
        jwks = await get_jwks()

        # Decode token header to get key ID
        unverified_header = jwt.get_unverified_header(token)
        key_id = unverified_header.get("kid")
        if not key_id:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Token missing 'kid' (key ID)",
            )

        # Find matching public key
        rsa_key = None
        for key in jwks.get("keys", []):
            if key["kid"] == key_id:
                rsa_key = {
                    "kty": key["kty"],
                    "kid": key["kid"],
                    "use": key["use"],
                    "n": key["n"],
                    "e": key["e"],
                }
                break

        if not rsa_key:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Unable to find matching public key",
            )

        # Verify signature and claims
        payload = jwt.decode(
            token,
            rsa_key,
            algorithms=["RS256"],
            audience=settings.auth0_audience,
            issuer=settings.auth0_issuer,
        )
        return payload

    except ExpiredSignatureError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token has expired",
        )
    except JWTClaimsError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Invalid token claims: {str(e)}",
        )
    except JWTError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Invalid token: {str(e)}",
        )


async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Security(security),
) -> dict[str, Any]:
    """
    FastAPI dependency for extracting user from JWT.

    Usage:
        @app.get("/protected")
        async def protected(user: dict = Depends(get_current_user)):
            user_id = user["sub"]
    """
    token = credentials.credentials
    return await verify_jwt(token)


async def get_current_user_id(
    user: dict[str, Any] = Security(get_current_user),
) -> str:
    """
    Extract user ID from verified JWT.

    Auth0 user IDs look like: "auth0|507f1f77bcf86cd799439011"
    """
    return user["sub"]


# Cookie signer for SSE authentication
cookie_signer = URLSafeTimedSerializer(
    settings.cookie_secret,
    salt="stream-session",
)


def create_stream_cookie(user_id: str) -> str:
    """
    Create signed cookie for SSE authentication.

    Cookie contains user_id and expiration. Stateless—no database lookup.
    """
    return cookie_signer.dumps(user_id)


def verify_stream_cookie(cookie_value: str) -> str:
    """
    Verify signed cookie and extract user_id.

    Raises BadSignature if cookie tampered with.
    Raises SignatureExpired if cookie too old.
    """
    try:
        # max_age in seconds
        user_id = cookie_signer.loads(
            cookie_value,
            max_age=settings.session_cookie_max_age,
        )
        return user_id
    except SignatureExpired:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Stream session expired. Refresh and reconnect.",
        )
    except BadSignature:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid stream session cookie",
        )


async def get_user_from_stream_cookie(request: Request) -> str:
    """
    FastAPI dependency for SSE endpoints.

    Extracts and verifies the stream session cookie.
    Returns user_id if valid.
    """
    cookie_value = request.cookies.get(settings.session_cookie_name)
    if not cookie_value:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="No stream session. Call POST /auth/session first.",
        )
    return verify_stream_cookie(cookie_value)
We cache this for 1 hour. Auth0 rotates keys occasionally (months apart), and publishes new keys before removing old ones (overlap period for graceful transition).
def HMAC_SHA256(message, key, salt=""):
    # Step 1: Derive keys from secret
    # XOR key with ipad (0x36 repeated) and opad (0x5c repeated)
    key_inner = key ^ 0x3636363636...  # Inner padding
    key_outer = key ^ 0x5c5c5c5c5c...  # Outer padding

    # Step 2: Inner hash
    # Combines key with message, hashes it
    inner = SHA256(key_inner + salt + message)

    # Step 3: Outer hash
    # Hashes the result again with a different key variant
    outer = SHA256(key_outer + inner)

    return outer
Why two hashes?
This construction (nested hashing) prevents length extension attacks:
Plain SHA256 has a weakness: if you know hash(A) and the length of A, you can compute hash(A + padding + B) without knowing A
HMAC’s double-hash construction makes this impossible
Even if you know HMAC(message), you can’t compute HMAC(message + extra) without the key
# Timing attack vulnerability
def insecure_compare(a, b):
    for i in range(len(a)):
        if a[i] != b[i]:
            return False  # Returns faster if first char differs
    return True

# Attacker measures response time to guess signature byte-by-byte
# Correct first byte: slower (continues to check second byte)
# Wrong first byte: faster (returns immediately)
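The standard fix is a constant-time comparison. Libraries like itsdangerous handle this for you, and Python exposes it directly as hmac.compare_digest; a sketch of the safe version:

import hmac

def secure_compare(a: bytes, b: bytes) -> bool:
    # Compares every byte regardless of where the first mismatch is,
    # so response time leaks nothing about the expected signature.
    return hmac.compare_digest(a, b)

expected = b"3a7b8c9d..."
provided = b"3a7b0000..."
print(secure_compare(expected, provided))  # False, in time independent of how long the common prefix is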
# Without salt, an attacker could reuse signatures
cookie_signature = HMAC-SHA256(user_id, COOKIE_SECRET)
# This signature is valid for ANY context

# With salt, signatures are context-specific
stream_cookie_sig = HMAC-SHA256(user_id, COOKIE_SECRET, salt="stream-session")
admin_cookie_sig = HMAC-SHA256(user_id, COOKIE_SECRET, salt="admin-session")
# stream_cookie_sig ≠ admin_cookie_sig
Even with the same user_id and secret, different contexts produce different signatures. An attacker can’t steal a stream cookie and use it for admin access.
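This is exactly what the salt argument to itsdangerous provides. A small demonstration (the secret value here is a placeholder; the real app uses settings.cookie_secret):

from itsdangerous import URLSafeTimedSerializer, BadSignature

secret = "dev-only-secret"  # placeholder for illustration

stream_signer = URLSafeTimedSerializer(secret, salt="stream-session")
admin_signer = URLSafeTimedSerializer(secret, salt="admin-session")

cookie = stream_signer.dumps("auth0|507f1f77bcf86cd799439011")

print(stream_signer.loads(cookie))   # same salt: verifies and returns the user ID
try:
    admin_signer.loads(cookie)       # different salt: signature doesn't match
except BadSignature:
    print("rejected: a stream cookie is not valid in the admin context")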
Signing vs Encryption:
| Property | HMAC Signing | Encryption (AES) |
| --- | --- | --- |
| Purpose | Prove authenticity & integrity | Hide data |
| Readable? | Yes (payload in plaintext) | No (ciphertext unreadable) |
| Can modify? | No (breaks signature) | No (breaks decryption) |
| Performance | Fast (one-way hash) | Slower (symmetric cipher) |
| Use case | Public data that must not be tampered with | Secret data that must not be read |
For cookies containing user IDs (not secret), signing is perfect. We don’t care if someone reads the user ID—we care that they can’t forge a different one.
Security properties:
Authenticity: Only someone with COOKIE_SECRET can create valid signatures
Integrity: Modifying any part of the payload changes the signature
Freshness: Timestamp ensures cookies expire (can’t reuse old ones)
Non-repudiation: Signature proves we issued this cookie
Domain separation: Salt prevents signature reuse across contexts
No database lookup needed—the cookie itself is cryptographic proof of authentication.
We need to add Auth0 and cookie configuration to our existing Settings class. Keep all your existing database, CORS, and environment settings — we’re just adding new fields.
# backend/app/core/settings.py
from pydantic import field_validator
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    # Existing fields from Parts 1 & 2:
    # - database_url: str
    # - env: str
    # - cors_origins: list[str]
    # - etc.
    # (keep all of these unchanged)

    # NEW: Auth0 configuration
    auth0_domain: str
    auth0_audience: str
    auth0_issuer: str | None = None

    # NEW: Cookie signing for SSE
    cookie_secret: str
    session_cookie_name: str = "stream_session"
    session_cookie_max_age: int = 600  # 10 minutes

    # NEW: Validators for auth fields
    @field_validator("auth0_domain")
    @classmethod
    def validate_auth0_domain(cls, v: str) -> str:
        """Remove https:// if present."""
        return v.replace("https://", "").replace("http://", "")

    @field_validator("auth0_issuer", mode="before")
    @classmethod
    def set_auth0_issuer(cls, v: str | None, info) -> str:
        """Default issuer from domain."""
        if v:
            return v
        domain = info.data.get("auth0_domain")
        if domain:
            return f"https://{domain}/"
        raise ValueError("Cannot determine AUTH0_ISSUER")

    @field_validator("cookie_secret")
    @classmethod
    def validate_cookie_secret(cls, v: str, info) -> str:
        """Ensure secret is strong in production."""
        if not v:
            raise ValueError("COOKIE_SECRET is required")
        if info.data.get("env") == "prod" and len(v) < 32:
            raise ValueError("COOKIE_SECRET must be at least 32 characters in production")
        return v

    # NEW: Property for cookie security flag
    @property
    def session_cookie_secure(self) -> bool:
        """Cookies only over HTTPS in production."""
        return self.is_production


settings = Settings()
# backend/app/api/auth.py
from fastapi import APIRouter, Depends, Response, status
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.auth import (
    get_current_user,
    get_current_user_id,
    create_stream_cookie,
)
from app.core.settings import settings
from app.api.dependencies import get_session
from app.domain.services.user_service import user_service

router = APIRouter(prefix="/auth", tags=["authentication"])


@router.post("/session", status_code=status.HTTP_204_NO_CONTENT)
async def create_session_cookie(
    response: Response,
    db: AsyncSession = Depends(get_session),
    user_id: str = Depends(get_current_user_id),
):
    """
    Exchange JWT for stream session cookie.

    This is the magic endpoint that bridges JWT authentication (works for API)
    to cookie authentication (works for SSE).

    Frontend flow:
    1. Get JWT from Auth0: const token = await getAccessTokenSilently()
    2. Call this endpoint with Authorization: Bearer <token>
    3. Backend verifies JWT and sets cookie
    4. Open EventSource - browser sends cookie automatically

    Returns: 204 No Content (cookie is in Set-Cookie header)
    """
    # Ensure user exists in database (auto-register if first login)
    user = await user_service.get_or_create_from_auth0_id(db, user_id)

    # Create signed cookie containing user_id
    # Uses HMAC to prevent tampering, includes timestamp for expiration
    cookie_value = create_stream_cookie(user_id)

    # Set cookie with security flags
    response.set_cookie(
        key=settings.session_cookie_name,         # Name: "stream_session"
        value=cookie_value,                       # Signed: "user_id.timestamp.signature"
        max_age=settings.session_cookie_max_age,  # Lifetime: 600 seconds (10 min)
        httponly=True,                            # JS can't read it (XSS protection)
        secure=settings.session_cookie_secure,    # HTTPS only in production
        samesite="lax",                           # CSRF protection
    )
    # 204 response = success, no body, cookie in header


@router.get("/me")
async def get_current_user_info(
    user_data: dict = Depends(get_current_user),
    db: AsyncSession = Depends(get_session),
):
    """
    Get authenticated user's profile.

    Useful for frontend to display user info in the UI (avatar, name, email).
    Also auto-registers user on first request.

    Returns: User profile with database ID and Auth0 data
    """
    # get_current_user verifies JWT and returns full payload
    # We extract claims and sync with database
    user = await user_service.get_or_create_from_auth0_id(
        db,
        auth0_id=user_data["sub"],         # "auth0|123" or "google-oauth2|456"
        email=user_data.get("email", ""),  # Optional (privacy settings)
        name=user_data.get("name", ""),    # Optional
    )
    return {
        "id": user.id,               # Database ID (internal)
        "auth0_id": user.auth0_id,   # Auth0 ID (external)
        "email": user.email,
        "name": user.name,
        "created_at": user.created_at.isoformat(),
    }
Problem: EventSource can't send Authorization header
Solution: EventSource automatically sends cookies
Challenge: How do we get a cookie?
Answer: Exchange JWT for cookie via API request (which CAN send headers)
# backend/app/main.py
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:5173"],
    allow_credentials=True,  # Allow cookies in CORS requests
    allow_methods=["*"],
    allow_headers=["*"],
)
Without allow_credentials=True, browsers reject the Set-Cookie header for security.
// Without HttpOnly:
document.cookie  // "stream_session=abc123..."
// XSS attack can steal cookie and send to attacker's server

// With HttpOnly:
document.cookie  // "" (cookie hidden from JavaScript)
// XSS attack can't see or steal it
The browser still sends it with requests, but JavaScript can’t read it. This blocks XSS attacks.
secure=settings.session_cookie_secure - HTTPS only in production
The database ID (user.id) is our PostgreSQL primary key. It's what foreign keys reference (sessions.user_id, messages.user_id). It's auto-incrementing and unique per user in our database.
Why do we need both?
We can’t use Auth0 IDs as primary keys because:
They’re strings - slower to index and join than integers
They’re external - we don’t control the format (Auth0 might change it)
They’re verbose - 30+ characters vs 4-8 bytes for an integer
We can’t use database IDs in JWTs because:
Sequential - exposes user count (user 42 means ~42 users)
Predictable - easy to enumerate all users
Not portable - if we migrate databases, IDs change
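For reference, here is a sketch of what the User model from Part 2 might look like with both identifiers. The column names match what the repository and service below expect, but treat this as an assumption; your actual Part 2 model may differ.

# backend/app/persistence/models.py (sketch -- adjust to your actual Part 2 model)
from datetime import datetime

from sqlalchemy import DateTime, Integer, String, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "users"

    # Internal identity: auto-incrementing primary key, referenced by foreign keys
    id: Mapped[int] = mapped_column(Integer, primary_key=True)

    # External identity: Auth0 'sub' claim, e.g. "auth0|507f..." or "google-oauth2|110..."
    auth0_id: Mapped[str] = mapped_column(String(128), unique=True, index=True)

    email: Mapped[str] = mapped_column(String(255), default="")
    name: Mapped[str] = mapped_column(String(255), default="")
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )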
# backend/app/domain/services/user_service.py
from sqlalchemy.ext.asyncio import AsyncSession

from app.persistence.repositories.user_repo import user_repo
from app.persistence.models import User


class UserService:
    """Business logic for user operations."""

    async def get_or_create_from_auth0_id(
        self,
        db: AsyncSession,
        auth0_id: str,
        email: str = "",
        name: str = "",
    ) -> User:
        """
        Get user by Auth0 ID, or create if first login.

        This is the bridge between Auth0 (external identity) and our database
        (internal identity). Called on every authenticated request.

        Args:
            auth0_id: From JWT 'sub' claim (e.g., "auth0|507f1f77bcf...")
            email: From JWT 'email' claim (optional)
            name: From JWT 'name' claim (optional)

        Returns:
            User object with database ID populated
        """
        # Try to find existing user
        user = await user_repo.get_by_auth0_id(db, auth0_id)

        if user:
            # User exists: check if profile changed in Auth0
            # (user updated email/name in Auth0 dashboard or social provider)
            updated = False
            if email and user.email != email:
                user.email = email
                updated = True
            if name and user.name != name:
                user.name = name
                updated = True

            if updated:
                # Persist changes to database
                await db.commit()
                await db.refresh(user)

            return user

        # User doesn't exist: first login, auto-register
        user = await user_repo.create(
            db,
            auth0_id=auth0_id,
            email=email,
            name=name,
        )
        return user


user_service = UserService()
1. User logs in → Auth0 issues JWT with sub="auth0|123"
2. User calls API → Backend verifies JWT → extracts auth0_id
3. get_or_create_from_auth0_id("auth0|123") → finds existing user
4. Check if email/name changed in Auth0 → update if needed
5. Return user with id=42
6. Use user.id for all database queries
1. User logs in with Google → Auth0 creates account
2. Auth0 issues JWT with sub="google-oauth2|110223..."
3. User calls API → Backend verifies JWT
4. get_or_create_from_auth0_id("google-oauth2|110223...") → not found
5. Create new user record with auth0_id="google-oauth2|110223..."
6. Return user with id=43 (auto-incremented)
7. User is now registered without filling out a form
Why update email/name on every request?
Users can change their profile in Auth0 (or their social provider). If someone updates their name in Google, we want it reflected in our app. We sync on every authenticated request.
This is cheap:
Only updates if changed (database write only when necessary)
Already fetching user for authorization check (no extra query)
Auth0 already validated the email/name (we trust the JWT)
Why optional email/name?
Some OAuth providers don’t share email (privacy settings). Some don’t provide names. We handle missing data gracefully with empty strings.
# backend/app/persistence/repositories/user_repo.py
from typing import Optional

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.persistence.models import User


class UserRepository:
    """Database operations for users."""

    async def get_by_id(self, db: AsyncSession, user_id: int) -> Optional[User]:
        """
        Get user by database primary key.

        Used when we already have the internal ID (e.g., from a foreign key).
        Returns None if not found.
        """
        result = await db.execute(select(User).where(User.id == user_id))
        return result.scalar_one_or_none()

    async def get_by_auth0_id(self, db: AsyncSession, auth0_id: str) -> Optional[User]:
        """
        Get user by Auth0 ID.

        This is the lookup that bridges external identity (Auth0) to internal
        identity (database ID). Called on every authenticated request.
        Returns None if user not found (first login).
        """
        result = await db.execute(select(User).where(User.auth0_id == auth0_id))
        return result.scalar_one_or_none()

    async def create(
        self, db: AsyncSession, auth0_id: str, email: str, name: str = ""
    ) -> User:
        """
        Create new user record.

        Called during auto-registration when user logs in for the first time.
        Database assigns the ID (auto-increment).
        """
        user = User(auth0_id=auth0_id, email=email, name=name)
        db.add(user)
        await db.commit()
        await db.refresh(user)  # Populates user.id from database
        return user


user_repo = UserRepository()
Why separate service and repository?
Service layer (user_service.py):
Business logic: “get or create” logic
Decides WHEN to create vs update
Handles profile syncing
Could send welcome emails, trigger analytics, etc.
Repository layer (user_repo.py):
Database operations: SQL queries
HOW to fetch/create/update
No business logic, just CRUD
Easily testable with mocks
Example of layering benefits:
If we want to add “send welcome email on first registration”:
# Service layer (user_service.py)
async def get_or_create_from_auth0_id(self, ...):
    user = await user_repo.get_by_auth0_id(db, auth0_id)
    if not user:
        # NEW: First-time user logic
        user = await user_repo.create(db, auth0_id, email, name)
        await email_service.send_welcome_email(user.email)  # Business logic
        await analytics.track("user_registered", user.id)   # Business logic
    return user
Repository stays unchanged — it just does database operations. Business logic lives in the service.
# Test service without hitting database
mock_repo.get_by_auth0_id.return_value = None
mock_repo.create.return_value = User(id=42, auth0_id="test")

# Test that welcome email sent
await user_service.get_or_create_from_auth0_id(db, "test", "a@b.com")
email_service.send_welcome_email.assert_called_once()
# backend/app/api/sessions.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession

from app.api.dependencies import get_session
from app.core.auth import get_current_user_id
from app.domain.services.user_service import user_service
from app.domain.services.session_service import session_service
from app.domain.dtos import SessionDTO, SessionCreateDTO

router = APIRouter(prefix="/api/sessions", tags=["sessions"])


@router.post("/", response_model=SessionDTO, status_code=status.HTTP_201_CREATED)
async def create_session(
    data: SessionCreateDTO,
    db: AsyncSession = Depends(get_session),
    auth0_id: str = Depends(get_current_user_id),
):
    """
    Create new chat session.

    Requires: Authorization: Bearer <JWT>
    """
    # Get user from auth0_id
    user = await user_service.get_or_create_from_auth0_id(db, auth0_id)
    try:
        return await session_service.create_session(db, user.id, data.title)
    except ValueError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))


@router.get("/", response_model=list[SessionDTO])
async def list_sessions(
    skip: int = 0,
    limit: int = 50,
    db: AsyncSession = Depends(get_session),
    auth0_id: str = Depends(get_current_user_id),
):
    """List user's sessions."""
    user = await user_service.get_or_create_from_auth0_id(db, auth0_id)
    return await session_service.list_user_sessions(db, user.id, skip, limit)


# Named get_session_by_id so it doesn't shadow the get_session dependency import
@router.get("/{session_id}", response_model=SessionDTO)
async def get_session_by_id(
    session_id: int,
    db: AsyncSession = Depends(get_session),
    auth0_id: str = Depends(get_current_user_id),
):
    """Get specific session."""
    user = await user_service.get_or_create_from_auth0_id(db, auth0_id)
    session = await session_service.get_session(db, session_id, user.id)
    if not session:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Session not found",
        )
    return session


@router.delete("/{session_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_session(
    session_id: int,
    db: AsyncSession = Depends(get_session),
    auth0_id: str = Depends(get_current_user_id),
):
    """Delete session."""
    user = await user_service.get_or_create_from_auth0_id(db, auth0_id)
    deleted = await session_service.delete_session(db, session_id, user.id)
    if not deleted:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Session not found",
        )
The key change: Every endpoint now has auth0_id: str = Depends(get_current_user_id). This:
Requires Authorization: Bearer <JWT> header
Verifies the JWT signature and claims
Extracts the Auth0 user ID (sub claim)
Passes it to the endpoint
We then use user_service.get_or_create_from_auth0_id() to look up (or auto-register) the user and obtain the internal database ID that the ownership checks and queries use.
# backend/app/api/messages.py
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.ext.asyncio import AsyncSession

from app.api.dependencies import get_session
from app.core.auth import get_current_user_id                # NEW import
from app.domain.services.message_service import message_service
from app.domain.services.user_service import user_service    # NEW import
from app.domain.dtos import MessageDTO, MessageCreateDTO

router = APIRouter(prefix="/api/messages", tags=["messages"])


@router.post("/", response_model=MessageDTO, status_code=status.HTTP_201_CREATED)
async def create_message(
    data: MessageCreateDTO,
    session_id: int = Query(...),
    db: AsyncSession = Depends(get_session),
    auth0_id: str = Depends(get_current_user_id),  # NEW: JWT auth
):
    """
    Create message in session.

    Requires: Authorization: Bearer <JWT>
    """
    # NEW: Get user from Auth0 ID
    user = await user_service.get_or_create_from_auth0_id(db, auth0_id)
    try:
        return await message_service.create_message(db, session_id, user.id, data)
    except ValueError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
    except PermissionError as e:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e))


@router.get("/", response_model=list[MessageDTO])
async def list_messages(
    session_id: int = Query(...),
    skip: int = 0,
    limit: int = 100,
    db: AsyncSession = Depends(get_session),
    auth0_id: str = Depends(get_current_user_id),  # NEW: JWT auth
):
    """
    List messages in session.

    Requires: Authorization: Bearer <JWT>
    """
    # NEW: Get user from Auth0 ID
    user = await user_service.get_or_create_from_auth0_id(db, auth0_id)
    try:
        return await message_service.list_session_messages(
            db, session_id, user.id, skip, limit
        )
    except PermissionError as e:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e))
Changes:
Added get_current_user_id import and dependency to both endpoints
Added user_service import
Each endpoint now requires JWT authentication
We look up the user by auth0_id before performing operations
What stayed the same:
Business logic in message_service
Permission checks (403 if user tries to access someone else’s session)
# backend/app/api/stream.py
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse

from app.core.auth import get_user_from_stream_cookie

router = APIRouter(prefix="/stream", tags=["streaming"])


@router.get("/")
async def stream_events(
    auth0_id: str = Depends(get_user_from_stream_cookie),
):
    """
    SSE endpoint for agent responses.

    Requires stream session cookie (call POST /auth/session first).
    """
    async def event_generator():
        yield f"data: {{\"type\": \"connected\", \"user\": \"{auth0_id}\"}}\n\n"
        # Agent streaming will go here in Part 4
        yield f"data: {{\"type\": \"message\", \"content\": \"Hello from SSE!\"}}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
// frontend/src/api/sse.ts
import { useAuth0 } from '@auth0/auth0-react'

const API_URL = import.meta.env.VITE_API_URL || 'http://localhost:8000'

export function useSSE() {
  const { getAccessTokenSilently } = useAuth0()

  async function openStream(onMessage: (event: MessageEvent) => void) {
    // Step 1: Get JWT and exchange for cookie
    const token = await getAccessTokenSilently()
    await fetch(`${API_URL}/auth/session`, {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${token}`,
      },
      credentials: 'include',  // Save the cookie
    })

    // Step 2: Open SSE (cookie sent automatically)
    const eventSource = new EventSource(`${API_URL}/stream/`, {
      withCredentials: true,
    })

    eventSource.onmessage = onMessage
    eventSource.onerror = async (error) => {
      console.error('SSE error:', error)
      eventSource.close()
      // Could implement auto-reconnect with fresh cookie here
    }

    return eventSource
  }

  return { openStream }
}
The flow:
Get JWT from Auth0 (in memory)
Call POST /auth/session with JWT -> backend sets cookie
Open EventSource with withCredentials: true -> browser sends cookie