Co-authored with GPT-5-Pro and Claude-4.5-Sonnet

Building a Production-Ready Agent Stack: Part 2 - Backend Core & Database

Our FastAPI server is running, but it’s just a health check endpoint talking to no one. Time to build the persistence layer—database models, repositories, domain services, and the REST API that will power our agent conversations.


Right now, our backend can tell you it’s alive (GET /health), but it can’t store a conversation or remember a user. Postgres is running in Docker, but we’re not using it. Let’s fix that.

By the end of this post, you’ll have:

  • SQLAlchemy models for users, sessions, and messages with proper relationships
  • Alembic migrations versioning your schema changes in git
  • Repository pattern abstracting database access for testability
  • Domain services enforcing business logic and authorization
  • REST endpoints for full CRUD on sessions and messages

We’re building a three-layer architecture—API, Domain, Persistence—to keep concerns separated as complexity grows.

%%{init: {'theme':'base', 'themeVariables': { 'fontSize':'16px'}}}%%
graph TB
    Client[Client Request]
    API["API Layer<br/><small>app/api/</small><br/><small>HTTP routing, validation</small>"]
    Domain["Domain Layer<br/><small>app/domain/</small><br/><small>Business logic</small>"]
    Repo["Persistence Layer<br/><small>app/persistence/</small><br/><small>Database access</small>"]
    DB[(PostgreSQL)]

    Client --> API
    API --> Domain
    Domain --> Repo
    Repo --> DB

    style Client fill:#f0f4ff,stroke:#a5b4fc,stroke-width:2px
    style API fill:#dbeafe,stroke:#93c5fd,stroke-width:2px
    style Domain fill:#d1fae5,stroke:#6ee7b7,stroke-width:2px
    style Repo fill:#e9d5ff,stroke:#c084fc,stroke-width:2px
    style DB fill:#bfdbfe,stroke:#60a5fa,stroke-width:2px

The layers:

  1. API Layer (app/api/): HTTP routing, request validation, response serialization. Knows FastAPI, doesn’t know Postgres.

  2. Domain Layer (app/domain/): Business logic, validation rules, authorization. Pure Python—no FastAPI, no SQLAlchemy. Testable without a server or database.

  3. Persistence Layer (app/persistence/): Database access via ORM models and repositories. Knows Postgres, doesn’t know HTTP.

This separation means you can swap Postgres for MongoDB by rewriting only the persistence layer. Change from REST to GraphQL by rewriting only the API layer. Add a rule like “max 10 sessions per user” by editing only the domain layer. No cascading changes across the codebase.

Setting Up SQLAlchemy with Async Support

We’re using SQLAlchemy 2.0 with async support via asyncpg.

Why async matters for agent applications:

Agent apps are I/O-heavy. When a user sends a message, the flow looks like this:

  1. Save message to database (20ms I/O)
  2. Call LLM API and stream response (5-30 seconds I/O)
  3. Save response to database (20ms I/O)
  4. Update usage metrics (20ms I/O)

With synchronous code, each operation blocks its worker thread while it waits on I/O. One user’s 10-second LLM call ties up the worker and every request behind it queues: with a single worker and 10 concurrent users, the last request waits roughly 100 seconds.

With async, Python yields control during I/O operations and handles other requests. All 10 users get their responses in roughly 10 seconds. The difference in production is dramatic—I’ve seen systems go from 10 concurrent users to 500+ on the same hardware just by switching to async.
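
A toy benchmark makes the difference concrete. This sketch is illustrative only, with asyncio.sleep standing in for the LLM call:

import asyncio
import time


async def handle_request(user: int) -> None:
    await asyncio.sleep(10)  # stands in for a 10-second LLM call (pure I/O wait)


async def main() -> None:
    start = time.perf_counter()
    # All ten "requests" wait concurrently instead of queuing behind each other
    await asyncio.gather(*(handle_request(u) for u in range(10)))
    print(f"10 users served in {time.perf_counter() - start:.1f}s")  # ~10s, not 100s


asyncio.run(main())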

Create the database configuration:

# backend/app/core/database.py
from collections.abc import AsyncGenerator

from sqlalchemy.ext.asyncio import (
    AsyncSession,
    create_async_engine,
    async_sessionmaker,
)
from app.core.settings import settings


# Create async engine
engine = create_async_engine(
    settings.database_url,
    echo=settings.is_development,  # Log SQL queries in dev
    pool_size=10,  # Max connections in the pool
    max_overflow=20,  # Extra connections if pool is exhausted
    pool_pre_ping=True,  # Verify connections before using
)

# Session factory
SessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,  # Don't expire objects after commit
)


async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Dependency for injecting database sessions into endpoints."""
    async with SessionLocal() as session:
        # The context manager closes the session when the request finishes,
        # so no explicit close() is needed
        yield session

Key configuration:

  • echo=settings.is_development: Log SQL queries in dev for debugging, silent in production
  • pool_size=10, max_overflow=20: Reuse connections instead of paying the 50-100ms cost of opening new ones. Together these cap the pool at 30 connections; adjust based on traffic.
  • pool_pre_ping=True: Verify connections before use to avoid “connection closed” errors
  • expire_on_commit=False: Don’t force DB round-trips after commit when we’re about to serialize and return
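
To see why that last setting matters in an async app, here’s a quick sketch (illustrative only, using the User model we’ll define later in this post):

import asyncio

from app.core.database import SessionLocal
from app.persistence.models import User  # defined later in this post


async def demo() -> None:
    async with SessionLocal() as db:
        user = User(auth0_id="auth0|123", email="demo@example.com")
        db.add(user)
        await db.commit()
        # With the default expire_on_commit=True, this attribute access would
        # trigger a synchronous refresh and typically fail under asyncio
        # (MissingGreenlet). With False, the values loaded before the commit
        # stay usable, which is what we want right before serializing.
        print(user.email)


asyncio.run(demo())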

Now set up dependency injection:

# backend/app/api/dependencies.py
from collections.abc import AsyncGenerator

from sqlalchemy.ext.asyncio import AsyncSession

from app.core.database import get_db


async def get_session() -> AsyncGenerator[AsyncSession, None]:
    """FastAPI dependency for injecting database sessions."""
    # get_db() is an async generator, not a context manager, so we re-yield
    # from it rather than wrapping it in `async with`
    async for session in get_db():
        yield session

Now in any endpoint:

from fastapi import APIRouter, Depends
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.dependencies import get_session

router = APIRouter()


@router.get("/health")
async def health_check(db: AsyncSession = Depends(get_session)):
    """Health check with database connectivity test."""
    try:
        await db.execute(text("SELECT 1"))  # SQLAlchemy 2.0 requires text() for raw SQL
        return {"status": "healthy", "database": "connected"}
    except Exception as e:
        return {"status": "unhealthy", "database": f"error: {str(e)}"}

FastAPI automatically calls get_session() before your endpoint runs, passes the session as the db parameter, and ensures it’s closed after the request completes. No manual session management, no leaked connections.
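
The same mechanism makes testing easy: you can swap the dependency for a stub without touching any endpoint code. A sketch, assuming pytest (the stub and test module name are illustrative):

# tests/test_health.py (hypothetical test module)
from fastapi.testclient import TestClient

from app.api.dependencies import get_session
from app.main import app


async def fake_session():
    """Stub dependency for tests that never touch a real database."""
    yield None


app.dependency_overrides[get_session] = fake_session
client = TestClient(app)
response = client.get("/health")  # any endpoint using get_session now receives the stub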

Alembic: Version Control for Your Database Schema

Alembic tracks database schema changes like git tracks code. It auto-generates migrations by comparing your ORM models to the database, handles rollbacks, and keeps your team’s databases in sync.

Set up Alembic:

cd backend
uv run alembic init alembic

Configure it for async in alembic/env.py:

# backend/alembic/env.py
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context

# Import your models' metadata
from app.persistence.models import Base
from app.core.settings import settings

# Alembic Config object
config = context.config

# Override database URL from settings
config.set_main_option("sqlalchemy.url", settings.database_url)

# Setup logging
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# This is your models' MetaData object
target_metadata = Base.metadata


def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode (generate SQL only)."""
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()


async def run_migrations_online() -> None:
    """Run migrations in 'online' mode (connect to database)."""
    connectable = async_engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)

    await connectable.dispose()


def do_run_migrations(connection: Connection) -> None:
    context.configure(connection=connection, target_metadata=target_metadata)

    with context.begin_transaction():
        context.run_migrations()


if context.is_offline_mode():
    run_migrations_offline()
else:
    import asyncio
    asyncio.run(run_migrations_online())

The key change: run_migrations_online() uses an async engine, matching our async SQLAlchemy setup.

Add commands to the Makefile:

# Makefile (add to existing file)

# Database migrations
migrate:
	@echo "Running database migrations..."
	cd backend && uv run alembic upgrade head

migrate-create:
	@echo "Creating new migration..."
	@read -p "Migration message: " msg; \
	cd backend && uv run alembic revision --autogenerate -m "$$msg"

migrate-rollback:
	@echo "Rolling back last migration..."
	cd backend && uv run alembic downgrade -1

Your workflow:

  1. Change ORM models
  2. Run make migrate-create (generates migration)
  3. Review the migration in alembic/versions/
  4. Run make migrate (applies it)
  5. If needed, run make migrate-rollback

Warning

Always review auto-generated migrations. Alembic is smart but not perfect—it sees column renames as drop + add, which loses data. Check the upgrade() and downgrade() functions before applying.
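
For example, suppose you later rename Session.title to name (a hypothetical change). Autogenerate would emit a drop_column/add_column pair; the hand-corrected version renames in place and keeps the data:

# Hypothetical migration body after renaming sessions.title -> sessions.name
from alembic import op


def upgrade() -> None:
    # Autogenerate would have produced drop_column + add_column (data loss);
    # alter_column renames the column in place instead
    op.alter_column("sessions", "title", new_column_name="name")


def downgrade() -> None:
    op.alter_column("sessions", "name", new_column_name="title")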

Building the Domain Models

We need three models:

  • User: Authenticated user (linked to Auth0 in Part 3)
  • Session: A conversation thread
  • Message: Individual user and assistant messages

This is the minimal structure for stateful agent apps: Users own Sessions, Sessions contain Messages.

# backend/app/persistence/models.py
from datetime import datetime
from sqlalchemy import String, Integer, Text, DateTime, ForeignKey, Index
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship


class Base(DeclarativeBase):
    """Base class for all ORM models."""
    pass


class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    auth0_id: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    created_at: Mapped[datetime] = mapped_column(
        DateTime, default=datetime.utcnow, nullable=False
    )

    # Relationship: one user has many sessions
    sessions: Mapped[list["Session"]] = relationship(
        "Session", back_populates="user", cascade="all, delete-orphan"
    )

    def __repr__(self) -> str:
        return f"<User(id={self.id}, email={self.email})>"


class Session(Base):
    __tablename__ = "sessions"

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    user_id: Mapped[int] = mapped_column(
        Integer, ForeignKey("users.id", ondelete="CASCADE"), nullable=False
    )
    title: Mapped[str] = mapped_column(String(500), nullable=False)
    created_at: Mapped[datetime] = mapped_column(
        DateTime, default=datetime.utcnow, nullable=False
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False
    )

    # Relationships
    user: Mapped["User"] = relationship("User", back_populates="sessions")
    messages: Mapped[list["Message"]] = relationship(
        "Message", back_populates="session", cascade="all, delete-orphan"
    )

    # Index for fast user session lookups
    __table_args__ = (Index("idx_user_created", "user_id", "created_at"),)

    def __repr__(self) -> str:
        return f"<Session(id={self.id}, title={self.title[:30]})>"


class Message(Base):
    __tablename__ = "messages"

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    session_id: Mapped[int] = mapped_column(
        Integer, ForeignKey("sessions.id", ondelete="CASCADE"), nullable=False
    )
    role: Mapped[str] = mapped_column(String(50), nullable=False)  # "user" or "assistant"
    content: Mapped[str] = mapped_column(Text, nullable=False)
    tokens: Mapped[int] = mapped_column(Integer, default=0)  # For usage tracking
    created_at: Mapped[datetime] = mapped_column(
        DateTime, default=datetime.utcnow, nullable=False
    )

    # Relationship
    session: Mapped["Session"] = relationship("Session", back_populates="messages")

    # Index for fast session message lookups
    __table_args__ = (Index("idx_session_created", "session_id", "created_at"),)

    def __repr__(self) -> str:
        return f"<Message(id={self.id}, role={self.role}, content={self.content[:30]})>"

Key design choices:

  • auth0_id: Links to Auth0 accounts (no passwords stored locally)
  • cascade="all, delete-orphan": Delete user → delete their sessions → delete all messages
  • Index("idx_user_created", ...): Fast queries for “list user’s sessions by date”
  • tokens on Message: Track usage per message for billing (Part 6)
  • updated_at on Session: Auto-updates for “recently active” sorting

Note

We’re using SQLAlchemy 2.0’s Mapped[] syntax for better type inference. mypy flags references to non-existent fields during static analysis, long before the code runs.
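
For example, a typo like this is caught by the type checker instead of blowing up at runtime (a small illustration, assuming mypy is configured for the project):

from app.persistence.models import User

user = User(auth0_id="auth0|123", email="demo@example.com")
print(user.emial)  # error: "User" has no attribute "emial"  [attr-defined]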

Generate and apply the migration:

make migrate-create
# Message: "add user session message tables"

# Review the generated migration in alembic/versions/
cat backend/alembic/versions/*_add_user_session_message_tables.py

# Apply it
make migrate

Check that the tables exist:

docker exec -it infra-db-1 psql -U postgres -d agent_stack -c "\dt"

You should see:

           List of relations
 Schema |      Name       | Type  |  Owner
--------+-----------------+-------+----------
 public | alembic_version | table | postgres
 public | messages        | table | postgres
 public | sessions        | table | postgres
 public | users           | table | postgres

The Repository Pattern: Clean Data Access

We have ORM models, but we don’t want SQLAlchemy queries scattered throughout our codebase. The Repository pattern gives us a clean interface for data access.

The problem with direct ORM usage:

Without repositories, every endpoint that needs a user by email would write:

result = await db.execute(select(User).where(User.email == email))
user = result.scalar_one_or_none()

This gets repeated 10-20 times across your codebase. When you need to add caching or change the query, you’re hunting through dozens of files.

The repository solution:

A repository encapsulates all database operations for a specific model. Instead of scattering queries everywhere, you write:

user = await user_repo.get_by_email(db, email)

The benefits stack up quickly:

  • Testability: Mock user_repo in unit tests without touching the database
  • Consistency: All User queries live in one place
  • Maintainability: Change the query logic once, not in 20 files
  • Swappable: Move from Postgres to MongoDB by rewriting just the repository

Let’s build the UserRepository:

# backend/app/persistence/repositories/user_repo.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.persistence.models import User


class UserRepository:
    """Repository for User model operations."""

    async def create(self, db: AsyncSession, auth0_id: str, email: str) -> User:
        """Create a new user."""
        user = User(auth0_id=auth0_id, email=email)
        db.add(user)
        await db.commit()
        await db.refresh(user)  # Load generated ID
        return user

    async def get_by_id(self, db: AsyncSession, user_id: int) -> User | None:
        """Get user by ID."""
        result = await db.execute(select(User).where(User.id == user_id))
        return result.scalar_one_or_none()

    async def get_by_auth0_id(self, db: AsyncSession, auth0_id: str) -> User | None:
        """Get user by Auth0 ID."""
        result = await db.execute(select(User).where(User.auth0_id == auth0_id))
        return result.scalar_one_or_none()

    async def get_by_email(self, db: AsyncSession, email: str) -> User | None:
        """Get user by email."""
        result = await db.execute(select(User).where(User.email == email))
        return result.scalar_one_or_none()

    async def list_all(self, db: AsyncSession, skip: int = 0, limit: int = 100) -> list[User]:
        """List all users with pagination."""
        result = await db.execute(select(User).offset(skip).limit(limit))
        return list(result.scalars().all())


# Singleton instance
user_repo = UserRepository()

Notice the pattern:

  • Every method takes db: AsyncSession as the first parameter (dependency injection)
  • Methods return domain objects (User), not raw database rows
  • We use scalar_one_or_none() for single results, scalars().all() for lists
  • Pagination is built-in with offset and limit

The user_repo singleton instance gets imported wherever we need user operations. In tests, we can mock it with a single line.
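
A test sketch of that claim, assuming pytest with pytest-asyncio (the patched path is wherever your code under test imports user_repo from):

import pytest
from unittest.mock import AsyncMock


@pytest.mark.asyncio
async def test_handles_unknown_user(monkeypatch):
    fake_repo = AsyncMock()
    fake_repo.get_by_email.return_value = None  # simulate "no such user"

    # One line replaces the singleton for everything that imports it here
    monkeypatch.setattr(
        "app.persistence.repositories.user_repo.user_repo", fake_repo
    )

    # ...call the code under test; it talks to the mock, never the database
    assert await fake_repo.get_by_email(None, "ghost@example.com") is None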

SessionRepository follows the same pattern:

# backend/app/persistence/repositories/session_repo.py
from sqlalchemy import select, desc
from sqlalchemy.ext.asyncio import AsyncSession
from app.persistence.models import Session


class SessionRepository:
    """Repository for Session model operations."""

    async def create(self, db: AsyncSession, user_id: int, title: str) -> Session:
        """Create a new session."""
        session = Session(user_id=user_id, title=title)
        db.add(session)
        await db.commit()
        await db.refresh(session)
        return session

    async def get_by_id(self, db: AsyncSession, session_id: int) -> Session | None:
        """Get session by ID."""
        result = await db.execute(select(Session).where(Session.id == session_id))
        return result.scalar_one_or_none()

    async def list_by_user(
        self, db: AsyncSession, user_id: int, skip: int = 0, limit: int = 50
    ) -> list[Session]:
        """List sessions for a user, sorted by most recent."""
        result = await db.execute(
            select(Session)
            .where(Session.user_id == user_id)
            .order_by(desc(Session.updated_at))
            .offset(skip)
            .limit(limit)
        )
        return list(result.scalars().all())

    async def update_title(self, db: AsyncSession, session_id: int, title: str) -> Session | None:
        """Update session title."""
        session = await self.get_by_id(db, session_id)
        if session:
            session.title = title
            await db.commit()
            await db.refresh(session)
        return session

    async def delete(self, db: AsyncSession, session_id: int) -> bool:
        """Delete a session."""
        session = await self.get_by_id(db, session_id)
        if session:
            await db.delete(session)
            await db.commit()
            return True
        return False


session_repo = SessionRepository()

MessageRepository:

# backend/app/persistence/repositories/message_repo.py
from sqlalchemy import func, select  # func powers count_by_session() below
from sqlalchemy.ext.asyncio import AsyncSession
from app.persistence.models import Message


class MessageRepository:
    """Repository for Message model operations."""

    async def create(
        self,
        db: AsyncSession,
        session_id: int,
        role: str,
        content: str,
        tokens: int = 0,
    ) -> Message:
        """Create a new message."""
        message = Message(session_id=session_id, role=role, content=content, tokens=tokens)
        db.add(message)
        await db.commit()
        await db.refresh(message)
        return message

    async def get_by_id(self, db: AsyncSession, message_id: int) -> Message | None:
        """Get message by ID."""
        result = await db.execute(select(Message).where(Message.id == message_id))
        return result.scalar_one_or_none()

    async def list_by_session(
        self, db: AsyncSession, session_id: int, skip: int = 0, limit: int = 100
    ) -> list[Message]:
        """List messages for a session, sorted chronologically."""
        result = await db.execute(
            select(Message)
            .where(Message.session_id == session_id)
            .order_by(Message.created_at)  # Chronological order
            .offset(skip)
            .limit(limit)
        )
        return list(result.scalars().all())

    async def count_by_session(self, db: AsyncSession, session_id: int) -> int:
        """Count messages in a session."""
        result = await db.execute(
            select(func.count(Message.id)).where(Message.session_id == session_id)
        )
        return result.scalar() or 0


message_repo = MessageRepository()

Now instead of writing raw SQLAlchemy queries everywhere, we have a clean interface:

# Get user by Auth0 ID
user = await user_repo.get_by_auth0_id(db, "auth0|123")

# List user's sessions
sessions = await session_repo.list_by_user(db, user_id=user.id)

# Get messages for a session
messages = await message_repo.list_by_session(db, session_id=sessions[0].id)

This is the power of repositories—clean, named operations instead of anonymous SQLAlchemy queries scattered everywhere.

Domain Services: Business Logic Layer

Repositories handle data access, but where does business logic live? Not in API handlers (those are just HTTP routing) and not in repositories (those are just queries). We need a domain services layer.

What goes in a domain service?

  • Validation rules: “Session titles must be non-empty”
  • Authorization: “Users can only access their own sessions”
  • Orchestration: “When getting a session, also fetch the message count”
  • Business policies: “Users can have max 10 active sessions”

Domain services sit between the API and persistence layers. They enforce business rules without knowing about HTTP or database internals. This makes them easy to test—you can verify business logic without starting a server or database.

Let’s build SessionService:

# backend/app/domain/services/session_service.py
from sqlalchemy.ext.asyncio import AsyncSession
from app.persistence.repositories.session_repo import session_repo
from app.persistence.repositories.message_repo import message_repo
from app.domain.dtos import SessionDTO


class SessionService:
    """Business logic for session operations."""

    async def create_session(self, db: AsyncSession, user_id: int, title: str) -> SessionDTO:
        """Create a new session with validation."""
        # Validation
        if not title or len(title.strip()) == 0:
            raise ValueError("Session title cannot be empty")

        if len(title) > 500:
            raise ValueError("Session title too long (max 500 characters)")

        # Create session
        session = await session_repo.create(db, user_id=user_id, title=title.strip())

        # Convert to DTO
        return SessionDTO.from_orm(session, message_count=0)

    async def get_session(self, db: AsyncSession, session_id: int, user_id: int) -> SessionDTO | None:
        """Get session with ownership check."""
        session = await session_repo.get_by_id(db, session_id)

        # Authorization check
        if session and session.user_id != user_id:
            raise PermissionError("Session does not belong to user")

        if not session:
            return None

        # Count messages
        message_count = await message_repo.count_by_session(db, session_id)

        return SessionDTO.from_orm(session, message_count=message_count)

    async def list_user_sessions(
        self, db: AsyncSession, user_id: int, skip: int = 0, limit: int = 50
    ) -> list[SessionDTO]:
        """List sessions for a user."""
        sessions = await session_repo.list_by_user(db, user_id, skip, limit)

        # Convert to DTOs with message counts
        dtos = []
        for session in sessions:
            count = await message_repo.count_by_session(db, session.id)
            dtos.append(SessionDTO.from_orm(session, message_count=count))

        return dtos

    async def delete_session(self, db: AsyncSession, session_id: int, user_id: int) -> bool:
        """Delete session with ownership check."""
        session = await session_repo.get_by_id(db, session_id)

        if not session:
            return False

        # Authorization check
        if session.user_id != user_id:
            raise PermissionError("Cannot delete another user's session")

        return await session_repo.delete(db, session_id)


session_service = SessionService()

Notice how the service layer orchestrates the application:

  • create_session() validates the title before saving
  • get_session() checks that the session belongs to the user (authorization)
  • list_user_sessions() fetches sessions and counts messages for each (orchestration)
  • delete_session() ensures users can only delete their own data
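
This is also where a policy like “max 10 active sessions per user” from earlier would land. A sketch of how create_session() could grow, assuming a hypothetical count_by_user() method on the repository:

MAX_SESSIONS_PER_USER = 10  # illustrative business policy


async def create_session(self, db: AsyncSession, user_id: int, title: str) -> SessionDTO:
    if not title or len(title.strip()) == 0:
        raise ValueError("Session title cannot be empty")

    # The new rule lives here, invisible to both the API and persistence layers
    # (count_by_user is a hypothetical repository method)
    if await session_repo.count_by_user(db, user_id) >= MAX_SESSIONS_PER_USER:
        raise ValueError("Session limit reached (max 10 per user)")

    session = await session_repo.create(db, user_id=user_id, title=title.strip())
    return SessionDTO.from_orm(session, message_count=0)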

The service returns DTOs (Data Transfer Objects), not ORM models. This is a crucial separation.

Why DTOs matter:

When an endpoint returns a session, it shouldn’t return the raw SQLAlchemy model. ORM models contain database internals (SQLAlchemy metadata, lazy-loaded relationships, internal state). They’re not designed for serialization.

DTOs are the API contract. They define exactly what data clients receive, with validation and type safety built in:

# backend/app/domain/dtos.py
from datetime import datetime
from typing import TYPE_CHECKING

from pydantic import BaseModel, Field

if TYPE_CHECKING:
    # Imported only for type checking; avoids a runtime circular import
    from app.persistence.models import Message, Session


class SessionDTO(BaseModel):
    """Data Transfer Object for Session."""
    id: int
    user_id: int
    title: str
    message_count: int
    created_at: datetime
    updated_at: datetime

    class Config:
        from_attributes = True  # Allow from_orm() method

    @classmethod
    def from_orm(cls, session: "Session", message_count: int) -> "SessionDTO":
        """Convert ORM model to DTO."""
        return cls(
            id=session.id,
            user_id=session.user_id,
            title=session.title,
            message_count=message_count,
            created_at=session.created_at,
            updated_at=session.updated_at,
        )


class SessionCreateDTO(BaseModel):
    """DTO for creating a session."""
    title: str = Field(..., min_length=1, max_length=500)


class MessageDTO(BaseModel):
    """Data Transfer Object for Message."""
    id: int
    session_id: int
    role: str
    content: str
    tokens: int
    created_at: datetime

    class Config:
        from_attributes = True

    @classmethod
    def from_orm(cls, message: "Message") -> "MessageDTO":
        return cls(
            id=message.id,
            session_id=message.session_id,
            role=message.role,
            content=message.content,
            tokens=message.tokens,
            created_at=message.created_at,
        )


class MessageCreateDTO(BaseModel):
    """DTO for creating a message."""
    role: str = Field(..., pattern="^(user|assistant)$")
    content: str = Field(..., min_length=1)
    tokens: int = Field(default=0, ge=0)

Pydantic models give us several benefits:

  • Validation: Field(min_length=1) ensures content isn’t empty, automatically returning 422 if violated
  • Serialization: Converts to JSON automatically with proper datetime formatting
  • Documentation: Shows up in OpenAPI docs with field descriptions and constraints
  • Type safety: mypy catches type mismatches at compile time

The separation between ORM models (database) and DTOs (API) might seem redundant, but it’s crucial. ORM models evolve with your database schema. DTOs are your API contract with clients. You can refactor the database without breaking API clients by keeping DTOs stable and mapping between them in services.
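
To make that concrete: suppose the sessions.title column is later renamed to name (hypothetical). Only the DTO mapping changes; clients keep seeing title:

# Hypothetical: the DB column is now Session.name, but the API field is stable
@classmethod
def from_orm(cls, session: "Session", message_count: int) -> "SessionDTO":
    return cls(
        id=session.id,
        user_id=session.user_id,
        title=session.name,  # map the renamed column back to the stable field
        message_count=message_count,
        created_at=session.created_at,
        updated_at=session.updated_at,
    )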

REST API Endpoints: Bringing It All Together

Now we wire everything together with FastAPI endpoints. These are thin HTTP handlers—5-10 lines each—that map HTTP requests to service calls and return JSON responses.

The pattern is consistent:

  1. FastAPI validates the request using Pydantic DTOs
  2. The endpoint calls a domain service method
  3. The service returns a DTO (or raises an exception)
  4. FastAPI serializes the DTO to JSON (or maps the exception to an HTTP error)

Session endpoints:

# backend/app/api/sessions.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.dependencies import get_session
from app.domain.services.session_service import session_service
from app.domain.dtos import SessionDTO, SessionCreateDTO

router = APIRouter(prefix="/api/sessions", tags=["sessions"])


@router.post("/", response_model=SessionDTO, status_code=status.HTTP_201_CREATED)
async def create_session(
    data: SessionCreateDTO,
    db: AsyncSession = Depends(get_session),
    # TODO: Get user_id from authenticated user (Part 3)
    user_id: int = 1,  # Hardcoded for now
):
    """Create a new chat session."""
    try:
        return await session_service.create_session(db, user_id, data.title)
    except ValueError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))


@router.get("/", response_model=list[SessionDTO])
async def list_sessions(
    skip: int = 0,
    limit: int = 50,
    db: AsyncSession = Depends(get_session),
    user_id: int = 1,  # TODO: From auth
):
    """List user's sessions."""
    return await session_service.list_user_sessions(db, user_id, skip, limit)


@router.get("/{session_id}", response_model=SessionDTO)
async def read_session(  # named read_session so it doesn't shadow the get_session dependency
    session_id: int,
    db: AsyncSession = Depends(get_session),
    user_id: int = 1,  # TODO: From auth
):
    """Get a specific session."""
    try:
        session = await session_service.get_session(db, session_id, user_id)
        if not session:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Session not found")
        return session
    except PermissionError as e:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e))


@router.delete("/{session_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_session(
    session_id: int,
    db: AsyncSession = Depends(get_session),
    user_id: int = 1,  # TODO: From auth
):
    """Delete a session."""
    try:
        deleted = await session_service.delete_session(db, session_id, user_id)
        if not deleted:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Session not found")
    except PermissionError as e:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e))

Each endpoint is minimal:

  • create_session(): Validates SessionCreateDTO, calls the service, returns 201 with the created session
  • list_sessions(): Fetches user’s sessions with pagination, returns a list
  • read_session(): Fetches one session, returns 404 if not found, 403 if wrong user
  • delete_session(): Deletes a session, returns 204 (no content) on success

Error handling is consistent—service methods raise Python exceptions (ValueError, PermissionError), and the API layer converts them to appropriate HTTP status codes (400, 403, 404).
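
If those try/except blocks start to feel repetitive, FastAPI can map domain exceptions to status codes once, app-wide. An optional refactor sketch (registered in main.py; not what the code above does):

from fastapi import Request
from fastapi.responses import JSONResponse


@app.exception_handler(PermissionError)
async def permission_denied(request: Request, exc: PermissionError):
    return JSONResponse(status_code=403, content={"detail": str(exc)})


@app.exception_handler(ValueError)
async def invalid_input(request: Request, exc: ValueError):
    return JSONResponse(status_code=400, content={"detail": str(exc)})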

Note the hardcoded user_id = 1 for now. In Part 3, we’ll replace this with the authenticated user from Auth0.

Message endpoints follow the same pattern:

# backend/app/api/messages.py
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.dependencies import get_session
from app.domain.services.message_service import message_service
from app.domain.dtos import MessageDTO, MessageCreateDTO

router = APIRouter(prefix="/api/messages", tags=["messages"])


@router.post("/", response_model=MessageDTO, status_code=status.HTTP_201_CREATED)
async def create_message(
    data: MessageCreateDTO,
    session_id: int = Query(..., description="Session ID"),
    db: AsyncSession = Depends(get_session),
    user_id: int = 1,  # TODO: From auth
):
    """Create a new message in a session."""
    try:
        return await message_service.create_message(db, session_id, user_id, data)
    except ValueError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
    except PermissionError as e:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e))


@router.get("/", response_model=list[MessageDTO])
async def list_messages(
    session_id: int = Query(..., description="Session ID"),
    skip: int = 0,
    limit: int = 100,
    db: AsyncSession = Depends(get_session),
    user_id: int = 1,  # TODO: From auth
):
    """List messages for a session."""
    try:
        return await message_service.list_session_messages(db, session_id, user_id, skip, limit)
    except PermissionError as e:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e))

Wire routes into the main app:

# backend/app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.core.settings import settings
from app.api import sessions, messages

app = FastAPI(
    title=settings.api_title,
    version=settings.api_version,
    description=settings.api_description,
)

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.cors_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Include routers
app.include_router(sessions.router)
app.include_router(messages.router)


@app.get("/health")
async def health():
    return {"status": "ok"}

We’re including both routers in the main app. FastAPI will combine all routes and generate a unified OpenAPI schema.

The CORS middleware configuration allows our React frontend (running on port 3000) to make requests to the backend (running on port 8000). In production, you’ll configure this more strictly with specific origins.
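
A production-leaning sketch of stricter settings (the origin and method list here are illustrative):

app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://app.example.com"],  # hypothetical frontend origin
    allow_credentials=True,
    allow_methods=["GET", "POST", "DELETE"],
    allow_headers=["Authorization", "Content-Type"],
)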

Restart the backend and test it:

curl -X POST "http://localhost:8000/api/sessions/" \
  -H "Content-Type: application/json" \
  -d '{"title": "My first conversation"}'

Response:

{
  "id": 1,
  "user_id": 1,
  "title": "My first conversation",
  "message_count": 0,
  "created_at": "2025-10-29T12:34:56",
  "updated_at": "2025-10-29T12:34:56"
}

What We’ve Built

Let’s take stock. We’ve gone from “just a health check” to a fully functional backend with proper separation of concerns.

Database Layer:

  • Async SQLAlchemy engine with connection pooling
  • Three ORM models (User, Session, Message) with relationships
  • Alembic migrations tracking schema changes in git
  • Indexes for fast queries (idx_user_created, idx_session_created)
  • Cascading deletes (delete user → delete sessions → delete messages)

Repository Layer:

  • UserRepository with methods for finding users by ID, email, Auth0 ID
  • SessionRepository for creating, listing, updating, and deleting sessions
  • MessageRepository for storing and retrieving conversation messages
  • Clean interface abstracting database details
  • Mockable for unit testing

Domain Layer:

  • SessionService enforcing business rules (title validation, authorization)
  • MessageService enforcing the same ownership checks (extended in Part 4 when we integrate agents)
  • DTOs defining stable API contracts (SessionDTO, MessageDTO, etc.)
  • Business logic separated from infrastructure

API Layer:

  • REST endpoints for sessions (POST, GET, DELETE)
  • REST endpoints for messages (POST, GET)
  • Pydantic request/response validation
  • Consistent error handling (400, 403, 404)
  • Interactive OpenAPI docs at /docs

Developer Experience:

  • make migrate to run migrations
  • make migrate-create to generate new migrations
  • Type-safe end-to-end with mypy strict mode
  • Hot-reload on code changes
  • One command to start everything (make dev)

Try the full flow:

# Start services
make dev

# Create a session
curl -X POST "http://localhost:8000/api/sessions/" \
  -H "Content-Type: application/json" \
  -d '{"title": "Test Session"}'

# Create a message
curl -X POST "http://localhost:8000/api/messages/?session_id=1" \
  -H "Content-Type: application/json" \
  -d '{"role": "user", "content": "Hello, agent!"}'

# List messages
curl "http://localhost:8000/api/messages/?session_id=1"

Visit http://localhost:8000/docs to see interactive API documentation and test endpoints in the browser.

You have a working backend with proper separation of concerns. Database changes don’t cascade into API handlers. Business logic is testable without spinning up a database. API contracts are versioned and documented.

Tip

Checkpoint: Before moving on, verify everything works:

  1. Run make migrate and check for errors
  2. Visit http://localhost:8000/docs and see all endpoints documented
  3. Create a session via curl or the Swagger UI
  4. Check the database: docker exec -it infra-db-1 psql -U postgres -d agent_stack -c "SELECT * FROM sessions;"
  5. Create a message for that session
  6. List messages and verify they’re returned chronologically

If anything fails, check make logs to debug.

What’s Next

In Part 3, we’re tackling Authentication & Security:

  • Auth0 integration for user signup and login
  • JWT validation middleware protecting endpoints
  • Per-user data isolation (no more hardcoded user_id = 1)
  • Session cookies for SSE streaming (EventSource can’t send auth headers)
  • CORS configuration for production

After that, Part 4 is where things get exciting—we’ll integrate the OpenAI Agents SDK and build the actual agent streaming logic.


Next: Part 3 - Authentication & Security (Coming Soon)


This is part of a series on building production-ready AI agent applications. All code is open source on GitHub.

Info

Enjoying this series? Star the GitHub repo, share it with your team, or leave a comment below. Questions and suggestions are welcome—open an issue on GitHub or use the “Suggest an Edit” link at the bottom of the page.