Building a Production-Ready Agent Stack: Part 2 - Backend Core & Database
Our FastAPI server is running, but it’s just a health check endpoint talking to no one. Time to build the persistence layer - database models, repositories, domain services, and the REST API that will power our agent conversations.
Right now, our backend can tell you it’s alive (GET /health), but it can’t store a conversation or remember a user. Postgres is running in Docker, but we’re not using it. Let’s handle all that.
TLDR
You’ll build: async SQLAlchemy engine + Alembic, three ORM models, repositories, services, and REST endpoints with DTOs. You’ll learn: unit-of-work vs repo commits, avoiding N+1, 403 vs 404 in ownership checks, and why DTOs decouple DB from API. You’ll verify: run make migrate, create a session in Swagger, insert a message, list sessions/messages.
Branch code: part-2-backend-core-database
By the end of this post, you’ll have:
- SQLAlchemy models for users, sessions, and messages with proper relationships
- Alembic migrations versioning your schema changes in git
- Repository pattern abstracting database access for testability
- Domain services enforcing business logic and authorization
- REST endpoints for full CRUD on sessions and messages
We’re building a three-layer architecture — API, Domain, Persistence — to keep concerns separated as complexity grows.
%%{init: {'theme':'base', 'themeVariables': { 'fontSize':'16px'}}}%%
graph TB
Client[Client Request]
API["API Layer<br/><small>app/api/</small><br/><small>HTTP routing, validation</small>"]
Domain["Domain Layer<br/><small>app/domain/</small><br/><small>Business logic</small>"]
Repo["Persistence Layer<br/><small>app/persistence/</small><br/><small>Database access</small>"]
DB[(PostgreSQL)]
Client --> API
API --> Domain
Domain --> Repo
Repo --> DB
style Client fill:#f0f4ff,stroke:#a5b4fc,stroke-width:2px
style API fill:#dbeafe,stroke:#93c5fd,stroke-width:2px
style Domain fill:#d1fae5,stroke:#6ee7b7,stroke-width:2px
style Repo fill:#e9d5ff,stroke:#c084fc,stroke-width:2px
style DB fill:#bfdbfe,stroke:#60a5fa,stroke-width:2px
The layers:
- API Layer (app/api/): HTTP routing, request validation, response serialization. Knows FastAPI, doesn’t know Postgres.
- Domain Layer (app/domain/): Business logic, validation rules, authorization. Pure Python - no FastAPI, no SQLAlchemy. Testable without a server or database.
- Persistence Layer (app/persistence/): Database access via ORM models and repositories. Knows Postgres, doesn’t know HTTP.
This separation means you can swap Postgres for MongoDB by rewriting only the persistence layer. Change from REST to GraphQL by rewriting only the API layer. Add a rule like “max 10 sessions per user” by editing only the domain layer. No cascading changes across the codebase.
Let’s get started!
Info
Do / Don’t
- DO: Services own auth, validation, and transactions; repositories own queries.
- DO: Keep handlers thin — extract params, call service, map exceptions to HTTP.
- DON’T: Put SQL in handlers, or commit inside repositories when workflows are multi‑step.
Setting Up SQLAlchemy with Async Support
TLDR
- Implement async engine + sessionmaker
- Expose pool_size and max_overflow via Settings
- Verify with a /health DB ping
Skip Ahead
If you know:
- Async engine + sessionmaker setup
- Pool tuning via Settings
- Health check DB ping
We’re using SQLAlchemy 2.0 with async support via asyncpg.
Why async matters for agent applications:
Agent apps are I/O-heavy. When a user sends a message, the flow looks like this:
- Save message to database (20ms I/O)
- Call LLM API and stream response (5-30 seconds I/O)
- Save response to database (20ms I/O)
- Update usage metrics (20ms I/O)
With synchronous code, each long operation blocks its worker. With a single worker, one user’s 10-second LLM call makes all others wait; 10 concurrent users finish at ~10s, 20s, …, 100s. With multiple workers (e.g., 4), requests finish in waves (~10s, 20s, 30s), improving throughput but still wasting time while each worker idles on I/O.
With async, if the LLM call is awaited I/O, the worker’s event loop can serve other requests while waiting. Ten concurrent users on one async worker can all complete in ~10s (plus overhead), because time is spent waiting on the remote service rather than blocking the server.
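To make this concrete, here’s a toy sketch with stubbed I/O (purely illustrative; the real handler arrives later in the series):

```python
import asyncio

# Stubs standing in for real I/O (illustrative only).
async def save_message(session_id: int, role: str, content: str) -> None:
    await asyncio.sleep(0.02)          # ~20ms database write

async def call_llm(prompt: str) -> str:
    await asyncio.sleep(10)            # 5-30s model call
    return "…response…"

async def handle_user_message(session_id: int, text: str) -> str:
    # Each await yields control, so one worker can interleave many of these.
    await save_message(session_id, "user", text)
    reply = await call_llm(text)
    await save_message(session_id, "assistant", reply)
    return reply
```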
OK enough talk. Let’s create the database configuration:
| |
Let me walk you through what’s happening here - it’s more interesting than it looks!
The create_async_engine() function creates a connection to Postgres, but it doesn’t just make one connection. It creates a connection pool, which is like a cache of ready-to-use database connections that your application reuses.
Why connection pooling matters:
Every time Python needs to talk to Postgres, establishing a fresh connection takes 50-100ms. That might not sound like much, but when you’re handling hundreds of requests per second, those milliseconds add up fast. Connection pooling solves this by maintaining a set of “warm” connections that are already authenticated and ready to go. It’s the difference between opening a new browser tab for each Google search versus keeping tabs open.
Here’s what each parameter does and why you should care:
echo=settings.is_development: When you’re developing locally, this logs every SQL query to your console. This is incredibly useful for debugging - you’ll immediately see if an endpoint is doing 10 queries when it should only do 1 (the dreaded N+1 query problem).
In production, we turn this off because nobody wants gigabytes of query logs eating disk space.
pool_size/max_overflow (from Settings): Start with ~10/20. If all 10 are busy, SQLAlchemy can create up to 20 temporary connections. Tune conservatively; overly large pools can starve Postgres.
Why these numbers? They’re a conservative starting point. You might think “more is better,” but each connection consumes memory on both your application server and Postgres. Start here, monitor in production, and adjust based on your traffic patterns.
pool_pre_ping=True: Before handing a connection to your code, SQLAlchemy pings the database to make sure it’s still alive. Without this, you might grab a connection that Postgres closed due to inactivity, leading to random “connection closed” errors that are absolutely maddening to debug (trust me on this).
This tiny ping (1-2ms) saves you from those errors. Worth it.
expire_on_commit=False: By default, SQLAlchemy expires all objects after a commit, meaning the next time you access a field like user.email, it makes another database query to refresh the data. This is useful if you’re doing complex, long-running transactions where data might change.
But for API responses where we immediately serialize and return? It’s unnecessary overhead. We’re telling SQLAlchemy “we trust ourselves - when we commit, we’re done with that transaction, no need to re-fetch everything.”
The async_sessionmaker creates a factory for database sessions. Each time you call it, you get a new session (a transaction context). Think of it as a workspace for one logical unit of work - like “create a user and their first session.”
Finally, get_db() is a generator function (notice the yield). It creates a session, hands it to your code via the yield, and then - here’s the important part - guarantees it gets closed in the finally block, even if your code crashes with an exception. This prevents connection leaks where you gradually run out of database connections until the whole system grinds to a halt.
Important note about the return type: We’re using AsyncGenerator[AsyncSession, None] instead of just AsyncSession because this function uses yield. Generator functions should be annotated as generators, not as the type they yield, so the annotation matches what get_db() actually returns.
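Putting those pieces together, the configuration described above can be sketched like this (module path and Settings field names are assumptions, not the repo’s exact code):

```python
# app/persistence/database.py (illustrative sketch)
from collections.abc import AsyncGenerator

from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

from app.core.config import settings  # assumed location of Settings

engine = create_async_engine(
    settings.database_url,            # e.g. postgresql+asyncpg://...
    echo=settings.is_development,     # log SQL locally, stay quiet in prod
    pool_size=settings.db_pool_size,
    max_overflow=settings.db_max_overflow,
    pool_pre_ping=True,               # detect stale connections before use
)

async_session_factory = async_sessionmaker(engine, expire_on_commit=False)

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    session = async_session_factory()
    try:
        yield session
    finally:
        await session.close()         # always release the connection
```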
Expose pool tuning in your settings so prod changes don’t require code edits:
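A sketch of the relevant Settings fields using pydantic-settings (field names are illustrative):

```python
# app/core/config.py (illustrative sketch)
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    database_url: str
    environment: str = "development"
    db_pool_size: int = 10        # steady-state connections
    db_max_overflow: int = 20     # temporary burst connections

    @property
    def is_development(self) -> bool:
        return self.environment == "development"

settings = Settings()
```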
Now let’s wire this into FastAPI’s dependency injection system:
| |
This creates a wrapper around get_db() that we’ll use in our endpoints. You might be wondering “why not just use get_db() directly?” Good question! This extra layer gives us a place to add cross-cutting concerns later - logging, transaction management, request-scoped caching, etc.
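The wrapper itself can be as small as this (a sketch; the real file may grow hooks for logging or transactions later):

```python
# app/api/deps.py (illustrative sketch)
from collections.abc import AsyncGenerator

from sqlalchemy.ext.asyncio import AsyncSession

from app.persistence.database import get_db  # assumed path from the sketch above

async def get_session() -> AsyncGenerator[AsyncSession, None]:
    # Delegate to get_db(); this indirection is where cross-cutting
    # concerns (logging, request-scoped caching) can be added later.
    async for session in get_db():
        yield session
```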
Now here’s where FastAPI’s dependency injection really shines. Let’s update our existing health check endpoint in main.py to test database connectivity.
Open backend/app/main.py and update it:
| |
Look at that db: AsyncSession = Depends(get_session) parameter we added. FastAPI sees this and automatically:
- Calls get_session() before your endpoint runs
- Passes the database session as the db parameter to your function
- Ensures the session gets closed after the request completes (even if an exception occurs)
You never write db = create_session() or db.close(). FastAPI handles the entire lifecycle. This is dependency injection at its finest - your code simply declares what it needs, and the framework provides it. No manual session management, no leaked connections, no boilerplate.
Note about text(): SQLAlchemy 2.0 requires raw SQL strings to be wrapped in text(). This makes it explicit that you’re executing raw SQL (as opposed to using the query builder). It’s a small safety feature that prevents accidental SQL injection when someone mistakenly passes a string where a query object was expected.
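For reference, a minimal version of the updated health check could look like this (response shape is illustrative):

```python
# app/main.py (health check sketch)
from fastapi import Depends, FastAPI
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

from app.api.deps import get_session  # assumed dependency from the sketch above

app = FastAPI()

@app.get("/health")
async def health(db: AsyncSession = Depends(get_session)) -> dict:
    try:
        await db.execute(text("SELECT 1"))   # raw SQL must be wrapped in text()
        database = "connected"
    except Exception as exc:                 # surface the error instead of crashing
        database = f"error: {exc}"
    return {"status": "ok", "database": database}
```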
Try it now:
You should see:
If Postgres is down, the database field will show an error message instead. This is your first working endpoint with database access!
Additional Resources
Alembic: Version Control for Your Database Schema
TLDR
- Autogenerate migrations from ORM changes
- Run via Make targets inside Docker
- Always review the generated diff before applying
Skip Ahead
If you know:
- Alembic init + async env config
- Autogenerate and review migrations
- Apply with make migrate
Here’s a question: how do you deploy a database schema change to production? Do you manually run SQL scripts? Keep a folder of numbered SQL files? Hope you remember what you changed?
This is where Alembic comes in. It’s like git for your database schema. Every time you change an ORM model (add a column, create a table, add an index), Alembic generates a migration file that describes the change. These migrations are versioned, reversible, and stored in git alongside your code.
The magic of Alembic is that it auto-generates migrations by comparing your ORM models to the actual database. You add a field to your User model, run make migrate-create, and Alembic says “oh, you added an email column - here’s the SQL to add it.” You review the generated migration, and if it looks good, apply it with make migrate.
Let’s set it up:
This creates an alembic/ directory with configuration files and a versions/ folder where migrations live. Now we need to configure it for async SQLAlchemy.
Open alembic/env.py and replace the contents with this:
| |
This might look intimidating, but it’s mostly the default Alembic template converted to async. Here’s what matters:
Line 11-12: We import our ORM models’ Base metadata and our settings. This tells Alembic about our models.
Line 18: We override the database URL from our settings instead of hardcoding it in alembic.ini. This way, we can use different databases for dev, staging, and production.
Line 26: The target_metadata = Base.metadata line is crucial - this is how Alembic knows about your models. When you run make migrate-create, Alembic compares this metadata against the actual database to figure out what changed.
Line 43-53: The run_migrations_online() function uses an async engine instead of the default synchronous one. This matches our async SQLAlchemy setup. Without this, you’d get errors about mixing sync and async code.
The run_migrations_offline() function generates SQL without connecting to the database - useful for creating migration scripts you can review before running.
Now let’s add convenient commands to our Makefile so you don’t have to type long alembic commands. We’ll run these inside Docker to ensure we always talk to the Compose Postgres service:
| |
These three commands are all we need:
make migrate: Runs all pending migrations. This brings your database up to date with the latest schema. Run this after pulling new code from git, or when setting up a fresh development environment.
make migrate-create: Generates a new migration by comparing your ORM models to the database. It prompts you for a message (like a git commit message). Be descriptive: “add email to user” is better than “changes.”
make migrate-rollback: Undoes the last migration. Useful when you realize you made a mistake and want to try again.
Here’s your typical workflow:
- Edit your ORM models: Add a field, create a new model, whatever you need.
- Generate the migration: Run make migrate-create and provide a descriptive message.
- Review the generated file: Open alembic/versions/<timestamp>_your_message.py and check the upgrade() and downgrade() functions. Make sure they do what you expect.
- Apply the migration: Run make migrate to update your database.
- Commit to git: The migration file goes in version control alongside your code.
Tip
Run migrations inside Docker. This avoids host Postgres conflicts and ensures the commands connect to the Compose db service via its internal DNS. If you really need to run Alembic on the host, update backend/.env to point at the mapped host port (and ensure nothing else is bound to that port). Otherwise prefer docker.
Warning
Always review auto-generated migrations before applying them. Alembic is smart, but it’s not perfect. Here are common gotchas:
- Column renames: Alembic sees these as drop + add, which loses all data in that column. You need to manually edit the migration to use op.alter_column() instead (sketched below).
- Data migrations: Adding a new non-nullable column requires a two-step migration (add as nullable, backfill data, then make it non-nullable).
- Index names: Alembic sometimes generates different index names than you expect, leading to duplicate indexes.
Always check the upgrade() and downgrade() functions. If something looks wrong, edit it before running make migrate. Your future self (and your teammates) will thank you.
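For example, a rename that Alembic autogenerated as drop + add can be hand-edited into something like this (table and column names are illustrative):

```python
# alembic/versions/xxxx_rename_title_column.py (illustrative sketch)
# Revision identifiers omitted for brevity.
from alembic import op

def upgrade() -> None:
    # Preserve data: rename the column instead of dropping and re-adding it.
    op.alter_column("sessions", "title", new_column_name="session_name")

def downgrade() -> None:
    op.alter_column("sessions", "session_name", new_column_name="title")
```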
Building the Domain Models
TLDR
- Three ORM models: User, Session, Message
- Timezone-aware timestamps generated by the DB
- Indexes for common queries (user sessions by updated_at)
Skip Ahead
If you know:
- Define ORM models and relationships
- TZ-aware timestamps, constraints, indexes
- Minimal structure: Users → Sessions → Messages
Now for the fun part—defining what our database actually stores. Think about how a chat application works: You have users who create conversations (we’ll call them sessions), and each conversation contains messages going back and forth.
That’s exactly what we’re modeling here:
- User: Represents a person using your agent app (we’ll link this to Auth0 in Part 3)
- Session: A conversation thread with the agent (like a ChatGPT conversation)
- Message: Individual messages within a session (both what the user says and what the agent responds)
This is the minimal viable structure for a stateful agent application. Users own Sessions, Sessions contain Messages. It’s a simple hierarchy, but it scales beautifully: you could have millions of users, each with hundreds of sessions, each session with thousands of messages.
Let’s define these models using SQLAlchemy’s ORM:
| |
Let me walk you through the design decisions in this schema - each one solves a real problem I’ve encountered in production:
User model:
- auth0_id: This is how we link database users to Auth0 accounts. When someone logs in with Auth0, we get an ID like auth0|507f1f77bcf86cd799439011. We store that here, never storing passwords ourselves. Security win.
- email: We keep a copy of the email for convenience (displaying in the UI, sending notifications). It’s also indexed for fast lookups.
- created_at: Always useful for analytics (“how many users signed up this week?”) and debugging (“when did this user join?”).
Session model:
- user_id foreign key: Links sessions to users. The ondelete="CASCADE" means when you delete a user, all their sessions automatically disappear. No orphaned data.
- title: A human-readable name for the conversation. Usually auto-generated from the first message (“Help me debug Python code”), but users can rename it.
- updated_at: Timezone-aware and server-generated; updates via onupdate=func.now() whenever a row changes. Powers the “most recently active conversations” list in your UI.
- cascade="all, delete-orphan": Delete a session and its messages go with it. The ORM handles this for deletes issued through a session, and the ondelete="CASCADE" foreign key backs it up at the database level, so you can’t accidentally orphan messages.
- Index("idx_user_updated"): Composite index on (user_id, updated_at). Optimized for “list sessions for user 123 ordered by last activity” even at high scale.
Message model:
- role: Can be “user”, “assistant”, “tool_call”, or “system”. This schema goes beyond the basic OpenAI convention by explicitly tracking tool calls as separate messages. When the agent calls a tool, we save a message with role="tool_call" so the UI can show “Agent used search_database tool” as part of the conversation flow. In the API layer, we type this as an Enum for safety.
- content: The actual message text. We use Text (unlimited length) instead of String (limited) because agent responses can be looong. For tool_call messages, this contains a user-friendly summary like “Called search_database”.
- tool_name: Only populated when role="tool_call". This stores which tool was executed (e.g., “search_database”, “web_fetch”). The UI uses this to render tool-specific icons and styling.
- trace_id: Links to the entire agent run in your tracing system (OpenAI API or Phoenix Arize). All messages in the same agent execution share the same trace_id. Click on any message -> see the full execution trace including all tool calls, LLM requests, and timing data. Indexed for fast lookups.
- span_id: Links to the specific operation within the trace. For tool_call messages, this is the span ID for that specific tool execution. For assistant messages, this might be the LLM generation span. This lets you jump directly to the exact operation in your trace viewer instead of searching through the entire trace. Indexed for fast lookups.
- tokens: How many tokens this message consumed. The OpenAI API returns this for each run, and we store it for billing and rate limiting (Part 6). Super useful for “you’ve used 45,000 tokens this month” displays.
- Index("idx_session_created"): Fast chronological message retrieval. When you open a session, you want messages in order, fast.
Relationships:
Notice the relationship() declarations like sessions: Mapped[list["Session"]] = relationship(...). These let you write user.sessions to get all of a user’s sessions without writing SQL. SQLAlchemy handles the join automatically. It’s magical in development, but be careful: loading relationships can cause N+1 query problems if you’re not mindful (we’ll sidestep this with targeted repository queries and DTOs).
Note
We’re using SQLAlchemy 2.0’s Mapped[] syntax instead of the older Column() approach. This gives us better type inference - mypy can tell you if you’re accessing a field that doesn’t exist at compile time, not in the middle of the night when your production app crashes. If you see code online using Column(Integer, ...), that’s the old style. This is the new hotness.
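To make the Mapped[] style concrete, here’s a trimmed sketch of the Session and Message models described above (a sketch only: the User model, the trace/span/token columns, and other details live in the branch):

```python
# app/persistence/models.py (trimmed, illustrative sketch)
from datetime import datetime

from sqlalchemy import DateTime, ForeignKey, Index, String, Text, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship

class Base(DeclarativeBase):
    pass

class Session(Base):
    __tablename__ = "sessions"
    __table_args__ = (Index("idx_user_updated", "user_id", "updated_at"),)

    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id", ondelete="CASCADE"))
    title: Mapped[str] = mapped_column(String(500))
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now(), onupdate=func.now()
    )

    # ORM-level cascade: deleting a session deletes its messages too.
    messages: Mapped[list["Message"]] = relationship(
        back_populates="session", cascade="all, delete-orphan"
    )

class Message(Base):
    __tablename__ = "messages"
    __table_args__ = (Index("idx_session_created", "session_id", "created_at"),)

    id: Mapped[int] = mapped_column(primary_key=True)
    session_id: Mapped[int] = mapped_column(ForeignKey("sessions.id", ondelete="CASCADE"))
    role: Mapped[str] = mapped_column(String(20))        # user / assistant / tool_call / system
    content: Mapped[str] = mapped_column(Text)
    tool_name: Mapped[str | None] = mapped_column(String(100), nullable=True)
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())

    session: Mapped["Session"] = relationship(back_populates="messages")
```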
Real-Time Tool Execution Display
Here’s a critical design decision for agent applications: How do you show tool execution in the UI?
Users expect to see what the agent is doing in real-time - “Searching database…” while it’s happening, then “search_database completed” when it’s done. And when they scroll through chat history later, they should see “Agent used search_database” as part of the conversation flow.
This gets tricky because we’re using external tracing platforms (OpenAI API trace viewer or Phoenix Arize) for detailed debugging. We don’t want to duplicate all that data in our application database - that’s expensive and redundant. But we do need enough information to show tool execution in the user-facing UI.
Here’s how we solve this with a two-tier architecture:
Tier 1: Real-time streaming (SSE)
While the agent is running, we stream events to the frontend using Server-Sent Events (remember Part 1?). These events are ephemeral - they exist only during the request, never stored in the database:
sequenceDiagram
participant UI
participant Backend
participant Agent
UI->>Backend: Open SSE connection
Backend->>Agent: start()
Agent-->>Backend: tool_call_started
Backend-->>UI: event: tool_call_started
Agent-->>Backend: tool_call_completed
Backend-->>UI: event: tool_call_completed
Agent-->>Backend: assistant_message
Backend-->>UI: event: assistant_message
Backend-->>UI: event: done
| |
The frontend receives these events as they happen and updates the UI in real-time:
- When tool_call_started arrives -> Show “Calling search_database…” with a spinner
- When tool_call_completed arrives -> Update to “search_database completed”
- When message_created arrives -> Show the final assistant response
This gives users live feedback without any database writes. Fast, cheap, and responsive.
Tier 2: Persistent storage (Messages table)
After the agent finishes, we save the conversation to the database for historical display. This is where the Message model’s role="tool_call" support comes in:
| |
When rendering chat history, the UI shows:
- User’s question
- “search_database ran” (grayed out, maybe collapsible)
- Agent’s answer
The tool_call message is a lightweight placeholder. It says “the agent used this tool” without storing the tool’s arguments, raw output, or execution details. If you need to debug what went wrong, you have two levels of detail:
Look for the trace_id -> See the entire agent run in your tracing platform:
- All tool calls in sequence
- All LLM requests and responses
- Total execution time
- Overall token usage
- High-level flow visualization
Look for the span_id -> Jump directly to the specific operation (e.g., the search_database tool call):
- Exact tool arguments: {"query": "SELECT * FROM documents", "limit": 100}
- Raw tool output: The full JSON response
- Execution time: 247ms
- Error logs if it failed
- Retry attempts
- Parent-child span relationships
Why this architecture works:
Division of responsibilities:
- Application database: User-facing conversation flow for the UI (what users see)
- Tracing platform: Developer-facing debugging details (what engineers need)
What we DON’T store in our DB:
- Tool call arguments (can be huge, contain sensitive data, change frequently)
- Raw tool outputs (stored in tracing platform)
- Execution timing (tracing platform)
- Error stack traces (tracing platform)
What we DO store:
- That a tool was called (for UI display)
- Tool name (to show “search_database ran”)
- User-friendly summary in the content field
- Links to the tracing platform via trace_id and span_id
- Chronological ordering in the conversation
This keeps your application database lean while giving users the transparency they expect. When a user says “the agent failed,” you can:
- Look at their conversation in the app -> See which tool call happened
- Copy the trace_id -> View the entire agent run in your tracing platform
- Or copy the span_id from the specific tool_call -> Jump directly to that tool’s execution
- See the full execution details -> Debug the actual issue
Implementation preview:
Here’s what the agent service will look like when we integrate OpenAI Agents SDK in Part 4:
| |
This pattern gives us:
- Real-time UI updates during execution (users see progress)
- Chat history shows “tool ran” after completion (transparent conversation flow)
- Minimal database storage (no duplication with tracing platform)
- Full debugging available externally (engineers can troubleshoot)
- Privacy and security (sensitive tool arguments never stored in app DB)
The key insight here is that tool_call messages are just another message type in the conversation flow - lightweight placeholders that enhance the user experience without creating a database bloat problem.
In Part 4, when we integrate the OpenAI Agents SDK, you’ll see this architecture in action. For now, the important takeaway is that our Message model is designed to support this dual-tier approach from day one.
Great! We’ve defined our ORM models. Now let’s create and apply the initial migration to set up the database tables.
Tip
Running migrations inside Docker: All migration commands execute inside the backend container using docker compose exec backend. This ensures the commands can reach the database via Docker’s internal networking (db:5432). The alembic/ directory is mounted as a volume, so migrations you create inside the container appear immediately on your host machine.
First, make sure your services are running:
| |
Now create and apply the migration:
Verify the tables were created:
| |
You should see:
Troubleshooting:
- “No ‘script_location’ key found in configuration”: The backend container is missing Alembic files. Rebuild with docker compose -f infra/docker-compose.yml up --build -d.
- “No such service: backend”: Services aren’t running. Start them with make dev first.
- “Could not translate host name ‘db’ to address”: You’re trying to run migrations on your host machine instead of inside Docker. Use make migrate instead of running alembic directly.
Additional Resources
The Repository Pattern: Clean Data Access
Alright, we have ORM models that map Python classes to database tables. But here’s the thing - you don’t want to scatter SQLAlchemy queries throughout your codebase like confetti at a parade. That way lies madness.
Let me show you what I mean. Imagine you’re building endpoints, and every time you need to find a user by email, you write:
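Something along these lines (a sketch of the kind of inline query meant here):

```python
# The ad hoc query we end up copy-pasting into many handlers (sketch).
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.persistence.models import User  # assumed models module

async def some_endpoint_logic(db: AsyncSession, email: str) -> User | None:
    result = await db.execute(select(User).where(User.email == email))
    return result.scalar_one_or_none()
```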
This seems fine at first. But then you write this in 10 different files. Then someone says “we should cache user lookups,” and you’re grepping through the codebase hunting for every single query. Or you decide to add logging when users are accessed. Or you want to switch from Postgres to MongoDB (God help you). Suddenly you’re touching 20+ files for one change.
TL;DR
- Repositories own queries; services own transactions
- Keep handlers thin; don’t put SQL in endpoints
- One import point per model (easy to mock)
Skip Ahead
If you know:
- Repositories encapsulate all queries
- Handlers stay thin and HTTP-only
- Services coordinate business logic
Enter the Repository pattern.
A repository is a class that encapsulates all database operations for a specific model. It’s the single source of truth for “how do I get users from the database?” Instead of repeating queries everywhere, you write:
| |
That’s it. One line. The repository handles the query. If you need to add caching, logging, or change the implementation entirely, you change it in one place - the repository.
The benefits compound fast:
- Testability: Mock user_repo in unit tests without touching the database. No test database setup, no fixtures, just pure business logic testing.
- Consistency: All User queries live in one place. No more “wait, which endpoint uses the optimized query and which one doesn’t?”
- Maintainability: Change the query logic once, not in 20 files. Future you will thank present you.
- Swappable: Move from Postgres to MongoDB? Rewrite the repository. Your business logic doesn’t change.
Let’s build the UserRepository:
| |
Look at the structure here - it’s beautifully simple:
Every method takes db: AsyncSession as the first parameter: This is dependency injection in action. The repository doesn’t create its own database connection; it uses whatever session you pass in. This means the repository works with your request-scoped session from FastAPI, your test database session, or even a transaction you’re managing manually. Flexible and testable.
Methods return domain objects (User), not database rows: When you call get_by_id(), you get a User object with all its fields and relationships. Not a tuple, not a dictionary, but a proper Python object with type hints. Your IDE can autocomplete fields, mypy can catch typos.
Single results use scalar_one_or_none(), lists use scalars().all(): This is SQLAlchemy’s way of saying “give me one result (or None)” versus “give me all results as a list.” The scalar_one() variant raises an exception if not found, which is useful when you expect a result to exist. The _or_none() variant returns None, perfect for “optional” lookups.
Pagination is built-in: Every list method has skip and limit parameters. This prevents accidentally loading 10 million rows into memory when someone asks for “all users.” Start with sensible defaults (usually 50-100), and let the caller override if needed.
The singleton pattern: At the bottom, we create user_repo = UserRepository() and import it everywhere. This isn’t a true singleton (Python allows multiple instances), but by convention we use one shared instance. It’s stateless (no instance variables), so there’s no benefit to creating multiple copies.
Why singletons? It makes testing dead simple. Instead of mock.patch('app.persistence.repositories.user_repo.UserRepository'), you can just mock.patch('app.persistence.repositories.user_repo.user_repo') to replace the whole object. Cleaner tests. We will see this later on when we write unit tests.
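Here’s a compact sketch of that shape (method list trimmed; the real repository has more lookups):

```python
# app/persistence/repositories/user_repo.py (illustrative sketch)
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.persistence.models import User  # assumed models module

class UserRepository:
    async def get_by_id(self, db: AsyncSession, user_id: int) -> User | None:
        result = await db.execute(select(User).where(User.id == user_id))
        return result.scalar_one_or_none()

    async def get_by_email(self, db: AsyncSession, email: str) -> User | None:
        result = await db.execute(select(User).where(User.email == email))
        return result.scalar_one_or_none()

    async def get_by_auth0_id(self, db: AsyncSession, auth0_id: str) -> User | None:
        result = await db.execute(select(User).where(User.auth0_id == auth0_id))
        return result.scalar_one_or_none()

    async def list_all(self, db: AsyncSession, skip: int = 0, limit: int = 50) -> list[User]:
        result = await db.execute(select(User).offset(skip).limit(limit))
        return list(result.scalars().all())

# Shared, stateless instance imported everywhere (easy to patch in tests).
user_repo = UserRepository()
```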
Now let’s build SessionRepository following the same pattern:
| |
The SessionRepository adds a few interesting methods:
update_title(): Sessions can be renamed. Users often start a conversation with “Help me debug code” but realize it’s actually about “Python async troubleshooting” after a few messages. This method handles that update.
delete(): Returns True if deletion succeeded, False if the session didn’t exist. This lets the caller know whether they deleted something or tried to delete a ghost. Useful for “Session not found” versus “Successfully deleted” responses.
list_by_user() sorts by updated_at: Notice we’re using desc(Session.updated_at) to show the most recently active sessions first. This is what users expect - your most recent conversations at the top. The idx_user_updated index makes this query fast even with thousands of sessions.
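The ordering in list_by_user() boils down to a query like this (standalone sketch of the method body):

```python
# Sketch of the list_by_user query: most recently active sessions first.
from sqlalchemy import desc, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.persistence.models import Session  # assumed models module

async def list_by_user(db: AsyncSession, user_id: int, skip: int = 0, limit: int = 50) -> list[Session]:
    result = await db.execute(
        select(Session)
        .where(Session.user_id == user_id)
        .order_by(desc(Session.updated_at))  # served by idx_user_updated
        .offset(skip)
        .limit(limit)
    )
    return list(result.scalars().all())
```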
Now let’s add MessageRepository:
| |
The MessageRepository is the simplest of the three because messages are mostly immutable - you create them and read them, but rarely update or delete individual messages (you delete the whole session instead).
list_by_session() sorts chronologically: Messages need to be in order - user message, then agent response, then user message, etc. We sort by created_at ascending (oldest first) to maintain the conversation flow.
count_by_session(): Useful for displaying “152 messages” in the UI without loading all 152 messages. We use func.count() which executes as a SQL COUNT(*) query - fast even with thousands of messages.
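The count is a single aggregate query (sketch of the method body):

```python
# Sketch of count_by_session: SQL COUNT(*) without loading any rows.
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.persistence.models import Message  # assumed models module

async def count_by_session(db: AsyncSession, session_id: int) -> int:
    result = await db.execute(
        select(func.count()).select_from(Message).where(Message.session_id == session_id)
    )
    return result.scalar_one()
```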
Tip
90‑second lab: rename + verify order
- Create two sessions; add a message to the older one.
- Call update_title() on the older session.
- List sessions: they should now be sorted by updated_at (most recently changed first).
Putting it all together:
Now instead of writing raw SQLAlchemy queries scattered across 20 files, we have three clean repositories. Here’s what using them looks like:
| |
Three lines. Three named operations. Zero SQL. If you need to add Redis caching tomorrow, you change the repository methods. If you need to log every database access, you add it to the repository. If you need to switch to a different database… you get the idea.
This is the power of repositories - clean, named operations instead of anonymous SQLAlchemy queries scattered everywhere. Your future self will thank you when it’s time to refactor.
Domain Services: Where Business Logic Lives
Skip Ahead
If you know:
- Keep handlers thin (parse → service → map errors)
- Services own validation and authorization
- Repositories return data; no HTTP logic
Okay, we’ve got repositories handling database access. But here’s the million-dollar question: where does your actual business logic go?
Not in your API handlers. Those should be thin HTTP routers that map requests to operations.
Not in your repositories. Those are just query builders - they shouldn’t know about business rules.
You need a domain services layer. This is where the real meat of your application lives.
The Three-Layer Architecture
Let me paint you a picture. Imagine your backend as a sandwich:
- Top layer (API handlers): The bread. Thin, handles HTTP concerns (routing, status codes, JSON serialization). Knows about FastAPI and HTTP.
- Middle layer (domain services): The filling. Thick, contains business logic, validation, authorization, orchestration. Knows nothing about HTTP or databases.
- Bottom layer (repositories): The other slice of bread. Thin, handles database queries. Knows about SQLAlchemy.
When a request comes in, it flows through all three layers:
| |
The response flows back up:
| |
Why three layers instead of just putting everything in the API handler?
I’ve seen what happens when you don’t. Here’s a real story:
I once worked on a codebase where every API endpoint looked like this:
| |
Seems reasonable, right? But this approach led to:
- Test nightmare: To test the business rule “titles can’t be empty,” we had to start FastAPI, make HTTP requests, and connect to a test database. Tests were slow and brittle.
- Duplication everywhere: The “title validation” logic appeared in 3 different endpoints. When requirements changed (“titles must be unique per user”), we had to update all 3 places.
- Can’t reuse: When we added a CLI tool to create sessions for testing, we couldn’t reuse any code. Had to duplicate the validation logic.
- Tight coupling: Changing from Postgres to MongoDB required rewriting every single endpoint because the SQL queries were embedded directly in the handlers.
The three-layer architecture fixes all of this.
What Goes in a Domain Service?
A domain service encapsulates all operations for a specific domain entity (like “Session” or “Message”). Think of it as the single source of truth for “how do I work with sessions?”
Here’s what belongs in a domain service:
- Validation rules: “Session titles must be 1-500 characters”
- Authorization checks: “Users can only access their own sessions”
- Orchestration: “When fetching a session, also count its messages”
- Business policies: “Users can have max 10 active sessions”
- Complex workflows: “Creating a session also creates a welcome message”
Here’s what does NOT belong:
- HTTP stuff (status codes, headers, cookies)
- Database stuff (SQL queries, transactions)
- Framework-specific code (FastAPI dependencies, decorators)
The domain service should be framework-agnostic. You should be able to call it from FastAPI, a CLI tool, a background job, or a test — without changing a single line.
Building SessionService
TLDR
- Services enforce validation and authorization
- Services own transactions (async with db.begin())
- Repos do add/flush/refresh; no commit
Let’s build our SessionService step by step. I’ll explain each method as we go:
| |
Let’s break down what each method is doing and why:
create_session() - Validation before persistence:
- We validate the title before hitting the database. Why? Because validation is a business rule, not a database concern.
- We could add more rules here: “users can only have 10 active sessions,” “titles must be unique per user,” etc.
- The repository doesn’t know or care about these rules — it just saves data.
get_session() - Authorization and orchestration:
- Authorization: Check if the session belongs to the user. This prevents user A from accessing user B’s sessions.
- Orchestration: Fetch the session AND count its messages. The API client needs both pieces of data, so we fetch them together.
- Notice we’re raising PermissionError, not returning an HTTP 403. The service doesn’t know about HTTP—that’s the API layer’s job (see the sketch after this list).
list_user_sessions() - Multi-repository orchestration (no N+1):
- We call a single repo method that returns sessions with message counts to avoid N+1 queries.
- This is orchestration — coordinating repositories while keeping SQL details inside repos.
delete_session() - Authorization before deletion:
- Same authorization check as get_session(). We’re enforcing the rule “users can only delete their own data.”
- If we later add “admins can delete any session,” we add that logic here, not in 5 different places.
Common Questions About This Approach
Warning
Pitfall: 403 vs 404
Q: Why not filter by user_id in the repository?
You might wonder: “Why fetch the session first, then check ownership? Why not just get_by_id_and_user()?”
Here’s why fetch-then-check is better. If the repository filters by user_id inside the query, you can’t distinguish between:
- “Session #123 doesn’t exist” → should return HTTP 404
- “Session #123 exists but belongs to user #456” → should return HTTP 403 Forbidden
Our approach keeps authorization explicit and centralized:
This also makes it easy to extend later:
Separation of concerns:
- Repository: “How do I fetch this data?” (data access)
- Service: “Who can access this data?” (authorization)
- API: “What HTTP status code?” (protocol)
Q: Why import AsyncSession if services don’t use the database?
Sharp eye! You noticed the from sqlalchemy.ext.asyncio import AsyncSession line at the top of the service.
Here’s what’s happening: The service accepts AsyncSession as a dependency and owns transactions, while repositories focus on data access:
The service doesn’t write SQL or import ORM models.
The service does:
- Accept AsyncSession for type hints (helps IDEs and type checkers)
- Own commit/rollback boundaries via async with db.begin()
- Delegate queries to repositories (which do add/flush/refresh, not commit)
This is pragmatic dependency injection. We could define an abstract DatabaseInterface protocol, but that adds complexity for minimal benefit—we’re not planning to swap out SQLAlchemy.
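In code, the split looks roughly like this (a sketch; the repository flushes, the service commits):

```python
# Sketch of the split: the repository flushes, the service owns the commit.
from sqlalchemy.ext.asyncio import AsyncSession

from app.domain.dtos import SessionDTO       # assumed module
from app.persistence.models import Session   # assumed module

class SessionRepository:
    async def create(self, db: AsyncSession, user_id: int, title: str) -> Session:
        session = Session(user_id=user_id, title=title)
        db.add(session)
        await db.flush()            # assigns the primary key, no commit
        await db.refresh(session)   # loads server-generated timestamps
        return session

session_repo = SessionRepository()

class SessionService:
    async def create_session(self, db: AsyncSession, user_id: int, title: str) -> SessionDTO:
        if not title.strip() or len(title) > 500:
            raise ValueError("Title must be 1-500 characters")
        async with db.begin():      # commit on success, rollback on exception
            session = await session_repo.create(db, user_id=user_id, title=title)
        return SessionDTO.model_validate(session)   # safe to read: expire_on_commit=False
```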
Why a singleton instance at the bottom?
| |
This creates a single instance that’s imported everywhere. Services are stateless (they don’t store data between calls), so one instance is enough. This makes mocking easier in tests—you can patch session_service globally.
Building MessageService
Now let’s create the service for message operations. It follows the same pattern as SessionService but focuses on message-specific business logic:
| |
Key differences from SessionService:
- Cross-entity authorization: create_message() checks that the user owns the session before allowing message creation. We’re enforcing “you can only add messages to your own sessions.”
- Role validation: Messages have a role field (user, assistant, tool_call, system). We validate it’s one of the allowed values.
- List comprehension: list_session_messages() uses a list comprehension to convert all messages to DTOs in one line. Same pattern, cleaner syntax.
Both services follow the same principles:
- Authorization checks (who can do this?)
- Validation (is the data valid?)
- Orchestration (coordinate repositories)
- Return DTOs (not ORM models)
Note
Gotchas we just avoided
- 403 vs 404: fetch then authorize in services; don’t hide forbidden with “not found”.
- DTOs decouple DB from API; you can rename DB fields without breaking clients.
- Handler bloat: keep HTTP code thin; move rules + transactions into services.
The DTO Layer: Why We Don’t Return ORM Models
You’ll notice the service methods return SessionDTO, not the raw Session ORM model. This is crucial. Let me explain why with a concrete example.
The problem with returning ORM models directly:
Imagine you return a SQLAlchemy Session object directly to FastAPI:
This seems fine until:
- Accidental data leaks: Your ORM model has internal fields (_sa_instance_state, lazy-loaded relationships). FastAPI tries to serialize everything, potentially exposing data you didn’t intend to send.
- Breaking changes: You add a new column to the sessions table. Suddenly your API returns extra data. Clients that parse the response strictly might break.
- No validation: The API can’t validate that the data is correct before sending it. If a database migration leaves updated_at NULL, you send NULL to the client.
- Circular references: ORM models have relationships. A Session has Messages, each Message has a Session. Try serializing that to JSON and watch it explode with circular reference errors.
- Tight coupling: Your API shape is now tied to your database schema. Want to rename user_id to owner_id in the API without a database migration? Can’t do it.
DTOs solve all of this:
DTOs (Data Transfer Objects) are Pydantic models that define the exact shape of your API responses. They’re the contract between your backend and clients.
| |
What DTOs give us:
- Explicit API contract: SessionDTO defines exactly what clients receive. It’s documentation as code.
- Computed fields: Notice message_count in SessionDTO. This field doesn’t exist in the database — we compute it in the service layer. DTOs let us enrich responses without changing the database schema.
- Validation on input: SessionCreateDTO validates that titles are 1-500 characters. FastAPI automatically returns HTTP 422 with error details if validation fails. No manual checking needed.
- Type safety: TypeScript clients can generate types from your OpenAPI spec. SessionDTO becomes a TypeScript interface automatically.
- Separation of concerns: ORM models = database schema, DTOs = API schema, and they can evolve independently.
Small but important: structural validation (lengths, patterns) belongs in DTOs and returns HTTP 422 from FastAPI automatically; business rules (ownership, quotas, uniqueness by policy) belong in services and should raise ValueError/PermissionError, which handlers map to HTTP 400/403.
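Here’s what such DTOs can look like with Pydantic v2 (a sketch; field names follow the prose, the branch’s classes may differ):

```python
# app/domain/dtos.py (illustrative sketch)
from datetime import datetime

from pydantic import BaseModel, ConfigDict, Field

class SessionCreateDTO(BaseModel):
    title: str = Field(min_length=1, max_length=500)  # structural validation -> HTTP 422

class SessionDTO(BaseModel):
    model_config = ConfigDict(from_attributes=True)   # allow building from ORM objects

    id: int
    user_id: int
    title: str
    message_count: int = 0        # computed in the service layer, not stored in the DB
    created_at: datetime
    updated_at: datetime

class MessageDTO(BaseModel):
    model_config = ConfigDict(from_attributes=True)

    id: int
    session_id: int
    role: str
    content: str
    tool_name: str | None = None
    created_at: datetime
```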
Example of independence:
Imagine you rename the sessions table column from title to session_name in the database. You update the ORM model:
But you keep the DTO the same:
Then you adjust the mapping in from_orm():
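A combined sketch of those three steps (illustrative; the mapping is shown as an explicit classmethod rather than Pydantic’s built-in from_orm):

```python
# Illustrative sketch: rename the DB column, keep the API field name stable.
from pydantic import BaseModel
from sqlalchemy import String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

# 1. ORM model: the column is renamed at the database layer.
class Session(Base):
    __tablename__ = "sessions"
    id: Mapped[int] = mapped_column(primary_key=True)
    session_name: Mapped[str] = mapped_column(String(500))  # was: title

# 2. DTO: the API contract still exposes "title".
class SessionDTO(BaseModel):
    id: int
    title: str

    # 3. Mapping: translate between the two shapes in one place.
    @classmethod
    def from_orm_session(cls, session: Session) -> "SessionDTO":
        return cls(id=session.id, title=session.session_name)
```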
API clients don’t break. They still receive title in the JSON response. You’ve refactored the database without causing a breaking change.
This is the power of the DTO layer — it decouples your API from your database schema.
REST API Endpoints: Bringing It All Together
TLDR
- Thin handlers: parse → call service → map errors
- Services return DTOs; no ORM models in responses
- Verify via Swagger and curl
Skip Ahead
If you know:
- Thin handlers with clear error mapping
- Services return DTOs, not ORM models
- Verify endpoints via Swagger/curl
We’ve built the foundation — repositories for data access, services for business logic, DTOs for API contracts. Now let’s wire it all together with FastAPI endpoints.
The Thin Handler Pattern
API endpoints should be thin. Really thin. Like 5-10 lines thin.
Their only job is to:
- Extract data from the HTTP request (path params, query params, request body)
- Call a service method
- Handle exceptions and convert them to HTTP status codes
- Return the response
They should NOT:
- Contain business logic
- Make database queries
- Validate business rules
All of that lives in the service layer. The endpoint is just HTTP plumbing.
Let’s see this pattern in action with our session endpoints:
| |
Let’s break down what’s happening in each endpoint:
1. create_session() - The creation flow:
sequenceDiagram
participant Client
participant FastAPI
participant Endpoint
participant Service
participant Repository
participant Database
Client->>FastAPI: POST /api/sessions<br/>{"title": "My chat"}
FastAPI->>FastAPI: Validate SessionCreateDTO<br/>(title length 1-500)
FastAPI->>Endpoint: create_session(data)
Endpoint->>Service: create_session(db, user_id, title)
Service->>Service: Validate title<br/>(not empty, not too long)
Service->>Repository: create(db, user_id, title)
Repository->>Database: INSERT INTO sessions...
Database-->>Repository: Session(id=1, ...)
Repository-->>Service: Session ORM model
Service->>Service: Convert to SessionDTO
Service-->>Endpoint: SessionDTO
Endpoint-->>FastAPI: SessionDTO
FastAPI->>FastAPI: Serialize to JSON
FastAPI-->>Client: HTTP 201<br/>{"id": 1, "user_id": 1, "title": "My chat", ...}
Get / Delete flows (at a glance):
sequenceDiagram
participant Client
participant API as Endpoint
participant Service
participant Repo
participant DB
Client->>API: GET /api/sessions/{id}
API->>Service: get_session(db, id, user)
Service->>Repo: get_by_id(db, id)
Repo->>DB: SELECT ... FROM sessions WHERE id = ?
DB-->>Repo: Session or NULL
Service-->>API: SessionDTO or None/PermissionError
API-->>Client: 200/404/403
Client->>API: DELETE /api/sessions/{id}
API->>Service: delete_session(db, id, user)
Service->>Repo: get_by_id(db, id)
Service->>Repo: delete(db, id)
Note over Service,Repo: within db.begin()
API-->>Client: 204/404/403
2. list_sessions() - Query parameters:
FastAPI automatically extracts skip and limit from the URL query string. Request to /api/sessions?skip=10&limit=20 becomes function arguments skip=10, limit=20.
No error handling needed here because the service method doesn’t raise exceptions — it just returns a list (which might be empty).
3. get_session() - Error handling:
The service raises PermissionError if the session belongs to another user. The endpoint catches this and converts it to HTTP 403 Forbidden. This is the beauty of the layered architecture:
- Service layer: Raises Python exceptions (knows nothing about HTTP)
- API layer: Catches exceptions, maps to HTTP status codes
4. delete_session() - Response with no body:
Returns HTTP 204 No Content on success. Notice there’s no return statement — FastAPI automatically returns 204 because we specified status_code=status.HTTP_204_NO_CONTENT in the decorator.
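For reference, the get and delete handlers can stay this small (a sketch; module paths are assumptions, and the hardcoded user_id goes away in Part 3):

```python
# app/api/routes/sessions.py (illustrative sketch of two handlers)
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession

from app.api.deps import get_session as get_db_session        # assumed dependency
from app.domain.dtos import SessionDTO                         # assumed module
from app.domain.session_service import session_service         # assumed module

router = APIRouter(prefix="/api/sessions", tags=["sessions"])

@router.get("/{session_id}", response_model=SessionDTO)
async def get_session(session_id: int, db: AsyncSession = Depends(get_db_session)) -> SessionDTO:
    user_id = 1  # placeholder until Auth0 lands in Part 3
    try:
        session = await session_service.get_session(db, session_id, user_id)
    except PermissionError:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not your session")
    if session is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Session not found")
    return session

@router.delete("/{session_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_session(session_id: int, db: AsyncSession = Depends(get_db_session)) -> None:
    user_id = 1
    try:
        deleted = await session_service.delete_session(db, session_id, user_id)
    except PermissionError:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not your session")
    if not deleted:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Session not found")
```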
Why This Pattern Works
Notice how clean these endpoints are. Each one is 5-15 lines. They’re almost boring — just plumbing.
All the interesting stuff happens in the service layer:
- Validation logic lives in session_service.create_session()
- Authorization lives in session_service.get_session()
- Orchestration lives in session_service.list_user_sessions()
If you want to test the business rule “users can only delete their own sessions,” you don’t write an integration test that starts FastAPI and makes HTTP requests. You write a unit test that calls session_service.delete_session() directly. Much faster.
The hardcoded user_id = 1:
You’ll see this in all endpoints right now. In Part 3, when we add Auth0 authentication, we’ll replace this with a dependency that extracts the user from the JWT token:
| |
For now, we’re hardcoding it so we can build and test the endpoints without authentication blocking us.
Message endpoints follow the exact same pattern — I won’t bore you with detailed explanations again. Notice the simplicity:
| |
Same pattern: thin handlers, service calls, exception mapping. Boring is good. Boring is maintainable.
Wiring Everything Together
Finally, we register the routers in our main FastAPI app:
| |
What’s happening here:
- CORS middleware: Allows our React frontend (running on localhost:5173) to make requests to the backend (localhost:8000). Without this, browsers block the requests due to the same-origin policy.
- Router inclusion: app.include_router() registers all the routes from each router. FastAPI combines them into a single API with automatic OpenAPI docs.
- Enhanced health check: Not just “ok” — it tests database connectivity by executing SELECT 1 and returns environment info and database status. This is what monitoring tools (like Kubernetes health probes) will ping to verify the service is actually working, not just running.
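The wiring itself is short (a sketch; origins and router module paths are assumptions):

```python
# app/main.py (wiring sketch)
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from app.api.routes import messages, sessions  # assumed router modules

app = FastAPI(title="Agent Stack API")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:5173"],  # React dev server
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.include_router(sessions.router)
app.include_router(messages.router)
```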
Note
Same-origin policy is a browser security feature that blocks web pages from making requests to a different domain than the one that served the web page. CORS (Cross-Origin Resource Sharing) is a mechanism that allows servers to specify who can access their resources. In development, our frontend and backend run on different ports, so we need CORS to allow communication between them.
Testing the API
Our backend is now ready to test! Make sure your services are running:
| |
First, create a test user (remember, we’re hardcoding user_id = 1 in the endpoints for now):
You should see:
| |
Now create a session:
Response:
List all sessions:
| |
Create a message for that session:
Performance & Scale Notes
Skip Ahead
If you know:
- Eager loading to avoid N+1
- Pool sizing tuned via settings
- SSE proxy/server timeouts for production
- Eager loading: when rendering a full conversation, consider selectinload(Session.messages) to avoid N+1 queries when fetching recent windows (see the sketch below).
- Pool sizing: keep pool_size / max_overflow configurable; too-large pools can starve Postgres. Monitor and tune.
- SSE in production: set Cache-Control: no-cache and X-Accel-Buffering: no for Nginx, and bump proxy_read_timeout to keep event streams alive.
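For example, eager loading a session’s messages (sketch):

```python
# Sketch: load a session and its messages in two queries instead of 1 + N.
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from app.persistence.models import Session  # assumed models module

async def get_session_with_messages(db: AsyncSession, session_id: int) -> Session | None:
    result = await db.execute(
        select(Session)
        .options(selectinload(Session.messages))
        .where(Session.id == session_id)
    )
    return result.scalar_one_or_none()
```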
Optional: Seed Dev Data
| |
Run inside the container:
| |
Or visit http://localhost:8000/docs to see the interactive API documentation that FastAPI generates automatically. You can test all endpoints right in the browser.
Warning
Why create a user manually? We hardcoded user_id = 1 in the endpoints because we don’t have authentication yet. In Part 3, we’ll add Auth0, and users will be created automatically when they sign up. For now, this manual step lets us test the full flow without authentication blocking us.
Additional Resources
What We’ve Built
Let’s take stock. We’ve gone from “just a health check” to a fully functional backend with proper separation of concerns.
Database Layer:
- Async SQLAlchemy engine with connection pooling
- Three ORM models (User, Session, Message) with relationships
- Alembic migrations tracking schema changes in git
- Indexes for fast queries (idx_user_updated, idx_session_created)
- Cascading deletes (delete user -> delete sessions -> delete messages)
Repository Layer:
- UserRepository with methods for finding users by ID, email, Auth0 ID
- SessionRepository for creating, listing, updating, and deleting sessions
- MessageRepository for storing and retrieving conversation messages
- Clean interface abstracting database details
- Mockable for unit testing
Domain Layer:
- SessionService enforcing business rules (title validation, authorization)
- MessageService enforcing message rules (role validation, session ownership)
- DTOs defining stable API contracts (SessionDTO, MessageDTO, etc.)
- Business logic separated from infrastructure
API Layer:
- REST endpoints for sessions (POST, GET, DELETE)
- REST endpoints for messages (POST, GET)
- Pydantic request/response validation
- Consistent error handling (400, 403, 404)
- Interactive OpenAPI docs at /docs
Developer Experience:
- make migrate to run migrations
- make migrate-create to generate new migrations
- Type-safe end-to-end with mypy strict mode
- Hot-reload on code changes
- One command to start everything (make dev)
Hot damn!
Tip
Checkpoint: Before moving on, verify everything works:
- Run make migrate and check for errors
- Visit http://localhost:8000/docs and see all endpoints documented
- Create a session via curl or the Swagger UI
- Check the database: docker exec -it infra-db-1 psql -U postgres -d agent_stack -c "SELECT * FROM sessions;"
- Create a message for that session
- List messages and verify they’re returned chronologically
If anything fails, check make logs to debug.
What’s Next
In Part 3, we’re tackling Authentication & Security:
- Auth0 integration for user signup and login
- JWT validation middleware protecting endpoints
- Per-user data isolation (no more hardcoded user_id = 1)
- Session cookies for SSE streaming (EventSource can’t send auth headers)
- CORS configuration for production
After that, Part 4 is where things get exciting - we’ll integrate the OpenAI Agents SDK and build the actual agent streaming logic (finally!).
Next: Part 3 - Authentication & Security (Coming Soon)
This is part of a series on building production-ready AI agent applications. All code is open source on GitHub.
Info
Enjoying this series? Star the GitHub repo, share it with your team, or leave a comment below. Questions and suggestions are welcome - open an issue on GitHub or use the “Suggest an Edit” link at the bottom of the page.