Github Badge Co-authored with GPT-5-Pro Co-authored with Claude-4.5-Sonnet

Building a Production-Ready Agent Stack: Part 3 - Authentication & Security

In Parts 1 and 2, we built a solid foundation: Docker containers, FastAPI backend, database with migrations, and clean architecture with repositories and services. But right now, anyone can access anyone’s sessions and messages. Time to fix that.


We have a problem. Actually, we have two different authentication needs:

Need 1: API authentication. We need to protect our REST endpoints (/api/sessions, /api/messages, etc.). Standard bearer token authentication works perfectly here.

Need 2: SSE authentication. We need to authenticate Server-Sent Events for streaming agent responses. But here’s the problem: EventSource (the browser API for SSE) can’t send custom headers. No Authorization: Bearer <token> header allowed.

Most tutorials either punt on the SSE problem (put tokens in query strings—terrible idea) or overcomplicate things with dual auth systems. We’re going to do it right with a hybrid approach:

The Solution:

  1. JWTs for API requests - Standard Authorization: Bearer header
  2. Signed cookies for SSE - Short-lived (5-10 min), stateless, secure
  3. One auth provider - Auth0 handles everything

Here’s the flow:

1
2
3
4
5
6
1. User logs in with Auth0 -> SPA receives JWT
2. SPA makes API requests -> Authorization: Bearer <JWT>
3. Before opening SSE -> POST /auth/session with JWT
4. Backend verifies JWT -> issues short-lived signed cookie
5. SPA opens EventSource -> browser sends cookie automatically
6. Backend verifies cookie signature -> streams events

This approach gives us:

  • Standard OAuth2/OIDC with Auth0 SPA SDK (PKCE flow)
  • JWTs for what they’re good at (API authentication)
  • Cookies for what they’re good at (SSE where headers don’t work)
  • No database lookups for auth (both JWT and cookie are stateless)
  • Short-lived cookies (5-10 min) limit exposure if stolen
  • Simple, standard patterns that scale

By the end of this post, you’ll have:

  • Auth0 SPA integration with the PKCE flow
  • Backend JWT verification with JWKS caching
  • Signed cookie exchange for SSE authentication
  • Protected API endpoints using JWT bearer tokens
  • SSE streaming authenticated with cookies
  • Frontend login/logout with Auth0 React SDK

Let’s build it.

The SSE Authentication Challenge

Before we dive in, let me explain why we need this hybrid approach.

Standard API authentication works perfectly:

1
2
3
4
5
6
7
8
// Frontend makes API request with JWT
const token = await getAccessToken();

fetch('https://api.example.com/sessions', {
  headers: {
    'Authorization': `Bearer ${token}`
  }
})

Backend extracts token from header, verifies signature and claims, accepts or rejects. Simple, standard, secure.

But Server-Sent Events are different:

1
2
// Frontend opens SSE connection
const eventSource = new EventSource('https://api.example.com/stream');

Notice what’s missing? No way to pass headers. The EventSource API doesn’t support custom headers. Period. This isn’t a bug — it’s the W3C spec.

Why Doesn’t EventSource Support Headers?

When SSE was standardized in 2009, it was designed for public event streams (news feeds, stock tickers, sports scores). The assumption was: if you need authentication, handle it at the application level.

That made sense in 2009. In 2025, streaming personalized agent responses? We need auth.

The Bad Solutions

Let me show you what NOT to do:

Bad Solution 1: Token in URL

1
const eventSource = new EventSource(`/stream?token=${jwt}`);

Why this is terrible:

  • Tokens in logs - Your web server, CDN, proxies all log URLs with query params
  • Tokens in browser history - Persisted in the browser database
  • Token leakage - Referer headers can leak tokens to external sites
  • Security violation - RFC 6750 explicitly forbids tokens in URLs

Bad Solution 2: Polling instead of streaming

1
2
3
setInterval(() => {
  fetch('/messages', { headers: { 'Authorization': `Bearer ${token}` }});
}, 1000);

Why this is terrible:

  • Latency - Users wait up to 1 second for updates
  • Server load - 100 users = 100 req/sec, mostly returning “no data”
  • Battery drain - Constant requests kill mobile battery
  • Throws away SSE benefits - Instant push, auto-reconnect, HTTP/2 multiplexing

Bad Solution 3: WebSockets

The native WebSocket API doesn’t let you set headers either. You’d need Socket.IO or similar, adding:

  • 30KB+ JavaScript library
  • Server library with custom protocol
  • Bidirectional complexity when you only need server->client
  • Reinventing SSE with extra steps

The Right Solution: Cookies for SSE Only

Cookies are sent automatically by browsers. We leverage this for SSE while keeping JWTs for API requests.

Why this works:

  • JWTs for API - Standard Authorization header, stateless, works everywhere
  • Cookies for SSE - Only place where headers don’t work
  • Both are stateless - Cookie contains signed user data, no database lookup
  • Short-lived cookie - 5-10 min limits damage if stolen
  • HttpOnly, Secure, SameSite - All the cookie security flags
  • Simple - One auth provider (Auth0), clean separation of concerns

Note

Why not cookies for everything?

You could use cookies for API requests too (like I almost did in an earlier version). But JWTs in Authorization headers are:

  • Standard - Every API client knows how to send headers
  • Debuggable - Easy to inspect with curl/Postman
  • CORS-friendly - No credentials complexity
  • Mobile-ready - Mobile apps handle headers easily, cookies not so much

Use the right tool for the job. JWTs for API, cookies for SSE where EventSource forces our hand.

Setting Up Auth0: The Identity Layer

Rolling your own auth in 2025 is a bad idea. You need:

  • Password hashing, strength validation, reset flows
  • Email verification with token expiration
  • Rate limiting and brute force protection
  • Multi-factor authentication (TOTP, SMS, push)
  • Social logins (Google, GitHub, Apple)
  • Account lockout and security headers
  • Audit logs and compliance (GDPR, CCPA)

Auth0 handles all of this. Battle-tested, maintained by security experts, free tier up to 25,000 monthly active users.

Why Auth0?

  • Mature (founded 2013, acquired by Okta 2021)
  • Excellent documentation with real examples
  • Standard OAuth2/OIDC (portable to other providers)
  • Great developer experience
  • Generous free tier

The patterns we build today work with any OAuth2 provider (Firebase, Supabase, Clerk, Keycloak, etc.). Swap Auth0 later if needed.

Let’s get it started!

Creating Your Auth0 Tenant

Step 1: Sign up

Go to auth0.com and create an account.

Choose a tenant domain:

1
https://your-tenant-name.us.auth0.com

Pick something professional (company or project name). You can’t change it later without creating a new tenant.

Region: Choose closest to your users (US, EU, AU, Japan).

Step 2: Create a Single Page Application

In Auth0 dashboard: Applications -> Applications -> Create Application

  • Name: “Agent Stack Frontend”
  • Application Type: Single Page Web Applications

This is key. We’re using the SPA pattern (public client with PKCE), not the confidential client pattern.

Why SPA?

  • Frontend handles OAuth flow directly
  • No backend callback needed
  • Standard pattern with excellent Auth0 SDK support
  • PKCE (Proof Key for Code Exchange) prevents code interception

Click Create.

Step 3: Configure Application Settings

Go to Settings tab:

Application URIs:

  • Allowed Callback URLs:

    1
    
    http://localhost:5173, http://localhost:3000
    

    Where Auth0 redirects after login. Add production domain later: https://app.yourdomain.com

  • Allowed Logout URLs:

    1
    
    http://localhost:5173, http://localhost:3000
    

    Where Auth0 redirects after logout.

  • Allowed Web Origins:

    1
    
    http://localhost:5173, http://localhost:3000
    

    For CORS during token refresh.

Note: Port 5173 is Vite’s default, 3000 is Create React App’s default. Include both.

Click Save Changes.

Step 4: Configure Advanced Settings

Scroll to Advanced Settings -> Grant Types:

Enable:

  • Authorization Code (PKCE flow)
  • Refresh Token (stay logged in)

Disable:

  • Implicit (deprecated, insecure)
  • Password (deprecated)

Save Changes.

Step 5: Get Application Credentials

From the Settings tab, copy:

  • Domain (e.g., your-tenant.us.auth0.com)
  • Client ID (long alphanumeric string)

You’ll need these for frontend .env.

Note

SPAs don’t have a client secret (they’re public clients). The secret stays in the Auth0 dashboard.

Creating an API

We need to tell Auth0 about our backend API so JWTs have the correct audience claim.

Applications -> APIs -> Create API

  • Name: “Agent Stack API”
  • Identifier: https://api.agent-stack.com (can be any URL, doesn’t need to be real — it’s just a unique identifier for the aud claim)
  • Signing Algorithm: RS256

Click Create.

Important settings:

  • Identifier: This becomes the audience in JWTs. Copy it for backend .env as AUTH0_AUDIENCE.
  • Token Expiration: Default 86400 seconds (24 hours). Fine for now.

Environment Variables

Frontend (frontend/.env):

1
2
3
4
VITE_AUTH0_DOMAIN=your-tenant.us.auth0.com
VITE_AUTH0_CLIENT_ID=your-client-id-from-dashboard
VITE_AUTH0_AUDIENCE=https://api.agent-stack.com
VITE_API_URL=http://localhost:8000

Backend (backend/.env):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# Auth0 Configuration
AUTH0_DOMAIN=your-tenant.us.auth0.com
AUTH0_AUDIENCE=https://api.agent-stack.com
AUTH0_ISSUER=https://your-tenant.us.auth0.com/

# Cookie signing secret (generate with: python -c "import secrets; print(secrets.token_urlsafe(32))")
COOKIE_SECRET=your-super-secret-random-string-here

# Session cookie config
SESSION_COOKIE_NAME=stream_session
SESSION_COOKIE_MAX_AGE=600  # 10 minutes

Generating COOKIE_SECRET:

1
python -c "import secrets; print(secrets.token_urlsafe(32))"

This secret signs cookies. If leaked, attackers can forge cookies. Keep it secret, rotate it periodically.

SESSION_COOKIE_NAME and SESSION_COOKIE_MAX_AGE control the cookie name and expiration time. There is no default; set as needed. Like 10 minutes (600 seconds) is a good balance for time and the name can be anything like stream_session_cookie.

Warning

Never commit secrets to git.

Add .env to .gitignore:

1
2
3
4
# .gitignore
.env
.env.local
.env.*.local

Create .env.example with placeholders:

1
2
3
4
# .env.example
AUTH0_DOMAIN=your-tenant.us.auth0.com
AUTH0_AUDIENCE=https://api.your-app.com
COOKIE_SECRET=generate-with-secrets-token-urlsafe

Understanding PKCE (The SPA Security Model)

PKCE (Proof Key for Code Exchange, pronounced “pixie”) prevents authorization code interception in public clients.

Why we need it:

Traditional OAuth2 used client secrets to prevent code interception. But SPAs can’t keep secrets — JavaScript runs in the browser, visible to users. An attacker could:

  1. Intercept the authorization code during the redirect (network sniffer, compromised proxy)
  2. Exchange it for tokens at the token endpoint
  3. Impersonate the user

PKCE solves this without secrets using one-way cryptographic hashing.

The flow:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
1. Frontend generates random code_verifier (128 chars of entropy)
   Example: "dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk"

2. Frontend hashes it: code_challenge = BASE64URL(SHA256(code_verifier))
   SHA256 is one-way: you can't reverse the hash to get the original
   Example challenge: "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM"

3. Frontend -> Auth0: authorize request with code_challenge
   URL: /authorize?code_challenge=E9Melhoa...&code_challenge_method=S256

4. User logs in on Auth0's page
   Auth0 stores the code_challenge tied to the authorization code

5. Auth0 -> Frontend: redirect with authorization code
   URL: yourapp.com/callback?code=abc123
   Attacker could intercept this code!

6. Frontend -> Auth0: exchange code + original code_verifier for tokens
   POST /oauth/token
   {
     "code": "abc123",
     "code_verifier": "dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk"
   }

7. Auth0 verifies: SHA256(code_verifier) == stored code_challenge
   Hashes the provided verifier and compares to stored challenge
   If match: the request came from the same client that started auth

8. Auth0 -> Frontend: issues access_token (JWT)

Why this is secure:

Cryptographic one-way property:

  • SHA256 is a one-way hash function (also called a “cryptographic hash”)
  • Easy to compute: hash = SHA256(input) takes microseconds
  • Impossible to reverse: Given the hash, you can’t compute the original input
  • Even knowing a billion hashes won’t help you reverse a single one

Attack scenario:

Let’s say an attacker intercepts the authorization code (step 5):

1
Attacker sees: code=abc123 in the redirect URL

The attacker tries to exchange it:

1
2
3
4
5
POST /oauth/token
{
  "code": "abc123",
  "code_verifier": "???"  // What to put here?
}

The attacker doesn’t have the code_verifier (it never left the frontend’s memory).

Could they use the code_challenge they saw in step 3?

1
2
3
4
5
POST /oauth/token
{
  "code": "abc123",
  "code_verifier": "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM"  // Using challenge
}

No! Auth0 hashes this and gets:

1
2
3
SHA256("E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM")
= "a7b3c2..." (different hash!)
≠ "E9Melhoa..." (stored challenge)

Verification fails. Token denied.

Could they reverse the hash to get the original code_verifier?

No. This is the core security property of SHA256. Even with:

  • The hash value
  • Unlimited computing power for years
  • Knowledge of the algorithm

You cannot reverse a SHA256 hash. The mathematical function discards information (many inputs map to each output — called “collisions” in theory, but finding one is computationally infeasible).

Why 128 characters of randomness?

The code_verifier must be high-entropy (unpredictable). With 128 random characters from the set [A-Za-z0-9-._~]:

  • 66^128 possible values ≈ 2^760
  • Even if you tried 1 trillion guesses per second for the age of the universe, you’d have 0% chance of guessing it

This is why SPAs don’t need client secrets — PKCE provides equivalent security through cryptographic one-way functions.

Note

Auth0 React SDK handles PKCE automatically. You’ll never see code_verifier or code_challenge in your code — the SDK manages it. But understanding the cryptographic primitives (one-way hashing, entropy) helps you reason about the security of your auth flow.

Backend: JWT Verification

Now let’s build the backend authentication layer. We need to verify JWTs from Auth0.

Understanding JWTs: Signing vs Encryption

A JWT has three parts separated by dots: header.payload.signature

Let’s decode a real JWT to understand what’s inside. Here’s an example:

1
eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCIsImtpZCI6ImFiYzEyMyJ9.eyJpc3MiOiJodHRwczovL3lvdXItdGVuYW50LnVzLmF1dGgwLmNvbS8iLCJzdWIiOiJhdXRoMHw1MDdmMWY3N2JjZjg2Y2Q3OTk0MzkwMTEiLCJhdWQiOiJodHRwczovL2FwaS5hZ2VudC1zdGFjay5jb20iLCJleHAiOjE3MzU1NzQ0MDAsImlhdCI6MTczNTU3MDgwMCwiZW1haWwiOiJ1c2VyQGV4YW1wbGUuY29tIn0.signature_bytes_here

Part 1: Header (base64url-encoded JSON)

1
2
3
4
5
{
  "alg": "RS256",  // RSA signature with SHA-256 hashing
  "typ": "JWT",    // This is a JSON Web Token
  "kid": "abc123"  // Key ID: which public key to use for verification
}

Base64url encoded: eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCIsImtpZCI6ImFiYzEyMyJ9

Part 2: Payload (base64url-encoded JSON - not encrypted, just encoded)

1
2
3
4
5
6
7
8
{
  "iss": "https://your-tenant.us.auth0.com/",  // Issuer: who created this token
  "sub": "auth0|507f1f77bcf86cd799439011",     // Subject: the user's ID
  "aud": "https://api.agent-stack.com",        // Audience: who should accept this token
  "exp": 1735574400,                           // Expiration: Unix timestamp
  "iat": 1735570800,                           // Issued at: Unix timestamp
  "email": "user@example.com"                  // Custom claims (any data)
}

Base64url encoded: eyJpc3MiOiJodHRwczovL3lvdXItdGVuYW50LnVzLmF1dGgwLmNvbS8iLCJzdWIiOiJhdXRoMHw1MDdmMWY3N2JjZjg2Y2Q3OTk0MzkwMTEiLCJhdWQiOiJodHRwczovL2FwaS5hZ2VudC1zdGFjay5jb20iLCJleHAiOjE3MzU1NzQ0MDAsImlhdCI6MTczNTU3MDgwMCwiZW1haWwiOiJ1c2VyQGV4YW1wbGUuY29tIn0

Important: The payload is not encrypted. Anyone can decode base64 and read it:

1
2
echo "eyJpc3MiOiJodHRwczovL3lvdXItdGVuYW50..." | base64 -d
# Returns the JSON payload

This is intentional! JWTs are about authentication (proving who you are), not confidentiality (hiding data). Never put secrets in the payload.

Part 3: Signature (RSA signature bytes, base64url-encoded)

This is where the security happens. The signature proves:

  1. The token was created by Auth0 (only they have the private key)
  2. The payload hasn’t been tampered with (changing one character breaks the signature)

Here’s how it works:

Creating the signature (Auth0’s side):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
# Step 1: Create the message to sign
message = base64url(header) + "." + base64url(payload)
# "eyJhbGci...J9.eyJpc3Mi...fQ"

# Step 2: Hash the message with SHA-256
hash = SHA256(message)
# SHA-256 reduces any input to a fixed 256-bit (32-byte) digest
# Example: "3a7b8c9d..." (64 hex characters)

# Step 3: Sign the hash with RSA private key (only Auth0 has this)
signature = RSA_SIGN(hash, auth0_private_key)
# RSA encryption of the hash using Auth0's secret private key
# Creates ~256 bytes of signature data

# Step 4: Encode signature as base64url
token = message + "." + base64url(signature)

Verifying the signature (our backend):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
# Step 1: Split token into parts
header, payload, signature = token.split(".")

# Step 2: Fetch Auth0's public key (from JWKS)
public_key = get_auth0_public_key(kid="abc123")
# Public key is mathematically related to private key
# Can verify signatures but CANNOT create them

# Step 3: Recreate the message
message = header + "." + payload

# Step 4: Hash the message (same as Auth0 did)
hash = SHA256(message)

# Step 5: Verify signature with RSA public key
is_valid = RSA_VERIFY(hash, signature, public_key)
# RSA decrypts the signature using the public key
# Compares decrypted value to our computed hash
# If they match: signature is valid, token is authentic

Why RSA (asymmetric cryptography)?

RSA uses a key pair:

  • Private key: Can create signatures (only Auth0 has this)
  • Public key: Can verify signatures (anyone can have this)

This is different from symmetric algorithms like HS256 (HMAC-SHA256):

1
2
3
4
5
6
7
8
HS256: same secret for signing AND verifying
  signature = HMAC-SHA256(message, shared_secret)
  Problem: if we verify tokens, we can also create fake ones

RS256: different keys for signing vs verifying
  signature = RSA_SIGN(hash, private_key)    // Auth0 only
  verify = RSA_VERIFY(hash, signature, public_key)  // Anyone
  Benefit: we can verify but can't forge

The mathematics: RSA is based on the difficulty of factoring large prime numbers:

  • Private key: two large primes (p, q) and derived values
  • Public key: the product n = p × q and an exponent e
  • Signing: compute signature^d ≡ hash (mod n), where d is from private key
  • Verifying: compute signature^e ≡ hash (mod n), where e is from public key
  • Security: knowing (n, e) doesn’t let you compute d without factoring n into p × q (computationally infeasible for 2048-bit keys)

Why base64url encoding?

JWTs must be URL-safe (passed in headers, query strings). Base64url uses:

  • A-Z, a-z, 0-9, -, _ (URL-safe characters)
  • No padding = characters
  • Standard base64 uses +, /, = which break URLs

Why hash before signing?

RSA can only encrypt data smaller than the key size (typically 2048 bits = 256 bytes). But tokens can be larger. SHA-256 always produces a fixed 32-byte hash, regardless of input size. We sign the hash, not the full message.

Can’t someone modify the payload and re-sign it?

No. They’d need Auth0’s private key to create a valid signature. Without it:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
Attacker modifies payload:
{
  "sub": "auth0|attacker",  // Changed user ID
  "email": "attacker@example.com"
}

Attacker tries to sign with their own key:
signature = RSA_SIGN(hash, attacker_private_key)

Our verification fails:
RSA_VERIFY(hash, signature, auth0_public_key)  // FALSE
// The public key doesn't match the private key used for signing

The mathematical relationship between Auth0’s public and private keys ensures this.

What we verify:

When jwt.decode() succeeds, we know:

  1. Authenticity: Token was created by Auth0 (signature valid with Auth0’s public key)
  2. Integrity: Payload hasn’t been modified (signature covers entire message)
  3. Authorization: Token is for our API (audience claim matches)
  4. Not expired: Current time < expiration timestamp
  5. From correct issuer: Issuer claim matches Auth0 tenant

We trust the payload claims because the signature proves no one tampered with them.

Why RS256 instead of HS256?

FeatureRS256 (RSA)HS256 (HMAC)
KeysPublic/private pairSingle shared secret
Who can verify?Anyone with public keyOnly those with secret
Who can sign?Only holder of private keyAnyone with secret
Use caseAuth0 signs, multiple services verifySingle service signs and verifies
SecurityCan’t forge signatures without private keyIf secret leaks, attacker can forge
Key rotationPublish new public key, no secret sharingMust securely share new secret

For our architecture (Auth0 signs, we verify), RS256 is the right choice.

Installing Dependencies

1
2
cd backend
uv add "python-jose[cryptography]" itsdangerous

What these do:

  • python-jose: JWT decoding and signature verification
  • cryptography: Cryptographic primitives for RS256
  • itsdangerous: Secure cookie signing (for the SSE cookie)

Creating the Auth Module

Finally, let’s implement the auth logic.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# backend/app/core/auth.py
from datetime import datetime, timedelta
from typing import Any, Optional

from fastapi import HTTPException, Security, Request, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from jose.exceptions import ExpiredSignatureError, JWTClaimsError
from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired
import httpx

from app.core.settings import settings


# Security scheme for Swagger UI
security = HTTPBearer()

# JWKS cache (JSON Web Key Set from Auth0)
_jwks_cache: Optional[dict[str, Any]] = None
_jwks_cache_time: Optional[datetime] = None
JWKS_CACHE_TTL = timedelta(hours=1)


async def get_jwks() -> dict[str, Any]:
    """
    Fetch Auth0's public keys for JWT signature verification.

    Auth0 rotates keys periodically, so we cache for 1 hour.
    """
    global _jwks_cache, _jwks_cache_time

    # Return cached JWKS if still valid
    if _jwks_cache and _jwks_cache_time:
        if datetime.utcnow() - _jwks_cache_time < JWKS_CACHE_TTL:
            return _jwks_cache

    # Fetch fresh JWKS
    jwks_url = f"https://{settings.auth0_domain}/.well-known/jwks.json"

    async with httpx.AsyncClient() as client:
        response = await client.get(jwks_url)
        response.raise_for_status()

    _jwks_cache = response.json()
    _jwks_cache_time = datetime.utcnow()

    return _jwks_cache


async def verify_jwt(token: str) -> dict[str, Any]:
    """
    Verify JWT signature and claims.

    Returns payload if valid, raises HTTPException if invalid.
    """
    try:
        # Fetch JWKS (cached)
        jwks = await get_jwks()

        # Decode token header to get key ID
        unverified_header = jwt.get_unverified_header(token)
        key_id = unverified_header.get("kid")

        if not key_id:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Token missing 'kid' (key ID)",
            )

        # Find matching public key
        rsa_key = None
        for key in jwks.get("keys", []):
            if key["kid"] == key_id:
                rsa_key = {
                    "kty": key["kty"],
                    "kid": key["kid"],
                    "use": key["use"],
                    "n": key["n"],
                    "e": key["e"],
                }
                break

        if not rsa_key:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Unable to find matching public key",
            )

        # Verify signature and claims
        payload = jwt.decode(
            token,
            rsa_key,
            algorithms=["RS256"],
            audience=settings.auth0_audience,
            issuer=settings.auth0_issuer,
        )

        return payload

    except ExpiredSignatureError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token has expired",
        )
    except JWTClaimsError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Invalid token claims: {str(e)}",
        )
    except JWTError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Invalid token: {str(e)}",
        )


async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Security(security),
) -> dict[str, Any]:
    """
    FastAPI dependency for extracting user from JWT.

    Usage:
        @app.get("/protected")
        async def protected(user: dict = Depends(get_current_user)):
            user_id = user["sub"]
    """
    token = credentials.credentials
    return await verify_jwt(token)


async def get_current_user_id(
    user: dict[str, Any] = Security(get_current_user),
) -> str:
    """
    Extract user ID from verified JWT.

    Auth0 user IDs look like: "auth0|507f1f77bcf86cd799439011"
    """
    return user["sub"]


# Cookie signer for SSE authentication
cookie_signer = URLSafeTimedSerializer(
    settings.cookie_secret,
    salt="stream-session"
)


def create_stream_cookie(user_id: str) -> str:
    """
    Create signed cookie for SSE authentication.

    Cookie contains user_id and expiration. Stateless—no database lookup.
    """
    return cookie_signer.dumps(user_id)


def verify_stream_cookie(cookie_value: str) -> str:
    """
    Verify signed cookie and extract user_id.

    Raises BadSignature if cookie tampered with.
    Raises SignatureExpired if cookie too old.
    """
    try:
        # max_age in seconds
        user_id = cookie_signer.loads(
            cookie_value,
            max_age=settings.session_cookie_max_age
        )
        return user_id
    except SignatureExpired:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Stream session expired. Refresh and reconnect.",
        )
    except BadSignature:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid stream session cookie",
        )


async def get_user_from_stream_cookie(request: Request) -> str:
    """
    FastAPI dependency for SSE endpoints.

    Extracts and verifies the stream session cookie.
    Returns user_id if valid.
    """
    cookie_value = request.cookies.get(settings.session_cookie_name)

    if not cookie_value:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="No stream session. Call POST /auth/session first.",
        )

    return verify_stream_cookie(cookie_value)

Let’s break down what’s happening:

1. JWKS fetching and caching:

1
jwks_url = f"https://{settings.auth0_domain}/.well-known/jwks.json"

Auth0 publishes public keys at this standard endpoint. Response:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
{
  "keys": [
    {
      "kty": "RSA",
      "kid": "abc123",
      "use": "sig",
      "n": "0vx7agoebGcQSuu...",  // RSA modulus
      "e": "AQAB"                  // RSA exponent
    }
  ]
}

We cache this for 1 hour. Auth0 rotates keys occasionally (months apart), and publishes new keys before removing old ones (overlap period for graceful transition).

2. Token verification:

1
2
3
4
5
6
7
payload = jwt.decode(
    token,
    rsa_key,
    algorithms=["RS256"],
    audience=settings.auth0_audience,
    issuer=settings.auth0_issuer,
)

python-jose verifies:

  • Signature: Token signed by Auth0’s private key?
  • Audience: Token meant for our API?
  • Issuer: Token issued by our Auth0 tenant?
  • Expiration: Token still valid?

If any check fails, we raise 401 Unauthorized.

3. Cookie signing with HMAC (for SSE):

1
2
3
4
cookie_signer = URLSafeTimedSerializer(
    settings.cookie_secret,
    salt="stream-session"
)

itsdangerous signs cookies with HMAC-SHA256 (Hash-based Message Authentication Code). Let’s understand what this means and why it’s secure.

What is HMAC?

HMAC is a way to verify both authenticity (who created it) and integrity (hasn’t been tampered with) using a shared secret.

Here’s how it works:

Creating a signed cookie (our backend):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
# The data we want to sign
user_id = "auth0|507f1f77bcf86cd799439011"
timestamp = 1735574400  # Current Unix timestamp

# Step 1: Serialize the data
payload = f"{user_id}.{timestamp}"
# "auth0|507f1f77bcf86cd799439011.1735574400"

# Step 2: Compute HMAC signature
signature = HMAC-SHA256(
    message=payload,
    key=COOKIE_SECRET,
    salt="stream-session"  # Domain separation (prevents misuse of signature)
)
# HMAC is: hash(key + hash(key + message))
# With salt: hash(key + salt + hash(key + salt + message))
# Output: "a3f8c9e2..." (32 bytes, 64 hex chars)

# Step 3: Combine payload + signature
signed_cookie = base64url(payload + "." + signature)
# Final cookie: "YXV0aDB8NTA3ZjFmNzdiY2Y4NmNkNzk5NDM5MDExLjE3MzU1NzQ0MDAuYTNmOGM5ZTI..."

Why HMAC instead of plain SHA256?

You might think: why not just SHA256(payload)? Because anyone could compute that:

1
2
3
4
5
6
7
# Attacker's attempt without HMAC
attacker_payload = "auth0|attacker.1735574400"
attacker_hash = SHA256(attacker_payload)
forged_cookie = attacker_payload + "." + attacker_hash

# Our backend would accept this!
# SHA256(payload) == attacker_hash  ✓ (matches)

HMAC requires the secret key:

1
2
3
4
5
6
7
# Attacker tries to forge
attacker_payload = "auth0|attacker.1735574400"
attacker_signature = HMAC-SHA256(attacker_payload, "wrong-secret")

# Our backend verification:
our_signature = HMAC-SHA256(attacker_payload, COOKIE_SECRET)
# attacker_signature ≠ our_signature  ✗ (verification fails)

Without COOKIE_SECRET, the attacker can’t create a valid signature.

The HMAC algorithm (simplified):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
def HMAC_SHA256(message, key, salt=""):
    # Step 1: Derive keys from secret
    # XOR key with ipad (0x36 repeated) and opad (0x5c repeated)
    key_inner = key ^ 0x3636363636...  # Inner padding
    key_outer = key ^ 0x5c5c5c5c5c...  # Outer padding

    # Step 2: Inner hash
    inner = SHA256(key_inner + salt + message)
    # Combines key with message, hashes it

    # Step 3: Outer hash
    outer = SHA256(key_outer + inner)
    # Hashes the result again with a different key variant

    return outer

Why two hashes?

This construction (nested hashing) prevents length extension attacks:

  • Plain SHA256 has a weakness: if you know hash(A), you can compute hash(A + B) without knowing A
  • HMAC’s double-hash construction makes this impossible
  • Even if you know HMAC(message), you can’t compute HMAC(message + extra) without the key

Verifying the cookie (our backend):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# Step 1: Split cookie into payload + signature
parts = cookie_value.split(".")
if len(parts) != 3:
    raise BadSignature("Invalid cookie format")

user_id, timestamp, provided_signature = parts

# Step 2: Recompute signature with our secret
payload = f"{user_id}.{timestamp}"
expected_signature = HMAC-SHA256(
    message=payload,
    key=COOKIE_SECRET,
    salt="stream-session"
)

# Step 3: Compare signatures (constant-time to prevent timing attacks)
if not constant_time_compare(provided_signature, expected_signature):
    raise BadSignature("Signature mismatch - cookie tampered with")

# Step 4: Check expiration
current_time = time.time()
if current_time - timestamp > SESSION_COOKIE_MAX_AGE:
    raise SignatureExpired("Cookie expired")

# Step 5: Return user_id
return user_id

Why constant-time comparison?

Regular string comparison == short-circuits (stops at first different character):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
# Timing attack vulnerability
def insecure_compare(a, b):
    for i in range(len(a)):
        if a[i] != b[i]:
            return False  # Returns faster if first char differs
    return True

# Attacker measures response time to guess signature byte-by-byte
# Correct first byte: slower (continues to check second byte)
# Wrong first byte: faster (returns immediately)

Constant-time comparison prevents this:

1
2
3
4
5
6
7
8
9
def constant_time_compare(a, b):
    if len(a) != len(b):
        return False

    result = 0
    for x, y in zip(a, b):
        result |= ord(x) ^ ord(y)  # XOR, accumulate differences

    return result == 0  # Always checks all bytes

Takes the same time regardless of where differences occur. Attacker can’t learn anything from timing.

What’s in the cookie?

1
2
3
4
5
6
cookie = base64url(user_id + "." + timestamp + "." + signature)

Decoded example:
user_id:   "auth0|507f1f77bcf86cd799439011"
timestamp: "1735574400"
signature: "a3f8c9e2d4b5a1f3e9c8d7a6b5f4e3d2c1b0a9f8e7d6c5b4a3f2e1d0c9b8a7f6"

The cookie is self-contained:

  • No database lookup to check validity
  • No session store (Redis/Memcached)
  • Signature proves authenticity
  • Timestamp proves freshness

Why add a salt?

The salt="stream-session" parameter provides domain separation:

1
2
3
4
5
6
7
8
# Without salt, an attacker could reuse signatures
cookie_signature = HMAC-SHA256(user_id, COOKIE_SECRET)
# This signature is valid for ANY context

# With salt, signatures are context-specific
stream_cookie_sig = HMAC-SHA256(user_id, COOKIE_SECRET, salt="stream-session")
admin_cookie_sig = HMAC-SHA256(user_id, COOKIE_SECRET, salt="admin-session")
# stream_cookie_sig ≠ admin_cookie_sig

Even with the same user_id and secret, different contexts produce different signatures. An attacker can’t steal a stream cookie and use it for admin access.

Signing vs Encryption:

PropertyHMAC SigningEncryption (AES)
PurposeProve authenticity & integrityHide data
Readable?Yes (payload in plaintext)No (ciphertext unreadable)
Can modify?No (breaks signature)No (breaks decryption)
PerformanceFast (one-way hash)Slower (symmetric cipher)
Use casePublic data that must not be tamperedSecret data that must not be read

For cookies containing user IDs (not secret), signing is perfect. We don’t care if someone reads the user ID—we care that they can’t forge a different one.

Security properties:

  1. Authenticity: Only someone with COOKIE_SECRET can create valid signatures
  2. Integrity: Modifying any part of the payload changes the signature
  3. Freshness: Timestamp ensures cookies expire (can’t reuse old ones)
  4. Non-repudiation: Signature proves we issued this cookie
  5. Domain separation: Salt prevents signature reuse across contexts

No database lookup needed—the cookie itself is cryptographic proof of authentication.

4. FastAPI dependencies:

1
2
3
4
async def get_current_user_id(
    user: dict[str, Any] = Security(get_current_user),
) -> str:
    return user["sub"]

When you add user_id: str = Depends(get_current_user_id) to an endpoint:

  1. FastAPI extracts Authorization: Bearer <token> header
  2. Calls get_current_user() -> verifies JWT
  3. Calls get_current_user_id() -> extracts user ID
  4. Passes user ID to your endpoint
  5. Returns 401 if any step fails

Your endpoint never touches auth logic.

Updating Settings

We need to add Auth0 and cookie configuration to our existing Settings class. Keep all your existing database, CORS, and environment settings — we’re just adding new fields.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# backend/app/core/settings.py
from pydantic import field_validator
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    # Existing fields from Parts 1 & 2:
    # - database_url: str
    # - env: str
    # - cors_origins: list[str]
    # - etc.
    # (keep all of these unchanged)

    # NEW: Auth0 configuration
    auth0_domain: str
    auth0_audience: str
    auth0_issuer: str | None = None

    # NEW: Cookie signing for SSE
    cookie_secret: str
    session_cookie_name: str = "stream_session"
    session_cookie_max_age: int = 600  # 10 minutes

    # NEW: Validators for auth fields
    @field_validator("auth0_domain")
    @classmethod
    def validate_auth0_domain(cls, v: str) -> str:
        """Remove https:// if present."""
        return v.replace("https://", "").replace("http://", "")

    @field_validator("auth0_issuer", mode="before")
    @classmethod
    def set_auth0_issuer(cls, v: str | None, info) -> str:
        """Default issuer from domain."""
        if v:
            return v
        domain = info.data.get("auth0_domain")
        if domain:
            return f"https://{domain}/"
        raise ValueError("Cannot determine AUTH0_ISSUER")

    @field_validator("cookie_secret")
    @classmethod
    def validate_cookie_secret(cls, v: str, info) -> str:
        """Ensure secret is strong in production."""
        if not v:
            raise ValueError("COOKIE_SECRET is required")
        if info.data.get("env") == "prod" and len(v) < 32:
            raise ValueError(
                "COOKIE_SECRET must be at least 32 characters in production"
            )
        return v

    # NEW: Property for cookie security flag
    @property
    def session_cookie_secure(self) -> bool:
        """Cookies only over HTTPS in production."""
        return self.is_production


settings = Settings()

What changed:

  • Added 3 Auth0 config fields: auth0_domain, auth0_audience, auth0_issuer
  • Added 3 cookie config fields: cookie_secret, session_cookie_name, session_cookie_max_age
  • Added validators to sanitize Auth0 domain and enforce secret strength
  • Added session_cookie_secure property to enable Secure flag in production

What stayed the same:

  • All your existing database, CORS, and environment configuration
  • The existing validators and properties you had

We need two endpoints:

  1. POST /auth/session - Exchange JWT for SSE cookie (the trick that makes SSE auth work)
  2. GET /auth/me - Get user profile (nice-to-have for frontend)

Let’s build them and understand why each line matters.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# backend/app/api/auth.py
from fastapi import APIRouter, Depends, Response, status
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.auth import (
    get_current_user,
    get_current_user_id,
    create_stream_cookie,
)
from app.core.settings import settings
from app.api.dependencies import get_session
from app.domain.services.user_service import user_service

router = APIRouter(prefix="/auth", tags=["authentication"])


@router.post("/session", status_code=status.HTTP_204_NO_CONTENT)
async def create_session_cookie(
    response: Response,
    db: AsyncSession = Depends(get_session),
    user_id: str = Depends(get_current_user_id),
):
    """
    Exchange JWT for stream session cookie.

    This is the magic endpoint that bridges JWT authentication (works for API)
    to cookie authentication (works for SSE).

    Frontend flow:
    1. Get JWT from Auth0: const token = await getAccessTokenSilently()
    2. Call this endpoint with Authorization: Bearer <token>
    3. Backend verifies JWT and sets cookie
    4. Open EventSource - browser sends cookie automatically

    Returns: 204 No Content (cookie is in Set-Cookie header)
    """
    # Ensure user exists in database (auto-register if first login)
    user = await user_service.get_or_create_from_auth0_id(db, user_id)

    # Create signed cookie containing user_id
    # Uses HMAC to prevent tampering, includes timestamp for expiration
    cookie_value = create_stream_cookie(user_id)

    # Set cookie with security flags
    response.set_cookie(
        key=settings.session_cookie_name,        # Name: "stream_session"
        value=cookie_value,                      # Signed: "user_id.timestamp.signature"
        max_age=settings.session_cookie_max_age, # Lifetime: 600 seconds (10 min)
        httponly=True,                           # JS can't read it (XSS protection)
        secure=settings.session_cookie_secure,   # HTTPS only in production
        samesite="lax",                          # CSRF protection
    )
    # 204 response = success, no body, cookie in header


@router.get("/me")
async def get_current_user_info(
    user_data: dict = Depends(get_current_user),
    db: AsyncSession = Depends(get_session),
):
    """
    Get authenticated user's profile.

    Useful for frontend to display user info in the UI (avatar, name, email).
    Also auto-registers user on first request.

    Returns: User profile with database ID and Auth0 data
    """
    # get_current_user verifies JWT and returns full payload
    # We extract claims and sync with database
    user = await user_service.get_or_create_from_auth0_id(
        db,
        auth0_id=user_data["sub"],           # "auth0|123" or "google-oauth2|456"
        email=user_data.get("email", ""),    # Optional (privacy settings)
        name=user_data.get("name", ""),      # Optional
    )

    return {
        "id": user.id,                       # Database ID (internal)
        "auth0_id": user.auth0_id,           # Auth0 ID (external)
        "email": user.email,
        "name": user.name,
        "created_at": user.created_at.isoformat(),
    }

Why do we need this endpoint?

1
2
3
4
Problem: EventSource can't send Authorization header
Solution: EventSource automatically sends cookies
Challenge: How do we get a cookie?
Answer: Exchange JWT for cookie via API request (which CAN send headers)

The flow:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Frontend before opening SSE

// Step 1: Get JWT from Auth0 SDK (in memory)
const token = await getAccessTokenSilently()

// Step 2: Exchange JWT for cookie
await fetch('https://api.example.com/auth/session', {
  method: 'POST',
  headers: {
    'Authorization': `Bearer ${token}`  // JWT in header (works for fetch)
  },
  credentials: 'include'  // Tell browser to save Set-Cookie header
})
// Backend verifies JWT, issues cookie
// Browser receives Set-Cookie header, stores cookie

// Step 3: Open SSE (cookie sent automatically)
const eventSource = new EventSource('https://api.example.com/stream/', {
  withCredentials: true  // Tell browser to send cookies
})
// Browser includes Cookie header automatically
// Backend verifies cookie signature, streams events

Why credentials: 'include'?

By default, browsers don’t send or save cookies for cross-origin requests (CORS). The credentials: 'include' flag says:

  • “Save cookies from this response (Set-Cookie header)”
  • “Send cookies with future requests to this origin”

This works because our backend CORS config allows it:

1
2
3
4
5
6
7
8
# backend/app/main.py
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:5173"],
    allow_credentials=True,  # Allow cookies in CORS requests
    allow_methods=["*"],
    allow_headers=["*"],
)

Without allow_credentials=True, browsers reject the Set-Cookie header for security.

Let’s understand each flag in response.set_cookie():

httponly=True - JavaScript can’t access it

1
2
3
4
5
6
7
// Without HttpOnly:
document.cookie  // "stream_session=abc123..."
// XSS attack can steal cookie and send to attacker's server

// With HttpOnly:
document.cookie  // "" (cookie hidden from JavaScript)
// XSS attack can't see or steal it

The browser still sends it with requests, but JavaScript can’t read it. This blocks XSS attacks.

secure=settings.session_cookie_secure - HTTPS only in production

1
2
3
@property
def session_cookie_secure(self) -> bool:
    return self.is_production

In development (HTTP), cookies work without Secure. In production (HTTPS), we require it.

Why? Cookies sent over HTTP are visible to anyone sniffing the network:

1
2
GET /stream/ HTTP/1.1
Cookie: stream_session=abc123...  # Attacker on WiFi sees this

With Secure flag, browsers refuse to send cookies over HTTP. Must use HTTPS.

samesite="lax" - CSRF protection

What’s CSRF (Cross-Site Request Forgery)?

An attacker tricks your browser into making requests to our API:

1
2
<!-- Attacker's site: evil.com -->
<img src="https://api.yourapp.com/api/sessions/1?delete=true">

Without SameSite:

  • Your browser sees: “Request to api.yourapp.com”
  • Your browser thinks: “I have a cookie for that domain!”
  • Your browser sends: Cookie header automatically
  • API receives authenticated request, deletes your session

With SameSite=Lax:

  • Browser checks: “Is this request from yourapp.com?”
  • Browser sees: “No, it’s from evil.com (cross-site)”
  • Browser refuses: Cookie not sent
  • API receives unauthenticated request, rejects with 401

Lax vs Strict?

  • SameSite=Strict - Never send cookie on cross-site requests (even clicking a link)
  • SameSite=Lax - Send cookie for safe methods (GET) when navigating, block on POST/DELETE

Lax is the sweet spot: protects against CSRF, allows legitimate navigation.

max_age=600 - 10-minute lifetime

Short-lived cookies limit damage if stolen. After 10 minutes:

  • Signature timestamp exceeds max_age
  • Backend rejects cookie: SignatureExpired
  • Frontend must call /auth/session again

Why 10 minutes?

  • Long enough for normal SSE connections
  • Short enough to limit stolen cookie abuse
  • Auto-renewed when frontend reconnects

Compare to JWT (24 hours): If attacker steals JWT, they have all day. If attacker steals cookie, they have 10 minutes.

The GET /auth/me Endpoint

This endpoint is simpler — just return user profile info.

Frontend uses it for:

1
2
3
4
5
6
7
const { data: user } = await fetch('/auth/me', {
  headers: { Authorization: `Bearer ${token}` }
})

// Display in UI
<Avatar src={user.avatar_url} />
<span>{user.name}</span>

Backend uses it to:

  • Auto-register user (first request after Auth0 signup)
  • Sync profile changes (user updated name in Google)
  • Provide database ID (frontend might need it)

Why return both id and auth0_id?

  • id - Database primary key (useful for debugging, linking to admin panel)
  • auth0_id - External ID (useful for debugging Auth0 issues, matching logs)

Most frontends only display name and email, but having both IDs helps debugging.

The Dual Identity Problem

Here’s the challenge: we have two user IDs to manage.

Auth0 ID (external):

1
"auth0|507f1f77bcf86cd799439011"

This comes from Auth0. It’s in every JWT. It’s how Auth0 identifies users across all applications using your tenant.

Database ID (internal):

1
42

This is our PostgreSQL primary key. It’s what foreign keys reference (sessions.user_id, messages.user_id). It’s auto-incrementing, unique per user in our database.

Why do we need both?

We can’t use Auth0 IDs as primary keys because:

  1. They’re strings - slower to index and join than integers
  2. They’re external - we don’t control the format (Auth0 might change it)
  3. They’re verbose - 30+ characters vs 4-8 bytes for an integer

We can’t use database IDs in JWTs because:

  1. Sequential - exposes user count (user 42 means ~42 users)
  2. Predictable - easy to enumerate all users
  3. Not portable - if we migrate databases, IDs change

The solution: Bridge pattern.

1
2
3
4
5
JWT contains: auth0_id
Database lookup: auth0_id → user.id
Use for queries: WHERE user_id = user.id

This is the get-or-create pattern.

User Service: Auto-Registration

Every user who logs in with Auth0 gets automatically registered in our database. No signup form. No email verification. Auth0 already handled that.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# backend/app/domain/services/user_service.py
from sqlalchemy.ext.asyncio import AsyncSession
from app.persistence.repositories.user_repo import user_repo
from app.persistence.models import User


class UserService:
    """Business logic for user operations."""

    async def get_or_create_from_auth0_id(
        self,
        db: AsyncSession,
        auth0_id: str,
        email: str = "",
        name: str = "",
    ) -> User:
        """
        Get user by Auth0 ID, or create if first login.

        This is the bridge between Auth0 (external identity) and our database
        (internal identity). Called on every authenticated request.

        Args:
            auth0_id: From JWT 'sub' claim (e.g., "auth0|507f1f77bcf...")
            email: From JWT 'email' claim (optional)
            name: From JWT 'name' claim (optional)

        Returns:
            User object with database ID populated
        """
        # Try to find existing user
        user = await user_repo.get_by_auth0_id(db, auth0_id)

        if user:
            # User exists: check if profile changed in Auth0
            # (user updated email/name in Auth0 dashboard or social provider)
            updated = False

            if email and user.email != email:
                user.email = email
                updated = True

            if name and user.name != name:
                user.name = name
                updated = True

            if updated:
                # Persist changes to database
                await db.commit()
                await db.refresh(user)

            return user

        # User doesn't exist: first login, auto-register
        user = await user_repo.create(
            db,
            auth0_id=auth0_id,
            email=email,
            name=name,
        )
        return user


user_service = UserService()

What’s happening:

Scenario 1: Returning user

1
2
3
4
5
6
1. User logs in → Auth0 issues JWT with sub="auth0|123"
2. User calls API → Backend verifies JWT → extracts auth0_id
3. get_or_create_from_auth0_id("auth0|123") → finds existing user
4. Check if email/name changed in Auth0 → update if needed
5. Return user with id=42
6. Use user.id for all database queries

Scenario 2: First-time user

1
2
3
4
5
6
7
1. User logs in with Google → Auth0 creates account
2. Auth0 issues JWT with sub="google-oauth2|110223..."
3. User calls API → Backend verifies JWT
4. get_or_create_from_auth0_id("google-oauth2|110223...") → not found
5. Create new user record with auth0_id="google-oauth2|110223..."
6. Return user with id=43 (auto-incremented)
7. User is now registered without filling out a form

Why update email/name on every request?

Users can change their profile in Auth0 (or their social provider). If someone updates their name in Google, we want it reflected in our app. We sync on every authenticated request.

This is cheap:

  • Only updates if changed (database write only when necessary)
  • Already fetching user for authorization check (no extra query)
  • Auth0 already validated the email/name (we trust the JWT)

Why optional email/name?

Some OAuth providers don’t share email (privacy settings). Some don’t provide names. We handle missing data gracefully with empty strings.

User Repository: Database Operations

The repository layer handles raw database operations. Clean separation: service has business logic, repository has SQL.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# backend/app/persistence/repositories/user_repo.py
from typing import Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.persistence.models import User


class UserRepository:
    """Database operations for users."""

    async def get_by_id(self, db: AsyncSession, user_id: int) -> Optional[User]:
        """
        Get user by database primary key.

        Used when we already have the internal ID (e.g., from a foreign key).
        Returns None if not found.
        """
        result = await db.execute(select(User).where(User.id == user_id))
        return result.scalar_one_or_none()

    async def get_by_auth0_id(
        self, db: AsyncSession, auth0_id: str
    ) -> Optional[User]:
        """
        Get user by Auth0 ID.

        This is the lookup that bridges external identity (Auth0) to internal
        identity (database ID). Called on every authenticated request.

        Returns None if user not found (first login).
        """
        result = await db.execute(select(User).where(User.auth0_id == auth0_id))
        return result.scalar_one_or_none()

    async def create(
        self, db: AsyncSession, auth0_id: str, email: str, name: str = ""
    ) -> User:
        """
        Create new user record.

        Called during auto-registration when user logs in for the first time.
        Database assigns the ID (auto-increment).
        """
        user = User(auth0_id=auth0_id, email=email, name=name)
        db.add(user)
        await db.commit()
        await db.refresh(user)  # Populates user.id from database
        return user


user_repo = UserRepository()

Why separate service and repository?

Service layer (user_service.py):

  • Business logic: “get or create” logic
  • Decides WHEN to create vs update
  • Handles profile syncing
  • Could send welcome emails, trigger analytics, etc.

Repository layer (user_repo.py):

  • Database operations: SQL queries
  • HOW to fetch/create/update
  • No business logic, just CRUD
  • Easily testable with mocks

Example of layering benefits:

If we want to add “send welcome email on first registration”:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# Service layer (user_service.py)
async def get_or_create_from_auth0_id(self, ...):
    user = await user_repo.get_by_auth0_id(db, auth0_id)

    if not user:
        # NEW: First-time user logic
        user = await user_repo.create(db, auth0_id, email, name)
        await email_service.send_welcome_email(user.email)  # Business logic
        await analytics.track("user_registered", user.id)   # Business logic

    return user

Repository stays unchanged — it just does database operations. Business logic lives in the service.

This separation makes testing easier:

1
2
3
4
5
6
7
# Test service without hitting database
mock_repo.get_by_auth0_id.return_value = None
mock_repo.create.return_value = User(id=42, auth0_id="test")

# Test that welcome email sent
await user_service.get_or_create_from_auth0_id(db, "test", "a@b.com")
email_service.send_welcome_email.assert_called_once()

The flow in production:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
Request with JWT
FastAPI dependency: get_current_user_id()
    ↓ verifies JWT, extracts auth0_id
Endpoint: await user_service.get_or_create_from_auth0_id(auth0_id)
Service: await user_repo.get_by_auth0_id(auth0_id)
Repository: SELECT * FROM users WHERE auth0_id = $1
PostgreSQL: returns row or None
If None: Repository.create() → INSERT INTO users
Service returns User with database ID
Endpoint uses user.id for session queries

All of this happens on every authenticated request. It’s fast (indexed lookup on auth0_id), and ensures users are always registered and up-to-date.

Register the Router

1
2
3
4
5
6
# backend/app/main.py
from app.api import sessions, messages, auth

app.include_router(sessions.router)
app.include_router(messages.router)
app.include_router(auth.router)

Protecting API Endpoints

Now we need to protect the API endpoints we built in Parts 1 & 2. We’ll add JWT authentication to sessions and messages.

Session Endpoints

What we’re changing: Adding auth0_id: str = Depends(get_current_user_id) to each endpoint and using it to look up the user.

Complete updated file:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# backend/app/api/sessions.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession

from app.api.dependencies import get_session
from app.core.auth import get_current_user_id
from app.domain.services.user_service import user_service
from app.domain.services.session_service import session_service
from app.domain.dtos import SessionDTO, SessionCreateDTO

router = APIRouter(prefix="/api/sessions", tags=["sessions"])


@router.post("/", response_model=SessionDTO, status_code=status.HTTP_201_CREATED)
async def create_session(
    data: SessionCreateDTO,
    db: AsyncSession = Depends(get_session),
    auth0_id: str = Depends(get_current_user_id),
):
    """
    Create new chat session.

    Requires: Authorization: Bearer <JWT>
    """
    # Get user from auth0_id
    user = await user_service.get_or_create_from_auth0_id(db, auth0_id)

    try:
        return await session_service.create_session(db, user.id, data.title)
    except ValueError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))


@router.get("/", response_model=list[SessionDTO])
async def list_sessions(
    skip: int = 0,
    limit: int = 50,
    db: AsyncSession = Depends(get_session),
    auth0_id: str = Depends(get_current_user_id),
):
    """List user's sessions."""
    user = await user_service.get_or_create_from_auth0_id(db, auth0_id)
    return await session_service.list_user_sessions(db, user.id, skip, limit)


@router.get("/{session_id}", response_model=SessionDTO)
async def get_session(
    session_id: int,
    db: AsyncSession = Depends(get_session),
    auth0_id: str = Depends(get_current_user_id),
):
    """Get specific session."""
    user = await user_service.get_or_create_from_auth0_id(db, auth0_id)

    session = await session_service.get_session(db, session_id, user.id)
    if not session:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Session not found",
        )
    return session


@router.delete("/{session_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_session(
    session_id: int,
    db: AsyncSession = Depends(get_session),
    auth0_id: str = Depends(get_current_user_id),
):
    """Delete session."""
    user = await user_service.get_or_create_from_auth0_id(db, auth0_id)

    deleted = await session_service.delete_session(db, session_id, user.id)
    if not deleted:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Session not found",
        )

The key change: Every endpoint now has auth0_id: str = Depends(get_current_user_id). This:

  1. Requires Authorization: Bearer <JWT> header
  2. Verifies the JWT signature and claims
  3. Extracts the Auth0 user ID (sub claim)
  4. Passes it to the endpoint

We then use user_service.get_or_create_from_auth0_id() to:

  • Look up the user in our database by auth0_id
  • Auto-create them if first login
  • Use their database user.id for session operations

What stayed the same:

  • All the business logic (session_service calls)
  • DTOs and response models
  • Error handling
  • The actual session operations

We just added the authentication layer on top.

Message Endpoints

Same pattern — adding JWT auth to message operations.

Complete updated file:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# backend/app/api/messages.py
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.ext.asyncio import AsyncSession

from app.api.dependencies import get_session
from app.core.auth import get_current_user_id  # NEW import
from app.domain.services.message_service import message_service
from app.domain.services.user_service import user_service  # NEW import
from app.domain.dtos import MessageDTO, MessageCreateDTO

router = APIRouter(prefix="/api/messages", tags=["messages"])


@router.post("/", response_model=MessageDTO, status_code=status.HTTP_201_CREATED)
async def create_message(
    data: MessageCreateDTO,
    session_id: int = Query(...),
    db: AsyncSession = Depends(get_session),
    auth0_id: str = Depends(get_current_user_id),  # NEW: JWT auth
):
    """
    Create message in session.

    Requires: Authorization: Bearer <JWT>
    """
    # NEW: Get user from Auth0 ID
    user = await user_service.get_or_create_from_auth0_id(db, auth0_id)

    try:
        return await message_service.create_message(db, session_id, user.id, data)
    except ValueError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
    except PermissionError as e:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e))


@router.get("/", response_model=list[MessageDTO])
async def list_messages(
    session_id: int = Query(...),
    skip: int = 0,
    limit: int = 100,
    db: AsyncSession = Depends(get_session),
    auth0_id: str = Depends(get_current_user_id),  # NEW: JWT auth
):
    """
    List messages in session.

    Requires: Authorization: Bearer <JWT>
    """
    # NEW: Get user from Auth0 ID
    user = await user_service.get_or_create_from_auth0_id(db, auth0_id)

    try:
        return await message_service.list_session_messages(
            db, session_id, user.id, skip, limit
        )
    except PermissionError as e:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e))

Changes:

  • Added get_current_user_id import and dependency to both endpoints
  • Added user_service import
  • Each endpoint now requires JWT authentication
  • We look up the user by auth0_id before performing operations

What stayed the same:

  • Business logic in message_service
  • Permission checks (403 if user tries to access someone else’s session)
  • DTOs and response models
  • Query parameters and pagination

SSE Endpoint

For SSE, we use the cookie instead of JWT:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# backend/app/api/stream.py
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse

from app.core.auth import get_user_from_stream_cookie

router = APIRouter(prefix="/stream", tags=["streaming"])


@router.get("/")
async def stream_events(
    auth0_id: str = Depends(get_user_from_stream_cookie),
):
    """
    SSE endpoint for agent responses.

    Requires stream session cookie (call POST /auth/session first).
    """
    async def event_generator():
        yield f"data: {{\"type\": \"connected\", \"user\": \"{auth0_id}\"}}\n\n"

        # Agent streaming will go here in Part 4
        yield f"data: {{\"type\": \"message\", \"content\": \"Hello from SSE!\"}}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )

Register the router:

1
2
3
4
# backend/app/main.py
from app.api import sessions, messages, auth, stream

app.include_router(stream.router)

Frontend Integration

Now the frontend. We’ll use Auth0’s React SDK for the cleanest integration.

Installing Dependencies

1
2
cd frontend
npm install @auth0/auth0-react

Configuring Auth0Provider

Wrap your app:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// frontend/src/main.tsx
import React from 'react'
import ReactDOM from 'react-dom/client'
import { Auth0Provider } from '@auth0/auth0-react'
import App from './App'
import './index.css'

const domain = import.meta.env.VITE_AUTH0_DOMAIN
const clientId = import.meta.env.VITE_AUTH0_CLIENT_ID
const audience = import.meta.env.VITE_AUTH0_AUDIENCE

ReactDOM.createRoot(document.getElementById('root')!).render(
  <React.StrictMode>
    <Auth0Provider
      domain={domain}
      clientId={clientId}
      authorizationParams={{
        redirect_uri: window.location.origin,
        audience: audience,
        scope: "openid profile email",
      }}
      cacheLocation="memory"
      useRefreshTokens={true}
    >
      <App />
    </Auth0Provider>
  </React.StrictMode>,
)

Config explained:

  • domain, clientId: From Auth0 dashboard
  • audience: Your API identifier
  • scope: Claims to include in JWT
  • cacheLocation="memory": Don’t persist tokens to localStorage (XSS risk)
  • useRefreshTokens={true}: Stay logged in with refresh tokens

API Client with JWT

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// frontend/src/api/client.ts
import { useAuth0 } from '@auth0/auth0-react'

const API_URL = import.meta.env.VITE_API_URL || 'http://localhost:8000'

export function useApiClient() {
  const { getAccessTokenSilently } = useAuth0()

  async function fetchWithAuth(endpoint: string, options: RequestInit = {}) {
    // Get JWT (refreshes automatically if expired)
    const token = await getAccessTokenSilently()

    const response = await fetch(`${API_URL}${endpoint}`, {
      ...options,
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${token}`,
        ...options.headers,
      },
      credentials: 'include',  // Send cookies (for SSE cookie)
    })

    if (!response.ok) {
      const error = await response.json().catch(() => ({ detail: 'Unknown error' }))
      throw new Error(error.detail || `HTTP ${response.status}`)
    }

    return response
  }

  return { fetchWithAuth }
}

Key points:

  • getAccessTokenSilently() fetches token from memory or refreshes if expired
  • We include token in Authorization: Bearer header
  • We include credentials: 'include' to send cookies
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// frontend/src/api/sse.ts
import { useAuth0 } from '@auth0/auth0-react'

const API_URL = import.meta.env.VITE_API_URL || 'http://localhost:8000'

export function useSSE() {
  const { getAccessTokenSilently } = useAuth0()

  async function openStream(onMessage: (event: MessageEvent) => void) {
    // Step 1: Get JWT and exchange for cookie
    const token = await getAccessTokenSilently()

    await fetch(`${API_URL}/auth/session`, {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${token}`,
      },
      credentials: 'include',  // Save the cookie
    })

    // Step 2: Open SSE (cookie sent automatically)
    const eventSource = new EventSource(`${API_URL}/stream/`, {
      withCredentials: true,
    })

    eventSource.onmessage = onMessage

    eventSource.onerror = async (error) => {
      console.error('SSE error:', error)
      eventSource.close()

      // Could implement auto-reconnect with fresh cookie here
    }

    return eventSource
  }

  return { openStream }
}

The flow:

  1. Get JWT from Auth0 (in memory)
  2. Call POST /auth/session with JWT -> backend sets cookie
  3. Open EventSource with withCredentials: true -> browser sends cookie
  4. Backend verifies cookie -> streams events

Login/Logout Components

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// frontend/src/components/Auth/LoginButton.tsx
import { useAuth0 } from '@auth0/auth0-react'

export function LoginButton() {
  const { loginWithRedirect } = useAuth0()

  return (
    <button
      onClick={() => loginWithRedirect()}
      className="px-4 py-2 bg-blue-600 text-white rounded"
    >
      Log In
    </button>
  )
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// frontend/src/components/Auth/LogoutButton.tsx
import { useAuth0 } from '@auth0/auth0-react'

export function LogoutButton() {
  const { logout } = useAuth0()

  return (
    <button
      onClick={() => logout({ logoutParams: { returnTo: window.location.origin } })}
      className="px-4 py-2 bg-gray-600 text-white rounded"
    >
      Log Out
    </button>
  )
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// frontend/src/components/Auth/AuthButton.tsx
import { useAuth0 } from '@auth0/auth0-react'
import { LoginButton } from './LoginButton'
import { LogoutButton } from './LogoutButton'

export function AuthButton() {
  const { isAuthenticated, isLoading, user } = useAuth0()

  if (isLoading) {
    return <div>Loading...</div>
  }

  if (!isAuthenticated) {
    return <LoginButton />
  }

  return (
    <div className="flex items-center gap-4">
      <span>Hello, {user?.name || user?.email}</span>
      <LogoutButton />
    </div>
  )
}

Using in Components

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// frontend/src/pages/Chat.tsx
import { useEffect, useState } from 'react'
import { useAuth0 } from '@auth0/auth0-react'
import { useApiClient } from '@/api/client'
import { useSSE } from '@/api/sse'

export function ChatPage() {
  const { isAuthenticated, isLoading } = useAuth0()
  const { fetchWithAuth } = useApiClient()
  const { openStream } = useSSE()
  const [sessions, setSessions] = useState([])
  const [sseMessage, setSSEMessage] = useState('')

  useEffect(() => {
    if (!isAuthenticated) return

    // Load sessions
    fetchWithAuth('/api/sessions/')
      .then(r => r.json())
      .then(setSessions)

    // Open SSE
    const eventSource = await openStream((event) => {
      const data = JSON.parse(event.data)
      setSSEMessage(`${data.type}: ${data.content || data.user}`)
    })

    return () => eventSource.close()
  }, [isAuthenticated])

  if (isLoading) return <div>Loading...</div>
  if (!isAuthenticated) return <div>Please log in</div>

  return (
    <div>
      <h1>Your Sessions</h1>
      <ul>
        {sessions.map(s => <li key={s.id}>{s.title}</li>)}
      </ul>
      <div>SSE: {sseMessage}</div>
    </div>
  )
}

Testing the Full Flow

Let’s verify everything works.

1. Start backend:

1
make dev

2. Test unauthenticated access (should fail):

1
2
curl http://localhost:8000/api/sessions/
# Expected: {"detail": "Not authenticated"}

3. Get a JWT from Auth0:

The easiest way is through the frontend. But for testing, you can use Auth0’s dashboard:

  • Go to your API settings
  • Click “Test” tab
  • Copy the test access token

4. Test with JWT:

1
2
3
4
5
6
7
8
9
TOKEN="your-jwt-from-auth0"

# Create session
curl -X POST http://localhost:8000/api/sessions/ \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"title": "Test Session"}'

# Should return: {"id": 1, "title": "Test Session", ...}

5. Exchange JWT for cookie:

1
2
3
4
5
6
7
curl -X POST http://localhost:8000/auth/session \
  -H "Authorization: Bearer $TOKEN" \
  -c cookies.txt

# Check cookie was set
cat cookies.txt
# Should show: stream_session <signed-value>

6. Test SSE with cookie:

1
2
3
4
5
6
7
curl http://localhost:8000/stream/ \
  -b cookies.txt \
  -N

# Should stream:
# data: {"type": "connected", "user": "auth0|..."}
# data: {"type": "message", "content": "Hello from SSE!"}

7. Frontend test:

Visit http://localhost:5173, click “Log In”, authenticate with Auth0, and you should see:

  • Sessions loading from API
  • SSE connection established
  • Messages streaming

What We’ve Built

Production-grade authentication with a clean hybrid approach:

Core Infrastructure:

  • Auth0 OAuth2/OIDC with PKCE flow (SPA pattern)
  • JWT verification with JWKS caching
  • Stateless signed cookies for SSE (no database lookups)
  • All API endpoints protected with Bearer tokens
  • SSE authenticated with cookies
  • Auto user registration on first login

Architecture:

  • 🎯 Right tool for the job (JWTs for API, cookies for SSE)
  • 🎯 Stateless (both JWT and cookie verification need no database)
  • 🎯 Standard patterns (Auth0 React SDK, Bearer tokens)
  • 🎯 Short-lived cookies (10 min) limit exposure
  • 🎯 Secure by default (HttpOnly, Secure, SameSite)

Security:

  • 🔒 PKCE prevents code interception in SPA
  • 🔒 RS256 asymmetric signing (Auth0’s private key, our public key)
  • 🔒 JWKS caching with auto-refresh
  • 🔒 Cookie signatures prevent tampering
  • 🔒 HttpOnly cookies safe from XSS
  • 🔒 Secure flag enforces HTTPS in production

What we avoided:

  • Rolling our own password auth
  • Tokens in URLs
  • Polling instead of SSE
  • WebSocket overhead
  • Database lookups for every auth check
  • Dual auth systems (one mechanism: verify signatures)

This is clean, minimal, production-ready.

Additional Resources

What’s Next

In Part 4: Agent Integration & Streaming, we build the core feature:

  • Integrating OpenAI Agents SDK
  • Building agents with tools
  • Streaming responses via SSE (using our auth)
  • Handling tool calls in real-time
  • Persisting conversations
  • Resuming from history

The auth we built today makes Part 4 straightforward—protected endpoints and authenticated streams are ready. We just plug in the agent logic.


Next: Part 4 - Agent Integration & Streaming


This is part of a series on building production-ready AI agent applications. All code is open source on GitHub.