Co-authored with GPT-5-Pro and Claude-4.5-Sonnet

Building a Production-Ready Agent Stack: Part 2 - Backend Core & Database

Our FastAPI server is running, but it’s just a health check endpoint talking to no one. Time to build the persistence layer - database models, repositories, domain services, and the REST API that will power our agent conversations.


Right now, our backend can tell you it’s alive (GET /health), but it can’t store a conversation or remember a user. Postgres is running in Docker, but we’re not using it. Let’s handle all that.

TLDR

  • You’ll build: async SQLAlchemy engine + Alembic, three ORM models, repositories, services, and REST endpoints with DTOs.
  • You’ll learn: unit-of-work vs repo commits, avoiding N+1 queries, 403 vs 404 in ownership checks, and why DTOs decouple DB from API.
  • You’ll verify: run make migrate, create a session in Swagger, insert a message, list sessions/messages.

Branch code: part-2-backend-core-database

git fetch origin
git checkout part-2-backend-core-database

By the end of this post, you’ll have:

  • SQLAlchemy models for users, sessions, and messages with proper relationships
  • Alembic migrations versioning your schema changes in git
  • Repository pattern abstracting database access for testability
  • Domain services enforcing business logic and authorization
  • REST endpoints for full CRUD on sessions and messages

We’re building a three-layer architecture — API, Domain, Persistence — to keep concerns separated as complexity grows.

%%{init: {'theme':'base', 'themeVariables': { 'fontSize':'16px'}}}%%
graph TB
    Client[Client Request]
    API["API Layer<br/><small>app/api/</small><br/><small>HTTP routing, validation</small>"]
    Domain["Domain Layer<br/><small>app/domain/</small><br/><small>Business logic</small>"]
    Repo["Persistence Layer<br/><small>app/persistence/</small><br/><small>Database access</small>"]
    DB[(PostgreSQL)]

    Client --> API
    API --> Domain
    Domain --> Repo
    Repo --> DB

    style Client fill:#f0f4ff,stroke:#a5b4fc,stroke-width:2px
    style API fill:#dbeafe,stroke:#93c5fd,stroke-width:2px
    style Domain fill:#d1fae5,stroke:#6ee7b7,stroke-width:2px
    style Repo fill:#e9d5ff,stroke:#c084fc,stroke-width:2px
    style DB fill:#bfdbfe,stroke:#60a5fa,stroke-width:2px

The layers:

  1. API Layer (app/api/): HTTP routing, request validation, response serialization. Knows FastAPI, doesn’t know Postgres.

  2. Domain Layer (app/domain/): Business logic, validation rules, authorization. Pure Python - no FastAPI, no SQLAlchemy. Testable without a server or database.

  3. Persistence Layer (app/persistence/): Database access via ORM models and repositories. Knows Postgres, doesn’t know HTTP.

This separation means you can swap Postgres for MongoDB by rewriting only the persistence layer. Change from REST to GraphQL by rewriting only the API layer. Add a rule like “max 10 sessions per user” by editing only the domain layer. No cascading changes across the codebase.

Let’s get started!

Info

Do / Don’t

  • DO: Services own auth, validation, and transactions; repositories own queries.
  • DO: Keep handlers thin — extract params, call service, map exceptions to HTTP.
  • DON’T: Put SQL in handlers, or commit inside repositories when workflows are multi‑step.

Setting Up SQLAlchemy with Async Support

TLDR

  • Implement async engine + sessionmaker
  • Expose pool_size and max_overflow via Settings
  • Verify with /health DB ping

We’re using SQLAlchemy 2.0 with async support via asyncpg.

Why async matters for agent applications:

Agent apps are I/O-heavy. When a user sends a message, the flow looks like this:

  1. Save message to database (20ms I/O)
  2. Call LLM API and stream response (5-30 seconds I/O)
  3. Save response to database (20ms I/O)
  4. Update usage metrics (20ms I/O)

With synchronous code, each long operation blocks its worker. With a single worker, one user’s 10-second LLM call makes all others wait; 10 concurrent users finish at ~10s, 20s, …, 100s. With multiple workers (e.g., 4), requests finish in waves (~10s, 20s, 30s), improving throughput but still wasting time while each worker idles on I/O.

With async, if the LLM call is awaited I/O, the worker’s event loop can serve other requests while waiting. Ten concurrent users on one async worker can all complete in ~10s (plus overhead), because time is spent waiting on the remote service rather than blocking the server.
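
Here’s a minimal sketch of that difference - the llm_call coroutine and its 10-second sleep are stand-ins for a real provider call:

import asyncio
import time


async def llm_call(user: int) -> str:
    # Stand-in for a real LLM request: awaiting I/O hands control back to the event loop.
    await asyncio.sleep(10)
    return f"response for user {user}"


async def main() -> None:
    start = time.perf_counter()
    # Ten "users" arrive at once; a single event loop overlaps all of their waits.
    results = await asyncio.gather(*(llm_call(u) for u in range(10)))
    elapsed = time.perf_counter() - start
    print(f"{len(results)} responses in {elapsed:.1f}s")  # ~10s total, not ~100s


asyncio.run(main())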

OK enough talk. Let’s create the database configuration:

# backend/app/core/database.py
from collections.abc import AsyncGenerator
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    create_async_engine,
    async_sessionmaker,
)
from app.core.settings import settings


# Create async engine
engine = create_async_engine(
    settings.database_url,
    echo=settings.is_development,  # Log SQL queries in dev
    pool_size=settings.pool_size,       # Tunable via Settings
    max_overflow=settings.max_overflow, # Tunable via Settings
    pool_pre_ping=True,  # Verify connections before using
)

# Session factory
SessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,  # Don't expire objects after commit
)


async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Dependency for injecting database sessions into endpoints."""
    async with SessionLocal() as session:
        try:
            yield session
        finally:
            await session.close()

Let me walk you through what’s happening here - it’s more interesting than it looks!

The create_async_engine() function creates a connection to Postgres, but it doesn’t just make one connection. It creates a connection pool, which is like a cache of ready-to-use database connections that your application reuses.

Why connection pooling matters:

Every time Python needs to talk to Postgres, establishing a fresh connection takes 50-100ms. That might not sound like much, but when you’re handling hundreds of requests per second, those milliseconds add up fast. Connection pooling solves this by maintaining a set of “warm” connections that are already authenticated and ready to go. It’s the difference between opening a new browser tab for each Google search versus keeping tabs open.

Here’s what each parameter does and why you should care:

echo=settings.is_development: When you’re developing locally, this logs every SQL query to your console. This is incredibly useful for debugging - you’ll immediately see if an endpoint is doing 10 queries when it should only do 1 (the dreaded N+1 query problem).

In production, we turn this off because nobody wants gigabytes of query logs eating disk space.

pool_size/max_overflow (from Settings): Start with ~10/20. If all 10 are busy, SQLAlchemy can create up to 20 temporary connections. Tune conservatively; overly large pools can starve Postgres.

Why these numbers? They’re a conservative starting point. You might think “more is better,” but each connection consumes memory on both your application server and Postgres. Start here, monitor in production, and adjust based on your traffic patterns.

pool_pre_ping=True: Before handing a connection to your code, SQLAlchemy pings the database to make sure it’s still alive. Without this, you might grab a connection that Postgres closed due to inactivity, leading to random “connection closed” errors that are absolutely maddening to debug (trust me on this).

This tiny ping (1-2ms) saves you from those errors. Worth it.

expire_on_commit=False: By default, SQLAlchemy expires all objects after a commit, meaning the next time you access a field like user.email, it makes another database query to refresh the data. This is useful if you’re doing complex, long-running transactions where data might change.

But for API responses where we immediately serialize and return? It’s unnecessary overhead. We’re telling SQLAlchemy “we trust ourselves - when we commit, we’re done with that transaction, no need to re-fetch everything.”
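
Here’s a quick sketch of the difference, assuming the SessionLocal factory above and the User model we define later in this post:

# Sketch: what expire_on_commit=False buys us with async sessions
from app.core.database import SessionLocal
from app.persistence.models import User


async def demo() -> None:
    async with SessionLocal() as db:
        user = User(auth0_id="auth0|demo", email="demo@example.com")
        db.add(user)
        await db.commit()

        # With expire_on_commit=False the attributes are still loaded in memory,
        # so this is a plain attribute read - no extra SELECT.
        print(user.email)

        # With the default (expire_on_commit=True) every attribute is expired at
        # commit; re-reading user.email would need a refresh, and async sessions
        # refuse to do that implicitly - you'd have to call `await db.refresh(user)`.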

The async_sessionmaker creates a factory for database sessions. Each time you call it, you get a new session (a transaction context). Think of it as a workspace for one logical unit of work - like “create a user and their first session.”

Finally, get_db() is a generator function (notice the yield). It creates a session, hands it to your code via the yield, and then - here’s the important part - guarantees it gets closed in the finally block, even if your code crashes with an exception. This prevents connection leaks where you gradually run out of database connections until the whole system grinds to a halt.

Important note about the return type: We’re using AsyncGenerator[AsyncSession, None] instead of just AsyncSession because this function uses yield. That’s standard Python typing, not anything SQLAlchemy-specific - a generator function is annotated as a generator of what it yields, not as the yielded type itself.

Expose pool tuning in your settings so prod changes don’t require code edits:

# backend/app/core/settings.py
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    # Fields from Part 1 (api_title, api_version, env, debug, ...) stay as they were;
    # here we add the database URL and the pool tuning knobs.
    database_url: str
    is_development: bool = True
    pool_size: int = 10
    max_overflow: int = 20

settings = Settings()

Now let’s wire this into FastAPI’s dependency injection system:

# backend/app/api/dependencies.py
from collections.abc import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db


async def get_session() -> AsyncGenerator[AsyncSession, None]:
    """FastAPI dependency for injecting database sessions."""
    async for session in get_db():
        yield session

This creates a wrapper around get_db() that we’ll use in our endpoints. You might be wondering “why not just use get_db() directly?” Good question! This extra layer gives us a place to add cross-cutting concerns later - logging, transaction management, request-scoped caching, etc.
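
For example, here’s one way the wrapper might pick up a cross-cutting concern later. This is a hypothetical future version, not something we’re adding now - the logger name and timing log are made up:

# backend/app/api/dependencies.py (hypothetical future version)
import logging
import time
from collections.abc import AsyncGenerator

from sqlalchemy.ext.asyncio import AsyncSession

from app.core.database import get_db

logger = logging.getLogger("app.db")


async def get_session() -> AsyncGenerator[AsyncSession, None]:
    """FastAPI dependency for injecting database sessions, with timing."""
    start = time.perf_counter()
    async for session in get_db():
        yield session
    # Runs when the dependency is torn down at the end of the request.
    logger.debug("db session lived %.1f ms", (time.perf_counter() - start) * 1000)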

Now here’s where FastAPI’s dependency injection really shines. Let’s update our existing health check endpoint in main.py to test database connectivity.

Open backend/app/main.py and update it:

# backend/app/main.py
from fastapi import FastAPI, Depends
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.settings import settings
from app.api.dependencies import get_session

app = FastAPI(
    title=settings.api_title,
    version=settings.api_version,
    debug=settings.debug,
)


@app.get("/health")
async def health(db: AsyncSession = Depends(get_session)) -> dict[str, str | bool]:
    """Health check with database connectivity test."""
    try:
        # Test database connection
        await db.execute(text("SELECT 1"))
        database_status = "connected"
    except Exception as e:
        database_status = f"error: {str(e)}"

    return {
        "status": "ok",
        "env": settings.env,
        "debug": settings.is_development,
        "database": database_status,
    }

Look at that db: AsyncSession = Depends(get_session) parameter we added. FastAPI sees this and automatically:

  1. Calls get_session() before your endpoint runs
  2. Passes the database session as the db parameter to your function
  3. Ensures the session gets closed after the request completes (even if an exception occurs)

You never write db = create_session() or db.close(). FastAPI handles the entire lifecycle. This is dependency injection at its finest - your code simply declares what it needs, and the framework provides it. No manual session management, no leaked connections, no boilerplate.

Note about text(): SQLAlchemy 2.0 requires raw SQL strings to be wrapped in text(). This makes it explicit that you’re executing raw SQL (as opposed to using the query builder). It’s a small guardrail: an arbitrary string can’t be executed accidentally where a query object was expected - a classic path to SQL injection - because raw SQL has to be opted into explicitly.

Try it now:

# Make sure your services are running
make dev

# In another terminal, hit the health endpoint
curl http://localhost:8000/health

You should see:

{
  "status": "ok",
  "env": "development",
  "debug": true,
  "database": "connected"
}

If Postgres is down, the database field will show an error message instead. This is your first working endpoint with database access!

Alembic: Version Control for Your Database Schema

TLDR

  • Autogenerate migrations from ORM changes
  • Run via Make targets inside Docker
  • Always review the generated diff before applying

Here’s a question: how do you deploy a database schema change to production? Do you manually run SQL scripts? Keep a folder of numbered SQL files? Hope you remember what you changed?

This is where Alembic comes in. It’s like git for your database schema. Every time you change an ORM model (add a column, create a table, add an index), Alembic generates a migration file that describes the change. These migrations are versioned, reversible, and stored in git alongside your code.

The magic of Alembic is that it auto-generates migrations by comparing your ORM models to the actual database. You add a field to your User model, run make migrate-create, and Alembic says “oh, you added an email column - here’s the SQL to add it.” You review the generated migration, and if it looks good, apply it with make migrate.

Let’s set it up:

cd backend
uv run alembic init alembic

This creates an alembic/ directory with configuration files and a versions/ folder where migrations live. Now we need to configure it for async SQLAlchemy.

Open alembic/env.py and replace the contents with this:

# backend/alembic/env.py
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context

# Import your models' metadata
from app.persistence.models import Base
from app.core.settings import settings

# Alembic Config object
config = context.config

# Override database URL from settings
config.set_main_option("sqlalchemy.url", settings.database_url)

# Setup logging
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# This is your models' MetaData object
target_metadata = Base.metadata


def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode (generate SQL only)."""
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()


async def run_migrations_online() -> None:
    """Run migrations in 'online' mode (connect to database)."""
    connectable = async_engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)

    await connectable.dispose()


def do_run_migrations(connection: Connection) -> None:
    context.configure(connection=connection, target_metadata=target_metadata)

    with context.begin_transaction():
        context.run_migrations()


if context.is_offline_mode():
    run_migrations_offline()
else:
    import asyncio
    asyncio.run(run_migrations_online())

This might look intimidating even though it mostly came from the default Alembic template (we just convert it to async), but here’s what matters:

The model imports: We pull in our ORM models’ Base metadata and our settings. This is how Alembic learns about our schema.

config.set_main_option("sqlalchemy.url", ...): We override the database URL from our settings instead of hardcoding it in alembic.ini. This way, we can use different databases for dev, staging, and production.

target_metadata = Base.metadata: This line is crucial - when you run make migrate-create, Alembic compares this metadata against the actual database to figure out what changed.

run_migrations_online(): This function uses an async engine (async_engine_from_config) instead of the default synchronous one, matching our async SQLAlchemy setup. Without it, you’d get errors about mixing sync and async code.

The run_migrations_offline() function generates SQL without connecting to the database - useful for creating migration scripts you can review before running.

Now let’s add convenient commands to our Makefile so you don’t have to type long alembic commands. We’ll run these inside Docker to ensure we always talk to the Compose Postgres service:

# Makefile (add to existing file)

# Database migrations (run inside backend container)
migrate:
	@echo "Running database migrations..."
	cd infra && docker compose exec backend uv run alembic upgrade head

migrate-create:
	@echo "Creating new migration..."
	@printf "Migration message: "; read msg; \
	cd infra && docker compose exec backend uv run alembic revision --autogenerate -m "$$msg"

migrate-rollback:
	@echo "Rolling back last migration..."
	cd infra && docker compose exec backend uv run alembic downgrade -1

These three commands are all we need:

make migrate: Runs all pending migrations. This brings your database up to date with the latest schema. Run this after pulling new code from git, or when setting up a fresh development environment.

make migrate-create: Generates a new migration by comparing your ORM models to the database. It prompts you for a message (like a git commit message). Be descriptive-“add email to user” is better than “changes.”

make migrate-rollback: Undoes the last migration. Useful when you realize you made a mistake and want to try again.

Here’s your typical workflow:

  1. Edit your ORM models: Add a field, create a new model, whatever you need.
  2. Generate the migration: Run make migrate-create and provide a descriptive message.
  3. Review the generated file: Open alembic/versions/<timestamp>_your_message.py and check the upgrade() and downgrade() functions. Make sure they do what you expect.
  4. Apply the migration: Run make migrate to update your database.
  5. Commit to git: The migration file goes in version control alongside your code.

Tip

Run migrations inside Docker. This avoids host Postgres conflicts and ensures the commands connect to the Compose db service via its internal DNS. If you really need to run Alembic on the host, update backend/.env to point at the mapped host port (and ensure nothing else is bound to that port). Otherwise prefer docker.

Warning

Always review auto-generated migrations before applying them. Alembic is smart, but it’s not perfect. Here are common gotchas:

  • Column renames: Alembic sees these as drop + add, which loses all data in that column. You need to manually edit the migration to use op.alter_column() instead.
  • Data migrations: Adding a new non-nullable column requires a two-step migration (add as nullable, backfill data, then make it non-nullable).
  • Index names: Alembic sometimes generates different index names than you expect, leading to duplicate indexes.

Always check the upgrade() and downgrade() functions. If something looks wrong, edit it before running make migrate. Your future self (and your teammates) will thank you.
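
For example, if you rename Session.title to Session.name, autogenerate will usually emit a drop/add pair. A hand-edited version might look like this (a sketch - the rename itself and the revision IDs are hypothetical):

# alembic/versions/xxxx_rename_session_title_to_name.py (hypothetical)
import sqlalchemy as sa  # the autogenerated template always imports this
from alembic import op

revision = "xxxx"
down_revision = "yyyy"


def upgrade() -> None:
    # What autogenerate produced (destroys the data in the column):
    # op.drop_column("sessions", "title")
    # op.add_column("sessions", sa.Column("name", sa.String(length=500), nullable=False))
    # What we actually want - keep the data, just rename the column:
    op.alter_column("sessions", "title", new_column_name="name")


def downgrade() -> None:
    op.alter_column("sessions", "name", new_column_name="title")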

Building the Domain Models

TLDR

  • Three ORM models: User, Session, Message
  • Timezone-aware timestamps generated by DB
  • Indexes for common queries (user sessions by updated_at)

Now for the fun part—defining what our database actually stores. Think about how a chat application works: You have users who create conversations (we’ll call them sessions), and each conversation contains messages going back and forth.

That’s exactly what we’re modeling here:

  • User: Represents a person using your agent app (we’ll link this to Auth0 in Part 3)
  • Session: A conversation thread with the agent (like a ChatGPT conversation)
  • Message: Individual messages within a session (both what the user says and what the agent responds)

This is the minimal viable structure for a stateful agent application. Users own Sessions, Sessions contain Messages. It’s a simple hierarchy, but it scales beautifully-you could have millions of users, each with hundreds of sessions, each session with thousands of messages.

Let’s define these models using SQLAlchemy’s ORM:

# backend/app/persistence/models.py
from datetime import datetime
from sqlalchemy import String, Integer, Text, DateTime, ForeignKey, Index, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship


class Base(DeclarativeBase):
    """Base class for all ORM models."""
    pass


class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    auth0_id: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now(), nullable=False
    )

    # Relationship: one user has many sessions
    sessions: Mapped[list["Session"]] = relationship(
        "Session", back_populates="user", cascade="all, delete-orphan"
    )

    def __repr__(self) -> str:
        return f"<User(id={self.id}, email={self.email})>"


class Session(Base):
    __tablename__ = "sessions"

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    user_id: Mapped[int] = mapped_column(
        Integer, ForeignKey("users.id", ondelete="CASCADE"), nullable=False
    )
    title: Mapped[str] = mapped_column(String(500), nullable=False)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now(), nullable=False
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False
    )

    # Relationships
    user: Mapped["User"] = relationship("User", back_populates="sessions")
    messages: Mapped[list["Message"]] = relationship(
        "Message", back_populates="session", cascade="all, delete-orphan"
    )

    # Index for fast user session lookups (sort by last activity)
    __table_args__ = (Index("idx_user_updated", "user_id", "updated_at"),)

    def __repr__(self) -> str:
        return f"<Session(id={self.id}, title={self.title[:30]})>"


class Message(Base):
    """
    User-facing conversation history including tool calls.

    The role field supports:
    - "user": Messages from the user
    - "assistant": Responses from the agent
    - "tool_call": Lightweight markers showing "Agent used tool X"
    - "system": Optional system messages
    """
    __tablename__ = "messages"

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    session_id: Mapped[int] = mapped_column(
        Integer, ForeignKey("sessions.id", ondelete="CASCADE"), nullable=False
    )
    role: Mapped[str] = mapped_column(String(50), nullable=False)
    content: Mapped[str] = mapped_column(Text, nullable=False)

    # Optional: only populated for tool_call messages
    tool_name: Mapped[str | None] = mapped_column(String(255), nullable=True)

    # Links to external tracing (OpenAI API, Phoenix Arize)
    trace_id: Mapped[str | None] = mapped_column(String(255), nullable=True, index=True)
    span_id: Mapped[str | None] = mapped_column(String(255), nullable=True, index=True)

    tokens: Mapped[int] = mapped_column(Integer, default=0)  # For usage tracking
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now(), nullable=False
    )

    # Relationship
    session: Mapped["Session"] = relationship("Session", back_populates="messages")

    # Index for fast session message lookups
    __table_args__ = (Index("idx_session_created", "session_id", "created_at"),)

    def __repr__(self) -> str:
        return f"<Message(id={self.id}, role={self.role}, content={self.content[:30]})>"

Let me walk you through the design decisions in this schema - each one solves a real problem I’ve encountered in production:

User model:

  • auth0_id: This is how we link database users to Auth0 accounts. When someone logs in with Auth0, we get an ID like auth0|507f1f77bcf86cd799439011. We store that here, never storing passwords ourselves. Security win.
  • email: We keep a copy of the email for convenience (displaying in the UI, sending notifications). It’s also indexed for fast lookups.
  • created_at: Always useful for analytics (“how many users signed up this week?”) and debugging (“when did this user join?”).

Session model:

  • user_id foreign key: Links sessions to users. The ondelete="CASCADE" means when you delete a user, all their sessions automatically disappear. No orphaned data.
  • title: A human-readable name for the conversation. Usually auto-generated from the first message (“Help me debug Python code”) but users can rename it.
  • updated_at: Timezone-aware and server-generated; updates via onupdate=func.now() whenever a row changes. Powers the “most recently active conversations” list in your UI.
  • cascade="all, delete-orphan": Delete a session -> SQLAlchemy deletes all of its messages in the same unit of work. Combined with the foreign key’s ondelete="CASCADE", Postgres enforces the same rule at the database level, so you can’t accidentally orphan messages.
  • Index("idx_user_updated"): Composite index on (user_id, updated_at). Optimized for “list sessions for user 123 ordered by last activity” even at high scale.

Message model:

  • role: Can be “user”, “assistant”, “tool_call”, or “system”. This schema goes beyond the basic OpenAI convention by explicitly tracking tool calls as separate messages. When the agent calls a tool, we save a message with role="tool_call" so the UI can show “Agent used search_database tool” as part of the conversation flow. In the API layer, we type this as an Enum for safety.
  • content: The actual message text. We use Text (unlimited length) instead of String (limited) because agent responses can be looong. For tool_call messages, this contains a user-friendly summary like “Called search_database”.
  • tool_name: Only populated when role="tool_call". This stores which tool was executed (e.g., “search_database”, “web_fetch”). The UI uses this to render tool-specific icons and styling.
  • trace_id: Links to the entire agent run in your tracing system (OpenAI API or Phoenix Arize). All messages in the same agent execution share the same trace_id. Click on any message -> see the full execution trace including all tool calls, LLM requests, and timing data. Indexed for fast lookups.
  • span_id: Links to the specific operation within the trace. For tool_call messages, this is the span ID for that specific tool execution. For assistant messages, this might be the LLM generation span. This lets you jump directly to the exact operation in your trace viewer instead of searching through the entire trace. Indexed for fast lookups.
  • tokens: How many tokens this message consumed. The OpenAI API returns this for each run, and we store it for billing and rate limiting (Part 6). Super useful for “you’ve used 45,000 tokens this month” displays.
  • Index("idx_session_created"): Fast chronological message retrieval. When you open a session, you want messages in order, fast.

Relationships: Notice the relationship() declarations like sessions: Mapped[list["Session"]] = relationship(...). These let you write user.sessions to get all of a user’s sessions without writing SQL. SQLAlchemy handles the join automatically. It’s magical in development, but be careful - loading relationships can cause N+1 query problems if you’re not mindful (later in this post we sidestep this with a single aggregate query in the repository, and DTOs keep the API from relying on lazy loading).
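
To make the trap concrete, here’s a sketch using the Session model above. selectinload is one standard fix when you genuinely need the related rows; later in this post we avoid the problem with an aggregate query instead:

# Sketch: the N+1 trap vs. eager loading
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from app.persistence.models import Session


async def n_plus_one(db: AsyncSession, user_id: int) -> None:
    # One query for the sessions...
    result = await db.execute(select(Session).where(Session.user_id == user_id))
    for sess in result.scalars():
        # ...then one lazy load per session. In sync code that's N extra queries;
        # in an async session it raises, because implicit I/O isn't allowed.
        print(len(sess.messages))


async def eager_load(db: AsyncSession, user_id: int) -> None:
    # Two queries total, no matter how many sessions the user has.
    result = await db.execute(
        select(Session)
        .where(Session.user_id == user_id)
        .options(selectinload(Session.messages))
    )
    for sess in result.scalars():
        print(len(sess.messages))  # already loaded, no extra queries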

Note

We’re using SQLAlchemy 2.0’s Mapped[] syntax instead of the older Column() approach. This gives us better type inference - mypy can tell you if you’re accessing a field that doesn’t exist at compile time, not in the middle of the night when your production app crashes. If you see code online using Column(Integer, ...), that’s the old style. This is the new hotness.
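
Side by side, the two styles look like this (toy models for illustration, not part of our schema):

from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class OldStyleUser(Base):
    __tablename__ = "old_style_users"

    id = Column(Integer, primary_key=True)      # type is opaque to mypy
    email = Column(String(255), unique=True)    # is this a str? mypy can't tell


class NewStyleUser(Base):
    __tablename__ = "new_style_users"

    id: Mapped[int] = mapped_column(Integer, primary_key=True)    # mypy knows: int
    email: Mapped[str] = mapped_column(String(255), unique=True)  # mypy knows: str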

Real-Time Tool Execution Display

Here’s a critical design decision for agent applications: How do you show tool execution in the UI?

Users expect to see what the agent is doing in real-time - “Searching database…” while it’s happening, then “search_database completed” when it’s done. And when they scroll through chat history later, they should see “Agent used search_database” as part of the conversation flow.

This gets tricky because we’re using external tracing platforms (OpenAI API trace viewer or Phoenix Arize) for detailed debugging. We don’t want to duplicate all that data in our application database - that’s expensive and redundant. But we do need enough information to show tool execution in the user-facing UI.

Here’s how we solve this with a two-tier architecture:

Tier 1: Real-time streaming (SSE)

While the agent is running, we stream events to the frontend using Server-Sent Events (remember Part 1?). These events are ephemeral - they exist only during the request, never stored in the database:

sequenceDiagram
    participant UI
    participant Backend
    participant Agent

    UI->>Backend: Open SSE connection
    Backend->>Agent: start()
    Agent-->>Backend: tool_call_started
    Backend-->>UI: event: tool_call_started
    Agent-->>Backend: tool_call_completed
    Backend-->>UI: event: tool_call_completed
    Agent-->>Backend: assistant_message
    Backend-->>UI: event: assistant_message
    Backend-->>UI: event: done
# Events streamed to frontend during agent execution:
{
  "type": "tool_call_started",
  "tool_name": "search_database",
  "timestamp": "2025-10-30T10:30:00Z"
}

{
  "type": "tool_call_completed",
  "tool_name": "search_database",
  "result_summary": "Found 3 results",
  "timestamp": "2025-10-30T10:30:02Z"
}

{
  "type": "message_created",
  "role": "assistant",
  "content": "I found 3 relevant documents...",
  "timestamp": "2025-10-30T10:30:03Z"
}

The frontend receives these events as they happen and updates the UI in real-time:

  • When tool_call_started arrives -> Show “Calling search_database…” with a spinner
  • When tool_call_completed arrives -> Update to “search_database completed”
  • When message_created arrives -> Show the final assistant response

This gives users live feedback without any database writes. Fast, cheap, and responsive.

Tier 2: Persistent storage (Messages table)

After the agent finishes, we save the conversation to the database for historical display. This is where the Message model’s role="tool_call" support comes in:

# Example: Conversation stored in the messages table
# (Simplified for clarity)

# User message (starts the trace)
Message(
    role="user",
    content="What's in our database?",
    trace_id="trace_abc123",  # All messages in this run share this
    created_at="2025-10-30T10:29:58Z"
)

# Tool call (lightweight marker)
Message(
    role="tool_call",
    tool_name="search_database",
    content="Searched database for all records",
    trace_id="trace_abc123",      # Same trace as above
    span_id="span_tool_xyz789",   # Specific tool execution span
    created_at="2025-10-30T10:30:00Z"
)

# Assistant response
Message(
    role="assistant",
    content="I found 3 documents in the database: ...",
    trace_id="trace_abc123",      # Same trace
    span_id="span_llm_def456",    # LLM generation span
    tokens=87,
    created_at="2025-10-30T10:30:03Z"
)

When rendering chat history, the UI shows:

  1. User’s question
  2. “search_database ran” (grayed out, maybe collapsible)
  3. Agent’s answer

The tool_call message is a lightweight placeholder. It says “the agent used this tool” without storing the tool’s arguments, raw output, or execution details. If you need to debug what went wrong, you have two levels of detail:

  1. Look for the trace_id -> See the entire agent run in your tracing platform:

    • All tool calls in sequence
    • All LLM requests and responses
    • Total execution time
    • Overall token usage
    • High-level flow visualization
  2. Look for the span_id -> Jump directly to the specific operation (e.g., the search_database tool call):

    • Exact tool arguments: {"query": "SELECT * FROM documents", "limit": 100}
    • Raw tool output: The full JSON response
    • Execution time: 247ms
    • Error logs if it failed
    • Retry attempts
    • Parent-child span relationships

Why this architecture works:

Division of responsibilities:

  • Application database: User-facing conversation flow for the UI (what users see)
  • Tracing platform: Developer-facing debugging details (what engineers need)

What we DON’T store in our DB:

  • Tool call arguments (can be huge, contain sensitive data, change frequently)
  • Raw tool outputs (stored in tracing platform)
  • Execution timing (tracing platform)
  • Error stack traces (tracing platform)

What we DO store:

  • That a tool was called (for UI display)
  • Tool name (to show “search_database ran”)
  • User-friendly summary in content field
  • Links to tracing platform via trace_id and span_id
  • Chronological ordering in conversation

This keeps your application database lean while giving users the transparency they expect. When a user says “the agent failed,” you can:

  1. Look at their conversation in the app -> See which tool call happened
  2. Copy the trace_id -> View the entire agent run in your tracing platform
  3. Or copy the span_id from the specific tool_call -> Jump directly to that tool’s execution
  4. See the full execution details -> Debug the actual issue

Implementation preview:

Here’s what the agent service will look like when we integrate OpenAI Agents SDK in Part 4:

async def run_agent(session_id: int, user_message: str):
    # Start agent run (OpenAI SDK) - returns trace_id for entire run
    trace_id = await openai_agent.start_run(user_message)

    # Save user message to DB with trace_id
    await message_repo.create(
        session_id=session_id,
        role="user",
        content=user_message,
        trace_id=trace_id
    )

    # Stream events to frontend via SSE
    async for event in openai_agent.stream_events(trace_id):
        if event.type == "tool_call_started":
            # Send to UI immediately (SSE)
            await sse_stream.send({
                "type": "tool_call_started",
                "tool_name": event.tool_name,
            })

            # Save lightweight marker to DB
            await message_repo.create(
                session_id=session_id,
                role="tool_call",
                tool_name=event.tool_name,
                content=f"Called {event.tool_name}",
                trace_id=trace_id,           # Same trace for all messages
                span_id=event.span_id        # Specific tool execution span
            )

        elif event.type == "message_created":
            # Send to UI
            await sse_stream.send(event)

            # Save assistant response to DB
            await message_repo.create(
                session_id=session_id,
                role="assistant",
                content=event.content,
                trace_id=trace_id,           # Same trace
                span_id=event.span_id,       # LLM generation span
                tokens=event.usage.total_tokens
            )

This pattern gives us:

  • Real-time UI updates during execution (users see progress)
  • Chat history shows “tool ran” after completion (transparent conversation flow)
  • Minimal database storage (no duplication with tracing platform)
  • Full debugging available externally (engineers can troubleshoot)
  • Privacy and security (sensitive tool arguments never stored in app DB)

The key insight here is that tool_call messages are just another message type in the conversation flow - lightweight placeholders that enhance the user experience without creating a database bloat problem.

In Part 4, when we integrate the OpenAI Agents SDK, you’ll see this architecture in action. For now, the important takeaway is that our Message model is designed to support this dual-tier approach from day one.

Great! We’ve defined our ORM models. Now let’s create and apply the initial migration to set up the database tables.

Tip

Running migrations inside Docker: All migration commands execute inside the backend container using docker compose exec backend. This ensures the commands can reach the database via Docker’s internal networking (db:5432). The alembic/ directory is mounted as a volume, so migrations you create inside the container appear immediately on your host machine.

First, make sure your services are running:

make dev

Now create and apply the migration:

# Create a new migration
make migrate-create
# When prompted, enter: "add user session message tables"

# Review the generated migration
cat backend/alembic/versions/*_add_user_session_message_tables.py

# Apply the migration
make migrate

Verify the tables were created:

docker exec -it infra-db-1 psql -U postgres -d agent_stack -c "\dt"

You should see:

           List of relations
 Schema |      Name       | Type  |  Owner
--------+-----------------+-------+----------
 public | alembic_version | table | postgres
 public | messages        | table | postgres
 public | sessions        | table | postgres
 public | users           | table | postgres

Troubleshooting:

  • “No ‘script_location’ key found in configuration”: The backend container is missing Alembic files. Rebuild with docker compose -f infra/docker-compose.yml up --build -d.
  • “No such service: backend”: Services aren’t running. Start them with make dev first.
  • “Could not translate host name ‘db’ to address”: You’re trying to run migrations on your host machine instead of inside Docker. Use make migrate instead of running alembic directly.

The Repository Pattern: Clean Data Access

Alright, we have ORM models that map Python classes to database tables. But here’s the thing - you don’t want to scatter SQLAlchemy queries throughout your codebase like confetti at a parade. That way lies madness.

Let me show you what I mean. Imagine you’re building endpoints, and every time you need to find a user by email, you write:

result = await db.execute(select(User).where(User.email == email))
user = result.scalar_one_or_none()

This seems fine at first. But then you write this in 10 different files. Then someone says “we should cache user lookups,” and you’re grepping through the codebase hunting for every single query. Or you decide to add logging when users are accessed. Or you want to switch from Postgres to MongoDB (God help you). Suddenly you’re touching 20+ files for one change.

TLDR

  • Repositories own queries; services own transactions
  • Keep handlers thin; don’t put SQL in endpoints
  • One import point per model (easy to mock)

Enter the Repository pattern.

A repository is a class that encapsulates all database operations for a specific model. It’s the single source of truth for “how do I get users from the database?” Instead of repeating queries everywhere, you write:

user = await user_repo.get_by_email(db, email)

That’s it. One line. The repository handles the query. If you need to add caching, logging, or change the implementation entirely, you change it in one place - the repository.

The benefits compound fast:

  • Testability: Mock user_repo in unit tests without touching the database. No test database setup, no fixtures, just pure business logic testing.
  • Consistency: All User queries live in one place. No more “wait, which endpoint uses the optimized query and which one doesn’t?”
  • Maintainability: Change the query logic once, not in 20 files. Future you will thank present you.
  • Swappable: Move from Postgres to MongoDB? Rewrite the repository. Your business logic doesn’t change.

Let’s build the UserRepository:

# backend/app/persistence/repositories/user_repo.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.persistence.models import User


class UserRepository:
    """Repository for User model operations."""

    async def create(self, db: AsyncSession, auth0_id: str, email: str) -> User:
        """Create a new user."""
        user = User(auth0_id=auth0_id, email=email)
        db.add(user)
        await db.flush()
        await db.refresh(user)  # Load generated ID
        return user

    async def get_by_id(self, db: AsyncSession, user_id: int) -> User | None:
        """Get user by ID."""
        result = await db.execute(select(User).where(User.id == user_id))
        return result.scalar_one_or_none()

    async def get_by_auth0_id(self, db: AsyncSession, auth0_id: str) -> User | None:
        """Get user by Auth0 ID."""
        result = await db.execute(select(User).where(User.auth0_id == auth0_id))
        return result.scalar_one_or_none()

    async def get_by_email(self, db: AsyncSession, email: str) -> User | None:
        """Get user by email."""
        result = await db.execute(select(User).where(User.email == email))
        return result.scalar_one_or_none()

    async def list_all(self, db: AsyncSession, skip: int = 0, limit: int = 100) -> list[User]:
        """List all users with pagination."""
        result = await db.execute(select(User).offset(skip).limit(limit))
        return list(result.scalars().all())


# Singleton instance
user_repo = UserRepository()

Look at the structure here - it’s beautifully simple:

Every method takes db: AsyncSession as the first parameter: This is dependency injection in action. The repository doesn’t create its own database connection; it uses whatever session you pass in. This means the repository works with your request-scoped session from FastAPI, your test database session, or even a transaction you’re managing manually. Flexible and testable.

Methods return domain objects (User), not database rows: When you call get_by_id(), you get a User object with all its fields and relationships. Not a tuple, not a dictionary, but a proper Python object with type hints. Your IDE can autocomplete fields, mypy can catch typos.

Single results use scalar_one_or_none(), lists use scalars().all(): This is SQLAlchemy’s way of saying “give me one result (or None)” versus “give me all results as a list.” The scalar_one() variant raises an exception if not found, which is useful when you expect a result to exist. The _or_none() variant returns None, perfect for “optional” lookups.

Pagination is built-in: Every list method has skip and limit parameters. This prevents accidentally loading 10 million rows into memory when someone asks for “all users.” Start with sensible defaults (usually 50-100), and let the caller override if needed.

The singleton pattern: At the bottom, we create user_repo = UserRepository() and import it everywhere. This isn’t a true singleton (Python allows multiple instances), but by convention we use one shared instance. It’s stateless (no instance variables), so there’s no benefit to creating multiple copies.

Why singletons? It makes testing dead simple. Instead of swapping out the UserRepository class and wiring up a fake instance, you patch the one shared object - for example with mock.patch.object(user_repo, "get_by_email", ...), or by patching user_repo in whichever module imported it. Cleaner tests; we’ll see this in practice when we write unit tests.
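
Here’s a taste of that, as a sketch: pytest-asyncio is assumed, and lookup_or_404 is a made-up caller standing in for real code under test:

from unittest.mock import AsyncMock, patch

import pytest

from app.persistence.models import User
from app.persistence.repositories.user_repo import user_repo


async def lookup_or_404(db, email: str) -> User:
    """Hypothetical caller: the kind of logic you'd unit-test against the repo."""
    user = await user_repo.get_by_email(db, email)
    if user is None:
        raise LookupError(email)
    return user


@pytest.mark.asyncio
async def test_lookup_raises_for_unknown_email() -> None:
    # Patch the method on the shared instance - no database, no fixtures.
    with patch.object(user_repo, "get_by_email", AsyncMock(return_value=None)):
        with pytest.raises(LookupError):
            await lookup_or_404(db=None, email="ghost@example.com")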

Now let’s build SessionRepository following the same pattern:

# backend/app/persistence/repositories/session_repo.py
from sqlalchemy import select, desc
from sqlalchemy.ext.asyncio import AsyncSession
from app.persistence.models import Session


class SessionRepository:
    """Repository for Session model operations."""

    async def create(self, db: AsyncSession, user_id: int, title: str) -> Session:
        """Create a new session."""
        session = Session(user_id=user_id, title=title)
        db.add(session)
        await db.flush()
        await db.refresh(session)
        return session

    async def get_by_id(self, db: AsyncSession, session_id: int) -> Session | None:
        """Get session by ID."""
        result = await db.execute(select(Session).where(Session.id == session_id))
        return result.scalar_one_or_none()

    async def list_by_user(
        self, db: AsyncSession, user_id: int, skip: int = 0, limit: int = 50
    ) -> list[Session]:
        """List sessions for a user, sorted by most recent."""
        result = await db.execute(
            select(Session)
            .where(Session.user_id == user_id)
            .order_by(desc(Session.updated_at))
            .offset(skip)
            .limit(limit)
        )
        return list(result.scalars().all())

    async def list_by_user_with_counts(
        self, db: AsyncSession, user_id: int, skip: int = 0, limit: int = 50
    ) -> list[tuple[Session, int]]:
        """List sessions for a user with message counts in one query."""
        from sqlalchemy import func, select
        from app.persistence.models import Message

        stmt = (
            select(Session, func.count(Message.id).label("message_count"))
            .outerjoin(Message, Message.session_id == Session.id)
            .where(Session.user_id == user_id)
            .group_by(Session.id)
            .order_by(desc(Session.updated_at))
            .offset(skip)
            .limit(limit)
        )
        result = await db.execute(stmt)
        return [(row[0], int(row[1])) for row in result.all()]

    async def update_title(self, db: AsyncSession, session_id: int, title: str) -> Session | None:
        """Update session title."""
        session = await self.get_by_id(db, session_id)
        if session:
            session.title = title
            await db.flush()
            await db.refresh(session)
        return session

    async def delete(self, db: AsyncSession, session_id: int) -> bool:
        """Delete a session."""
        session = await self.get_by_id(db, session_id)
        if session:
            await db.delete(session)
            await db.flush()
            return True
        return False


session_repo = SessionRepository()

The SessionRepository adds a few interesting methods:

update_title(): Sessions can be renamed. Users often start a conversation with “Help me debug code” but realize it’s actually about “Python async troubleshooting” after a few messages. This method handles that update.

delete(): Returns True if deletion succeeded, False if the session didn’t exist. This lets the caller know whether they deleted something or tried to delete a ghost. Useful for “Session not found” versus “Successfully deleted” responses.

list_by_user() sorts by updated_at: Notice we’re using desc(Session.updated_at) to show the most recently active sessions first. This is what users expect - your most recent conversations at the top. The idx_user_updated index makes this query fast even with thousands of sessions.

Now let’s add MessageRepository:

# backend/app/persistence/repositories/message_repo.py
from sqlalchemy import select, desc, func
from sqlalchemy.ext.asyncio import AsyncSession
from app.persistence.models import Message


class MessageRepository:
    """Repository for Message model operations."""

    async def create(
        self,
        db: AsyncSession,
        session_id: int,
        role: str,
        content: str,
        tool_name: str | None = None,
        trace_id: str | None = None,
        span_id: str | None = None,
        tokens: int = 0,
    ) -> Message:
        """Create a new message."""
        message = Message(
            session_id=session_id,
            role=role,
            content=content,
            tool_name=tool_name,
            trace_id=trace_id,
            span_id=span_id,
            tokens=tokens
        )
        db.add(message)
        await db.flush()
        await db.refresh(message)
        return message

    async def get_by_id(self, db: AsyncSession, message_id: int) -> Message | None:
        """Get message by ID."""
        result = await db.execute(select(Message).where(Message.id == message_id))
        return result.scalar_one_or_none()

    async def list_by_session(
        self, db: AsyncSession, session_id: int, skip: int = 0, limit: int = 100
    ) -> list[Message]:
        """List messages for a session, sorted chronologically."""
        result = await db.execute(
            select(Message)
            .where(Message.session_id == session_id)
            .order_by(Message.created_at)  # Chronological order
            .offset(skip)
            .limit(limit)
        )
        return list(result.scalars().all())

    async def count_by_session(self, db: AsyncSession, session_id: int) -> int:
        """Count messages in a session."""
        result = await db.execute(
            select(func.count(Message.id)).where(Message.session_id == session_id)
        )
        return result.scalar() or 0


message_repo = MessageRepository()

The MessageRepository is the simplest of the three because messages are mostly immutable - you create them and read them, but rarely update or delete individual messages (you delete the whole session instead).

list_by_session() sorts chronologically: Messages need to be in order - user message, then agent response, then user message, etc. We sort by created_at ascending (oldest first) to maintain the conversation flow.

count_by_session(): Useful for displaying “152 messages” in the UI without loading all 152 messages. We use func.count() which executes as a SQL COUNT(*) query - fast even with thousands of messages.

Tip

90‑second lab: rename + verify order

  • Create two sessions; add a message to the older one.
  • Call update_title() on the older session.
  • List sessions: it should now sort by updated_at (recently changed first).
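
If you’d rather script the lab than click through Swagger, here’s a rough sketch. It assumes a user with id=1 already exists and reuses the factories and repositories from this post:

# scripts/lab_sort_order.py (hypothetical helper, not part of the stack)
import asyncio

from app.core.database import SessionLocal
from app.persistence.repositories.message_repo import message_repo
from app.persistence.repositories.session_repo import session_repo


async def main() -> None:
    async with SessionLocal() as db:
        older = await session_repo.create(db, user_id=1, title="older session")
        await db.commit()  # commit so the two sessions get different timestamps

        await session_repo.create(db, user_id=1, title="newer session")
        await db.commit()

        # Touch the older session: add a message, then rename it.
        await message_repo.create(db, session_id=older.id, role="user", content="hi")
        await session_repo.update_title(db, older.id, "older session (renamed)")
        await db.commit()

        sessions = await session_repo.list_by_user(db, user_id=1)
        # The renamed (recently updated) session should now come first.
        print([s.title for s in sessions])


asyncio.run(main())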

Putting it all together:

Now instead of writing raw SQLAlchemy queries scattered across 20 files, we have three clean repositories. Here’s what using them looks like:

# Get user by Auth0 ID
user = await user_repo.get_by_auth0_id(db, "auth0|123")

# List user's sessions (most recent first)
sessions = await session_repo.list_by_user(db, user_id=user.id)

# Get messages for the first session
messages = await message_repo.list_by_session(db, session_id=sessions[0].id)

Three lines. Three named operations. Zero SQL. If you need to add Redis caching tomorrow, you change the repository methods. If you need to log every database access, you add it to the repository. If you need to switch to a different database… you get the idea.

This is the power of repositories - clean, named operations instead of anonymous SQLAlchemy queries scattered everywhere. Your future self will thank you when it’s time to refactor.
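
For instance, adding a cache to user lookups stays inside one repository method. A sketch only, ignoring cache invalidation and detached-instance concerns - the dict is a stand-in for Redis or similar, not something we’re adding to the project:

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.persistence.models import User


class CachedUserRepository:
    """Illustrative variant of UserRepository with a read-through cache."""

    def __init__(self) -> None:
        self._by_email: dict[str, User] = {}  # stand-in for a real cache

    async def get_by_email(self, db: AsyncSession, email: str) -> User | None:
        if email in self._by_email:
            return self._by_email[email]  # cache hit: no query at all
        result = await db.execute(select(User).where(User.email == email))
        user = result.scalar_one_or_none()
        if user is not None:
            self._by_email[email] = user
        return user

Callers still write user = await user_repo.get_by_email(db, email) - nothing outside the repository changes.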

Domain Services: Where Business Logic Lives

Okay, we’ve got repositories handling database access. But here’s the million-dollar question: where does your actual business logic go?

Not in your API handlers. Those should be thin HTTP routers that map requests to operations.

Not in your repositories. Those are just query builders - they shouldn’t know about business rules.

You need a domain services layer. This is where the real meat of your application lives.

The Three-Layer Architecture

Let me paint you a picture. Imagine your backend as a sandwich:

  1. Top layer (API handlers): The bread. Thin, handles HTTP concerns (routing, status codes, JSON serialization). Knows about FastAPI and HTTP.
  2. Middle layer (domain services): The filling. Thick, contains business logic, validation, authorization, orchestration. Knows nothing about HTTP or databases.
  3. Bottom layer (repositories): The other slice of bread. Thin, handles database queries. Knows about SQLAlchemy.

When a request comes in, it flows through all three layers:

HTTP Request > API Handler > Domain Service > Repository > Database

The response flows back up:

Database > Repository > Domain Service > API Handler > HTTP Response

Why three layers instead of just putting everything in the API handler?

I’ve seen what happens when you don’t. Here’s a real story:

I once worked on a codebase where every API endpoint looked like this:

@app.post("/sessions")
async def create_session(title: str, db: Session):
    # Validation
    if not title or len(title.strip()) == 0:
        raise HTTPException(400, "Title required")
    if len(title) > 500:
        raise HTTPException(400, "Title too long")

    # Database query
    session = Session(user_id=current_user.id, title=title)
    db.add(session)
    db.commit()

    return session

Seems reasonable, right? But this approach led to:

  • Test nightmare: To test the business rule “titles can’t be empty,” we had to start FastAPI, make HTTP requests, and connect to a test database. Tests were slow and brittle.
  • Duplication everywhere: The “title validation” logic appeared in 3 different endpoints. When requirements changed (“titles must be unique per user”), we had to update all 3 places.
  • Can’t reuse: When we added a CLI tool to create sessions for testing, we couldn’t reuse any code. Had to duplicate the validation logic.
  • Tight coupling: Changing from Postgres to MongoDB required rewriting every single endpoint because the SQL queries were embedded directly in the handlers.

The three-layer architecture fixes all of this.

What Goes in a Domain Service?

A domain service encapsulates all operations for a specific domain entity (like “Session” or “Message”). Think of it as the single source of truth for “how do I work with sessions?”

Here’s what belongs in a domain service:

  1. Validation rules: “Session titles must be 1-500 characters”
  2. Authorization checks: “Users can only access their own sessions”
  3. Orchestration: “When fetching a session, also count its messages”
  4. Business policies: “Users can have max 10 active sessions”
  5. Complex workflows: “Creating a session also creates a welcome message”

Here’s what does NOT belong:

  • HTTP stuff (status codes, headers, cookies)
  • Database stuff (SQL queries, transactions)
  • Framework-specific code (FastAPI dependencies, decorators)

The domain service should be framework-agnostic. You should be able to call it from FastAPI, a CLI tool, a background job, or a test — without changing a single line.

Building SessionService

TLDR

  • Services enforce validation and authorization
  • Services own transactions (async with db.begin())
  • Repos do add/flush/refresh; no commit

Let’s build our SessionService step by step. I’ll explain each method as we go:

# backend/app/domain/services/session_service.py
from sqlalchemy.ext.asyncio import AsyncSession
from app.persistence.repositories.session_repo import session_repo
from app.persistence.repositories.message_repo import message_repo
from app.domain.dtos import SessionDTO, SessionCreateDTO


class SessionService:
    """Business logic for session operations."""

    async def create_session(self, db: AsyncSession, user_id: int, title: str) -> SessionDTO:
        """Create a new session with validation."""
        # Validation - this is business logic, not database logic
        if not title or len(title.strip()) == 0:
            raise ValueError("Session title cannot be empty")

        if len(title) > 500:
            raise ValueError("Session title too long (max 500 characters)")

        async with db.begin():
            session = await session_repo.create(db, user_id=user_id, title=title.strip())
            return SessionDTO.from_orm(session, message_count=0)

    async def get_session(self, db: AsyncSession, session_id: int, user_id: int) -> SessionDTO | None:
        """Get session with ownership check."""
        session = await session_repo.get_by_id(db, session_id)

        # Authorization check - critical business logic!
        if session and session.user_id != user_id:
            raise PermissionError("Session does not belong to user")

        if not session:
            return None

        # Orchestration - fetch related data
        message_count = await message_repo.count_by_session(db, session_id)

        return SessionDTO.from_orm(session, message_count=message_count)

    async def list_user_sessions(
        self, db: AsyncSession, user_id: int, skip: int = 0, limit: int = 50
    ) -> list[SessionDTO]:
        """List sessions for a user (avoids N+1)."""
        rows = await session_repo.list_by_user_with_counts(db, user_id, skip, limit)
        return [SessionDTO.from_orm(sess, message_count=count) for (sess, count) in rows]

    async def delete_session(self, db: AsyncSession, session_id: int, user_id: int) -> bool:
        """Delete session with ownership check."""
        # Open the transaction before the first query: the async session
        # auto-begins on its first statement, so a later db.begin() would
        # fail. This also keeps the ownership check and the delete atomic.
        async with db.begin():
            session = await session_repo.get_by_id(db, session_id)

            if not session:
                return False

            # Authorization - prevent users from deleting others' data
            if session.user_id != user_id:
                raise PermissionError("Cannot delete another user's session")

            return await session_repo.delete(db, session_id)


session_service = SessionService()  # Singleton instance

Let’s break down what each method is doing and why:

create_session() - Validation before persistence:

  • We validate the title before hitting the database. Why? Because validation is a business rule, not a database concern.
  • We could add more rules here: “users can only have 10 active sessions,” “titles must be unique per user,” etc. (see the sketch after this list).
  • The repository doesn’t know or care about these rules — it just saves data.
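
For instance, here is how a “max 10 active sessions” policy could slot into create_session(). A hedged sketch, not code from the branch: count_by_user() is a hypothetical repository method you would add next to the existing queries.

    async def create_session(self, db: AsyncSession, user_id: int, title: str) -> SessionDTO:
        """Create a new session, enforcing a per-user session cap."""
        if not title or not title.strip():
            raise ValueError("Session title cannot be empty")
        if len(title) > 500:
            raise ValueError("Session title too long (max 500 characters)")

        async with db.begin():
            # Business policy check (count_by_user is a hypothetical repo method)
            active = await session_repo.count_by_user(db, user_id)
            if active >= 10:
                raise ValueError("Session limit reached (max 10 active sessions)")

            session = await session_repo.create(db, user_id=user_id, title=title.strip())
            return SessionDTO.from_orm(session, message_count=0)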

get_session() - Authorization and orchestration:

  • Authorization: Check if the session belongs to the user. This prevents user A from accessing user B’s sessions.
  • Orchestration: Fetch the session AND count its messages. The API client needs both pieces of data, so we fetch them together.
  • Notice we’re raising PermissionError, not returning an HTTP 403. The service doesn’t know about HTTP—that’s the API layer’s job.

list_user_sessions() - Multi-repository orchestration (no N+1):

  • We call a single repo method that returns sessions with message counts to avoid N+1 queries.
  • This is orchestration — coordinating repositories while keeping SQL details inside repos.

delete_session() - Authorization before deletion:

  • Same authorization check as get_session(). We’re enforcing the rule “users can only delete their own data.”
  • If we later add “admins can delete any session,” we add that logic here, not in 5 different places.

Common Questions About This Approach

Warning

Pitfall: 403 vs 404

Q: Why not filter by user_id in the repository?

You might wonder: “Why fetch the session first, then check ownership? Why not just get_by_id_and_user()?”

Here’s why the current approach is better:

# Bad: Repository filters by user_id
session = await session_repo.get_by_id_and_user(db, session_id, user_id)
if not session:
    # Problem: Is it missing, or does it belong to someone else?
    raise HTTPException(404, "Not found")  # Wrong status code!

With this approach, you can’t distinguish between:

  • “Session #123 doesn’t exist” → should return HTTP 404
  • “Session #123 exists but belongs to user #456” → should return HTTP 403 Forbidden

Our approach keeps authorization explicit and centralized:

# Good: Fetch first, then check ownership
session = await session_repo.get_by_id(db, session_id)
if session and session.user_id != user_id:
    raise PermissionError("Not authorized")  # → HTTP 403
if not session:
    return None  # → HTTP 404

This also makes it easy to extend later:

# Adding admin access is trivial
if user.is_admin or session.user_id == user_id:
    # authorized

Separation of concerns:

  • Repository: “How do I fetch this data?” (data access)
  • Service: “Who can access this data?” (authorization)
  • API: “What HTTP status code?” (protocol)

Q: Why import AsyncSession if services don’t use the database?

Sharp eye! You noticed we import from sqlalchemy.ext.asyncio import AsyncSession.

Here’s what’s happening: The service accepts AsyncSession as a dependency and owns transactions, while repositories focus on data access:

async def create_session(self, db: AsyncSession, ...):
    # Services own transactions; repos never commit
    async with db.begin():
        session = await session_repo.create(db, ...)
        return SessionDTO.from_orm(session, message_count=0)

The service doesn’t write SQL or import ORM models.

The service does:

  • Accept AsyncSession for type hints (helps IDEs and type checkers)
  • Own commit/rollback boundaries via async with db.begin()
  • Delegate queries to repositories (which do add/flush/refresh, not commit)

This is pragmatic dependency injection. We could define an abstract DatabaseInterface protocol, but that adds complexity for minimal benefit—we’re not planning to swap out SQLAlchemy.

Why a singleton instance at the bottom?

session_service = SessionService()

This creates a single instance that’s imported everywhere. Services are stateless (they don’t store data between calls), so one instance is enough. This makes mocking easier in tests—you can patch session_service globally.
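
Because create_session() validates before it ever touches a repository, the empty-title rule can be unit-tested with no database at all. A minimal sketch, assuming pytest with pytest-asyncio in the test setup (the test path is hypothetical):

# tests/unit/test_session_service.py  (hypothetical path)
import pytest

from app.domain.services.session_service import session_service


@pytest.mark.asyncio
async def test_empty_title_is_rejected():
    # Validation raises before any repository or db call,
    # so passing db=None is safe here - no FastAPI, no Postgres.
    with pytest.raises(ValueError):
        await session_service.create_session(db=None, user_id=1, title="   ")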

Building MessageService

Now let’s create the service for message operations. It follows the same pattern as SessionService but focuses on message-specific business logic:

# backend/app/domain/services/message_service.py
from sqlalchemy.ext.asyncio import AsyncSession
from app.persistence.repositories.message_repo import message_repo
from app.persistence.repositories.session_repo import session_repo
from app.domain.dtos import MessageDTO, MessageCreateDTO


class MessageService:
    """Business logic for message operations."""

    async def create_message(
        self,
        db: AsyncSession,
        session_id: int,
        user_id: int,
        data: MessageCreateDTO,
    ) -> MessageDTO:
        """Create a new message with authorization check."""
        # Open the transaction up front: the async session auto-begins on its
        # first statement, so calling db.begin() after the ownership SELECT
        # would fail. Keeping the check and the insert in one transaction
        # also makes them atomic.
        async with db.begin():
            # Authorization: Verify user owns the session
            session = await session_repo.get_by_id(db, session_id)
            if not session:
                raise ValueError(f"Session {session_id} not found")

            if session.user_id != user_id:
                raise PermissionError("Cannot add messages to another user's session")

            # Create message via repository (service owns the transaction)
            message = await message_repo.create(
                db,
                session_id=session_id,
                role=data.role,
                content=data.content,
                tool_name=data.tool_name,
                trace_id=data.trace_id,
                span_id=data.span_id,
                tokens=data.tokens,
            )

            return MessageDTO.from_orm(message)

    async def list_session_messages(
        self,
        db: AsyncSession,
        session_id: int,
        user_id: int,
        skip: int = 0,
        limit: int = 100,
    ) -> list[MessageDTO]:
        """List messages for a session with authorization check."""
        # Authorization: Verify user owns the session
        session = await session_repo.get_by_id(db, session_id)
        if not session:
            raise ValueError(f"Session {session_id} not found")

        if session.user_id != user_id:
            raise PermissionError("Cannot view messages from another user's session")

        # Fetch messages
        messages = await message_repo.list_by_session(db, session_id, skip, limit)

        return [MessageDTO.from_orm(msg) for msg in messages]


message_service = MessageService()

Key differences from SessionService:

  1. Cross-entity authorization: create_message() checks that the user owns the session before allowing message creation. We’re enforcing “you can only add messages to your own sessions.”

  2. Role validation: Messages have a role field (user, assistant, tool_call, system). The allowed values are enforced by the role enum on MessageCreateDTO (defined in the DTO section below), so invalid roles never reach the service.

  3. DTO input: create_message() accepts a MessageCreateDTO rather than loose arguments, because messages carry more fields (role, content, tool metadata, trace IDs, token count) than sessions do.

Both services follow the same principles:

  • Authorization checks (who can do this?)
  • Validation (is the data valid?)
  • Orchestration (coordinate repositories)
  • Return DTOs (not ORM models)

Note

Gotchas we just avoided

  • 403 vs 404: fetch then authorize in services; don’t hide forbidden with “not found”.
  • DTOs decouple DB from API; you can rename DB fields without breaking clients.
  • Handler bloat: keep HTTP code thin; move rules + transactions into services.

The DTO Layer: Why We Don’t Return ORM Models

You’ll notice the service methods return SessionDTO, not the raw Session ORM model. This is crucial. Let me explain why with a concrete example.

The problem with returning ORM models directly:

Imagine you return a SQLAlchemy Session object directly to FastAPI:

1
2
3
@app.get("/sessions/{id}")
async def get_session(id: int, db: AsyncSession):
    return await session_repo.get_by_id(db, id)  # Returns ORM model

This seems fine until:

  1. Accidental data leaks: Your ORM model has internal fields (_sa_instance_state, lazy-loaded relationships). FastAPI tries to serialize everything, potentially exposing data you didn’t intend to send.

  2. Breaking changes: You add a new column to the sessions table. Suddenly your API returns extra data. Clients that parse the response strictly might break.

  3. No validation: The API can’t validate that the data is correct before sending it. If a database migration leaves updated_at NULL, you send NULL to the client.

  4. Circular references: ORM models have relationships. A Session has Messages, each Message has a Session. Try serializing that to JSON and watch it explode with circular reference errors.

  5. Tight coupling: Your API shape is now tied to your database schema. Want to rename user_id to owner_id in the API without a database migration? Can’t do it.

DTOs solve all of this:

DTOs (Data Transfer Objects) are Pydantic models that define the exact shape of your API responses. They’re the contract between your backend and clients.

# backend/app/domain/dtos.py
from datetime import datetime
from enum import Enum
from pydantic import BaseModel, Field


class SessionDTO(BaseModel):
    """
    Data Transfer Object for Session.
    This is what clients receive from the API.
    """
    id: int
    user_id: int
    title: str
    message_count: int  # Computed field, not in database!
    created_at: datetime
    updated_at: datetime

    class Config:
        from_attributes = True  # Lets Pydantic read data from ORM attributes

    @classmethod
    def from_orm(cls, session: "Session", message_count: int) -> "SessionDTO":
        """
        Convert ORM model to DTO.
        Notice we pass message_count separately - it's not a database field.
        """
        return cls(
            id=session.id,
            user_id=session.user_id,
            title=session.title,
            message_count=message_count,  # Injected from service layer
            created_at=session.created_at,
            updated_at=session.updated_at,
        )


class SessionCreateDTO(BaseModel):
    """
    DTO for creating a session (request body).
    Only contains fields the client can set.
    """
    title: str = Field(..., min_length=1, max_length=500)


# Note: With Pydantic v2, the canonical constructor is
# `SessionDTO.model_validate(obj, from_attributes=True)`. We keep a
# manual `from_orm(...)` helper for readability and to pass computed
# fields like `message_count` from services.


class MessageDTO(BaseModel):
    """Data Transfer Object for Message."""
    id: int
    session_id: int
    role: str
    content: str
    tool_name: str | None = None
    trace_id: str | None = None
    span_id: str | None = None
    tokens: int
    created_at: datetime

    class Config:
        from_attributes = True

    @classmethod
    def from_orm(cls, message: "Message") -> "MessageDTO":
        return cls(
            id=message.id,
            session_id=message.session_id,
            role=message.role,
            content=message.content,
            tool_name=message.tool_name,
            trace_id=message.trace_id,
            span_id=message.span_id,
            tokens=message.tokens,
            created_at=message.created_at,
        )


class MessageCreateDTO(BaseModel):
    """
    DTO for creating a message (request body).
    Uses an Enum for role to ensure type‑safety across layers.
    """
    class MessageRole(str, Enum):
        user = "user"
        assistant = "assistant"
        tool_call = "tool_call"
        system = "system"

    role: MessageRole
    content: str = Field(..., min_length=1)
    tool_name: str | None = Field(default=None, max_length=255)
    trace_id: str | None = Field(default=None, max_length=255)
    span_id: str | None = Field(default=None, max_length=255)
    tokens: int = Field(default=0, ge=0)

What DTOs give us:

  1. Explicit API contract: SessionDTO defines exactly what clients receive. It’s documentation as code.

  2. Computed fields: Notice message_count in SessionDTO. This field doesn’t exist in the database — we compute it in the service layer. DTOs let us enrich responses without changing the database schema.

  3. Validation on input: SessionCreateDTO validates that titles are 1-500 characters. FastAPI automatically returns HTTP 422 with error details if validation fails. No manual checking needed.

  4. Type safety: TypeScript clients can generate types from your OpenAPI spec. SessionDTO becomes a TypeScript interface automatically.

  5. Separation of concerns:

    • ORM models = database schema
    • DTOs = API schema
    • They can evolve independently

Small but important: structural validation (lengths, patterns) belongs in DTOs and returns HTTP 422 from FastAPI automatically; business rules (ownership, quotas, uniqueness by policy) belong in services and should raise ValueError/PermissionError, which handlers map to HTTP 400/403.
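
To see the split in one place, here is a tiny illustration using the DTOs above (nothing framework-specific involved):

from pydantic import ValidationError

from app.domain.dtos import SessionCreateDTO

try:
    SessionCreateDTO(title="")  # violates min_length=1 -> structural error
except ValidationError as exc:
    # In a real request, FastAPI turns this into an HTTP 422 response
    print(exc.errors()[0]["msg"])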

Example of independence:

Imagine you rename the sessions table column from title to session_name in the database. You update the ORM model:

class Session(Base):
    session_name = Column(String, nullable=False)  # renamed

But you keep the DTO the same:

class SessionDTO(BaseModel):
    title: str  # API still uses "title"

Then you adjust the mapping in from_orm():

@classmethod
def from_orm(cls, session: "Session", message_count: int) -> "SessionDTO":
    return cls(
        ...
        title=session.session_name,  # Map database field to API field
        ...
    )

API clients don’t break. They still receive title in the JSON response. You’ve refactored the database without causing a breaking change.

This is the power of the DTO layer — it decouples your API from your database schema.

REST API Endpoints: Bringing It All Together

TLDR

  • Thin handlers: parse → call service → map errors
  • Services return DTOs; no ORM models in responses
  • Verify via Swagger and curl

We’ve built the foundation — repositories for data access, services for business logic, DTOs for API contracts. Now let’s wire it all together with FastAPI endpoints.

The Thin Handler Pattern

API endpoints should be thin. Really thin. Like 5-10 lines thin.

Their only job is to:

  1. Extract data from the HTTP request (path params, query params, request body)
  2. Call a service method
  3. Handle exceptions and convert them to HTTP status codes
  4. Return the response

They should NOT:

  • Contain business logic
  • Make database queries
  • Validate business rules

All of that lives in the service layer. The endpoint is just HTTP plumbing.

Let’s see this pattern in action with our session endpoints:

# backend/app/api/sessions.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.dependencies import get_session as get_db_session  # aliased: the get_session endpoint below would otherwise shadow it
from app.domain.services.session_service import session_service
from app.domain.dtos import SessionDTO, SessionCreateDTO

router = APIRouter(prefix="/api/sessions", tags=["sessions"])


@router.post("/", response_model=SessionDTO, status_code=status.HTTP_201_CREATED)
async def create_session(
    data: SessionCreateDTO,  # FastAPI auto-validates request body
    db: AsyncSession = Depends(get_db_session),  # Inject database session
    user_id: int = 1,  # TODO: Get from auth middleware in Part 3
):
    """
    Create a new chat session.

    Returns 201 with the created session on success.
    Returns 400 if validation fails (title empty or too long).
    """
    try:
        return await session_service.create_session(db, user_id, data.title)
    except ValueError as e:
        # Service raised ValueError -> convert to HTTP 400
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))


@router.get("/", response_model=list[SessionDTO])
async def list_sessions(
    skip: int = 0,
    limit: int = 50,
    db: AsyncSession = Depends(get_db_session),
    user_id: int = 1,  # TODO: From auth
):
    """
    List user's sessions with pagination.

    Query params:
    - skip: Number of sessions to skip (default 0)
    - limit: Max sessions to return (default 50)
    """
    return await session_service.list_user_sessions(db, user_id, skip, limit)


@router.get("/{session_id}", response_model=SessionDTO)
async def get_session(
    session_id: int,  # Path parameter
    db: AsyncSession = Depends(get_db_session),
    user_id: int = 1,  # TODO: From auth
):
    """
    Get a specific session.

    Returns 404 if session doesn't exist.
    Returns 403 if session belongs to another user.
    """
    try:
        session = await session_service.get_session(db, session_id, user_id)
        if not session:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Session not found")
        return session
    except PermissionError as e:
        # Service raised PermissionError -> convert to HTTP 403
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e))


@router.delete("/{session_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_session(
    session_id: int,
    db: AsyncSession = Depends(get_db_session),
    user_id: int = 1,  # TODO: From auth
):
    """
    Delete a session.

    Returns 204 (no content) on success.
    Returns 404 if session doesn't exist.
    Returns 403 if session belongs to another user.
    """
    try:
        deleted = await session_service.delete_session(db, session_id, user_id)
        if not deleted:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Session not found")
    except PermissionError as e:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e))

Let’s break down what’s happening in each endpoint:

1. create_session() - The creation flow:

sequenceDiagram
    participant Client
    participant FastAPI
    participant Endpoint
    participant Service
    participant Repository
    participant Database

    Client->>FastAPI: POST /api/sessions<br/>{"title": "My chat"}
    FastAPI->>FastAPI: Validate SessionCreateDTO<br/>(title length 1-500)
    FastAPI->>Endpoint: create_session(data)
    Endpoint->>Service: create_session(db, user_id, title)
    Service->>Service: Validate title<br/>(not empty, not too long)
    Service->>Repository: create(db, user_id, title)
    Repository->>Database: INSERT INTO sessions...
    Database-->>Repository: Session(id=1, ...)
    Repository-->>Service: Session ORM model
    Service->>Service: Convert to SessionDTO
    Service-->>Endpoint: SessionDTO
    Endpoint-->>FastAPI: SessionDTO
    FastAPI->>FastAPI: Serialize to JSON
    FastAPI-->>Client: HTTP 201<br/>{"id": 1, "user_id": 1, "title": "My chat", ...}

Get / Delete flows (at a glance):

sequenceDiagram
    participant Client
    participant API as Endpoint
    participant Service
    participant Repo
    participant DB

    Client->>API: GET /api/sessions/{id}
    API->>Service: get_session(db, id, user)
    Service->>Repo: get_by_id(db, id)
    Repo->>DB: SELECT ... FROM sessions WHERE id = ?
    DB-->>Repo: Session or NULL
    Service-->>API: SessionDTO or None/PermissionError
    API-->>Client: 200/404/403

    Client->>API: DELETE /api/sessions/{id}
    API->>Service: delete_session(db, id, user)
    Service->>Repo: get_by_id(db, id)
    Service->>Repo: delete(db, id)
    Note over Service,Repo: within db.begin()
    API-->>Client: 204/404/403

2. list_sessions() - Query parameters:

FastAPI automatically extracts skip and limit from the URL query string. Request to /api/sessions?skip=10&limit=20 becomes function arguments skip=10, limit=20.

No error handling needed here because the service method doesn’t raise exceptions — it just returns a list (which might be empty).
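
If you want FastAPI to also bound these values instead of trusting callers, Query() does it declaratively. A hedged variant of the handler above (not the code on the branch): negative skip is rejected and limit is capped at 100, both with an automatic 422.

from fastapi import Depends, Query
from sqlalchemy.ext.asyncio import AsyncSession


@router.get("/", response_model=list[SessionDTO])
async def list_sessions(
    skip: int = Query(0, ge=0),            # no negative offsets
    limit: int = Query(50, ge=1, le=100),  # cap the page size
    db: AsyncSession = Depends(get_db_session),
    user_id: int = 1,  # TODO: From auth
):
    return await session_service.list_user_sessions(db, user_id, skip, limit)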

3. get_session() - Error handling:

The service raises PermissionError if the session belongs to another user. The endpoint catches this and converts it to HTTP 403 Forbidden. This is the beauty of the layered architecture:

  • Service layer: Raises Python exceptions (knows nothing about HTTP)
  • API layer: Catches exceptions, maps to HTTP status codes

4. delete_session() - Response with no body:

Returns HTTP 204 No Content on success. Notice there’s no return statement — FastAPI automatically returns 204 because we specified status_code=status.HTTP_204_NO_CONTENT in the decorator.

Why This Pattern Works

Notice how clean these endpoints are. Each one is 5-15 lines. They’re almost boring — just plumbing.

All the interesting stuff happens in the service layer:

  • Validation logic lives in session_service.create_session()
  • Authorization lives in session_service.get_session()
  • Orchestration lives in session_service.list_user_sessions()

If you want to test the business rule “users can only delete their own sessions,” you don’t write an integration test that starts FastAPI and makes HTTP requests. You write a unit test that calls session_service.delete_session() directly. Much faster.
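
Here is roughly what that test can look like - a sketch with hand-rolled fakes (assuming pytest-asyncio; paths and fake shapes are illustrative), no FastAPI and no Postgres involved:

# tests/unit/test_delete_authorization.py  (hypothetical path)
from types import SimpleNamespace

import pytest

from app.domain.services.session_service import session_service


class FakeTransaction:
    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc):
        return False  # don't swallow exceptions


class FakeDB:
    def begin(self):
        return FakeTransaction()


class FakeSessionRepo:
    async def get_by_id(self, db, session_id):
        # Pretend the session exists but belongs to someone else
        return SimpleNamespace(id=session_id, user_id=999)


@pytest.mark.asyncio
async def test_cannot_delete_someone_elses_session(monkeypatch):
    monkeypatch.setattr(
        "app.domain.services.session_service.session_repo", FakeSessionRepo()
    )
    with pytest.raises(PermissionError):
        await session_service.delete_session(FakeDB(), session_id=1, user_id=1)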

The hardcoded user_id = 1:

You’ll see this in all endpoints right now. In Part 3, when we add Auth0 authentication, we’ll replace this with a dependency that extracts the user from the JWT token:

user_id: int = Depends(get_current_user_id)

For now, we’re hardcoding it so we can build and test the endpoints without authentication blocking us.
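
If you prefer to wire the dependency shape now, a placeholder dependency works too. A minimal sketch (the body is a stand-in; Part 3 replaces it with real JWT extraction):

# backend/app/api/dependencies.py - hypothetical addition
async def get_current_user_id() -> int:
    """Placeholder until Part 3: always resolves to the seeded test user."""
    return 1

Handlers would then declare user_id: int = Depends(get_current_user_id) and stay untouched when real auth lands.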

Message endpoints follow the exact same pattern — I won’t bore you with detailed explanations again. Notice the simplicity:

# backend/app/api/messages.py
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.dependencies import get_session
from app.domain.services.message_service import message_service
from app.domain.dtos import MessageDTO, MessageCreateDTO

router = APIRouter(prefix="/api/messages", tags=["messages"])


@router.post("/", response_model=MessageDTO, status_code=status.HTTP_201_CREATED)
async def create_message(
    data: MessageCreateDTO,
    session_id: int = Query(..., description="Session ID"),
    db: AsyncSession = Depends(get_session),
    user_id: int = 1,  # TODO: From auth
):
    """Create a new message in a session."""
    try:
        return await message_service.create_message(db, session_id, user_id, data)
    except ValueError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
    except PermissionError as e:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e))


@router.get("/", response_model=list[MessageDTO])
async def list_messages(
    session_id: int = Query(..., description="Session ID"),
    skip: int = 0,
    limit: int = 100,
    db: AsyncSession = Depends(get_session),
    user_id: int = 1,  # TODO: From auth
):
    """List messages for a session."""
    try:
        return await message_service.list_session_messages(db, session_id, user_id, skip, limit)
    except PermissionError as e:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e))

Same pattern: thin handlers, service calls, exception mapping. Boring is good. Boring is maintainable.

Wiring Everything Together

Finally, we register the routers in our main FastAPI app:

# backend/app/main.py
from fastapi import FastAPI, Depends
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.settings import settings
from app.api.dependencies import get_session
from app.api import sessions, messages

app = FastAPI(
    title=settings.api_title,
    version=settings.api_version,
    debug=settings.debug,
)

# CORS middleware - allows frontend to call backend
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.cors_origins,  # ["http://localhost:5173"] in dev
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Register routers
app.include_router(sessions.router)
app.include_router(messages.router)


@app.get("/health")
async def health(db: AsyncSession = Depends(get_session)) -> dict[str, str | bool]:
    """Health check with database connectivity test."""
    try:
        # Test database connection
        await db.execute(text("SELECT 1"))
        database_status = "connected"
    except Exception as e:
        database_status = f"error: {str(e)}"

    return {
        "status": "ok",
        "env": settings.env,
        "debug": settings.is_development,
        "database": database_status,
    }

What’s happening here:

  1. CORS middleware: Allows our React frontend (running on localhost:5173) to make requests to the backend (localhost:8000). Without this, browsers block the requests due to same-origin policy.

  2. Router inclusion: app.include_router() registers all the routes from each router. FastAPI combines them into a single API with automatic OpenAPI docs.

  3. Enhanced health check: Not just “ok” — it tests database connectivity by executing SELECT 1. Returns environment info and database status. This is what monitoring tools (like Kubernetes health probes) will ping to verify the service is actually working, not just running.

Note

Same-origin policy is a browser security feature that blocks web pages from making requests to a different domain than the one that served the web page. CORS (Cross-Origin Resource Sharing) is a mechanism that allows servers to specify who can access their resources. In development, our frontend and backend run on different ports, so we need CORS to allow communication between them.

Testing the API

Our backend is now ready to test! Make sure your services are running:

make dev

First, create a test user (remember, we’re hardcoding user_id = 1 in the endpoints for now):

docker exec -it infra-db-1 psql -U postgres -d agent_stack -c \
  "INSERT INTO users (auth0_id, email, created_at) VALUES ('test-user-1', 'test@example.com', NOW());"

You should see:

INSERT 0 1

Now create a session:

curl -X POST "http://localhost:8000/api/sessions/" \
  -H "Content-Type: application/json" \
  -d '{"title": "My first conversation"}'

Response:

{
  "id": 1,
  "user_id": 1,
  "title": "My first conversation",
  "message_count": 0,
  "created_at": "2025-10-29T12:34:56.789000",
  "updated_at": "2025-10-29T12:34:56.789000"
}

List all sessions:

curl "http://localhost:8000/api/sessions/"

Create a message for that session:

curl -X POST "http://localhost:8000/api/messages/?session_id=1" \
  -H "Content-Type: application/json" \
  -d '{"role": "user", "content": "Hello, agent!"}'
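
You should get back something like this (your IDs and timestamps will differ):

{
  "id": 1,
  "session_id": 1,
  "role": "user",
  "content": "Hello, agent!",
  "tool_name": null,
  "trace_id": null,
  "span_id": null,
  "tokens": 0,
  "created_at": "2025-10-29T12:35:10.123000"
}

And list the messages back:

curl "http://localhost:8000/api/messages/?session_id=1"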

Performance & Scale Notes

  • Eager loading: when rendering a full conversation, consider selectinload(Session.messages) to avoid N+1 when fetching recent windows (see the sketch after this list).
  • Pool sizing: keep pool_size/max_overflow configurable; too‑large pools can starve Postgres. Monitor and tune.
  • SSE production: set Cache-Control: no-cache, X-Accel-Buffering: no for Nginx, and bump proxy_read_timeout to keep event streams alive.
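
Here is what that eager-loading repository method might look like - a sketch under assumptions (the model import path and the messages relationship name are assumed to match the models section):

# backend/app/persistence/repositories/session_repo.py - hypothetical addition
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from sqlalchemy.ext.asyncio import AsyncSession

from app.persistence.models import Session  # assumed module path


async def get_with_messages(db: AsyncSession, session_id: int) -> Session | None:
    """Load a session plus all of its messages in two queries (no 1 + N)."""
    stmt = (
        select(Session)
        .options(selectinload(Session.messages))
        .where(Session.id == session_id)
    )
    result = await db.execute(stmt)
    return result.scalar_one_or_none()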

Optional: Seed Dev Data

# backend/app/scripts/seed_dev.py
import asyncio
from app.core.database import SessionLocal
from app.persistence.repositories.user_repo import user_repo
from app.persistence.repositories.session_repo import session_repo
from app.persistence.repositories.message_repo import message_repo


async def main():
    async with SessionLocal() as db:
        async with db.begin():
            user = await user_repo.create(db, auth0_id="seed-user", email="seed@example.com")
            s1 = await session_repo.create(db, user_id=user.id, title="Welcome Chat")
            await message_repo.create(db, session_id=s1.id, role="user", content="Hello!")
            await message_repo.create(db, session_id=s1.id, role="assistant", content="Hey there 👋")


if __name__ == "__main__":
    asyncio.run(main())

Run inside the container:

cd infra && docker compose exec backend uv run python -m app.scripts.seed_dev

Or visit http://localhost:8000/docs to see the interactive API documentation that FastAPI generates automatically. You can test all endpoints right in the browser.

Warning

Why create a user manually? We hardcoded user_id = 1 in the endpoints because we don’t have authentication yet. In Part 3, we’ll add Auth0, and users will be created automatically when they sign up. For now, this manual step lets us test the full flow without authentication blocking us.

What We’ve Built

Let’s take stock. We’ve gone from “just a health check” to a fully functional backend with proper separation of concerns.

Database Layer:

  • Async SQLAlchemy engine with connection pooling
  • Three ORM models (User, Session, Message) with relationships
  • Alembic migrations tracking schema changes in git
  • Indexes for fast queries (idx_user_updated, idx_session_created)
  • Cascading deletes (delete user -> delete sessions -> delete messages)

Repository Layer:

  • UserRepository with methods for finding users by ID, email, Auth0 ID
  • SessionRepository for creating, listing, updating, and deleting sessions
  • MessageRepository for storing and retrieving conversation messages
  • Clean interface abstracting database details
  • Mockable for unit testing

Domain Layer:

  • SessionService enforcing business rules (title validation, authorization)
  • MessageService enforcing cross-entity authorization (users can only write to and read from sessions they own)
  • DTOs defining stable API contracts (SessionDTO, MessageDTO, etc.)
  • Business logic separated from infrastructure

API Layer:

  • REST endpoints for sessions (POST, GET, DELETE)
  • REST endpoints for messages (POST, GET)
  • Pydantic request/response validation
  • Consistent error handling (400, 403, 404)
  • Interactive OpenAPI docs at /docs

Developer Experience:

  • make migrate to run migrations
  • make migrate-create to generate new migrations
  • Type-safe end-to-end with mypy strict mode
  • Hot-reload on code changes
  • One command to start everything (make dev)

Hot damn!

Tip

Checkpoint: Before moving on, verify everything works:

  1. Run make migrate and check for errors
  2. Visit http://localhost:8000/docs and see all endpoints documented
  3. Create a session via curl or the Swagger UI
  4. Check the database: docker exec -it infra-db-1 psql -U postgres -d agent_stack -c "SELECT * FROM sessions;"
  5. Create a message for that session
  6. List messages and verify they’re returned chronologically

If anything fails, check make logs to debug.

What’s Next

In Part 3, we’re tackling Authentication & Security:

  • Auth0 integration for user signup and login
  • JWT validation middleware protecting endpoints
  • Per-user data isolation (no more hardcoded user_id = 1)
  • Session cookies for SSE streaming (EventSource can’t send auth headers)
  • CORS configuration for production

After that, Part 4 is where things get exciting - we’ll integrate the OpenAI Agents SDK and build the actual agent streaming logic (finally!).


Next: Part 3 - Authentication & Security (Coming Soon)


This is part of a series on building production-ready AI agent applications. All code is open source on GitHub.

Info

Enjoying this series? Star the GitHub repo, share it with your team, or leave a comment below. Questions and suggestions are welcome - open an issue on GitHub or use the “Suggest an Edit” link at the bottom of the page.