Building a Production-Ready Agent Stack: Part 2 - Backend Core & Database
From SQLAlchemy to REST APIs, we build a type-safe persistence layer with clean architecture. Learn the Repository pattern, Alembic migrations, and why async matters for agent applications.
Our FastAPI server is running, but it’s just a health check endpoint talking to no one. Time to build the persistence layer—database models, repositories, domain services, and the REST API that will power our agent conversations.
Right now, our backend can tell you it’s alive (GET /health), but it can’t store a conversation or remember a user. Postgres is running in Docker, but we’re not using it. Let’s fix that.
By the end of this post, you’ll have:
SQLAlchemy models for users, sessions, and messages with proper relationships
Alembic migrations versioning your schema changes in git
Repository pattern abstracting database access for testability
Domain services enforcing business logic and authorization
REST endpoints for full CRUD on sessions and messages
We’re building a three-layer architecture—API, Domain, Persistence—to keep concerns separated as complexity grows.
```mermaid
%%{init: {'theme':'base', 'themeVariables': { 'fontSize':'16px'}}}%%
graph TB
    Client[Client Request]
    API["API Layer<br/><small>app/api/</small><br/><small>HTTP routing, validation</small>"]
    Domain["Domain Layer<br/><small>app/domain/</small><br/><small>Business logic</small>"]
    Repo["Persistence Layer<br/><small>app/persistence/</small><br/><small>Database access</small>"]
    DB[(PostgreSQL)]

    Client --> API
    API --> Domain
    Domain --> Repo
    Repo --> DB

    style Client fill:#f0f4ff,stroke:#a5b4fc,stroke-width:2px
    style API fill:#dbeafe,stroke:#93c5fd,stroke-width:2px
    style Domain fill:#d1fae5,stroke:#6ee7b7,stroke-width:2px
    style Repo fill:#e9d5ff,stroke:#c084fc,stroke-width:2px
    style DB fill:#bfdbfe,stroke:#60a5fa,stroke-width:2px
```
API Layer (app/api/): HTTP routing, request validation, response serialization. Knows FastAPI, doesn’t know business rules.
Domain Layer (app/domain/): Business logic, validation rules, authorization. Pure Python—no FastAPI, no SQLAlchemy. Testable without a server or database.
Persistence Layer (app/persistence/): Database access via ORM models and repositories. Knows Postgres, doesn’t know HTTP.
This separation means you can swap Postgres for MongoDB by rewriting only the persistence layer. Change from REST to GraphQL by rewriting only the API layer. Add a rule like “max 10 sessions per user” by editing only the domain layer. No cascading changes across the codebase.
We’re using SQLAlchemy 2.0 with async support via asyncpg.
Why async matters for agent applications:
Agent apps are I/O-heavy. When a user sends a message, the flow looks like this:
Save message to database (20ms I/O)
Call LLM API and stream response (5-30 seconds I/O)
Save response to database (20ms I/O)
Update usage metrics (20ms I/O)
With synchronous code, each operation blocks the worker thread handling the request. One user’s 10-second LLM call stalls every other request queued behind it: with 10 concurrent users on a single worker, the last request waits 100 seconds.
With async, Python yields control during I/O operations and handles other requests. All 10 users get their responses in roughly 10 seconds. The difference in production is dramatic—I’ve seen systems go from 10 concurrent users to 500+ on the same hardware just by switching to async.
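The concurrency claim is easy to sanity-check with a toy script (a sketch: `fake_llm_call` is an illustrative stand-in for the LLM API using `asyncio.sleep`, not code from the app):

```python
import asyncio
import time

async def fake_llm_call(user_id: int) -> str:
    """Stand-in for a slow LLM API call (pure I/O wait)."""
    await asyncio.sleep(0.1)
    return f"response for user {user_id}"

async def main() -> list[str]:
    start = time.perf_counter()
    # Ten "users" wait concurrently: total wall time is ~0.1s, not ~1s,
    # because the event loop switches tasks while each one awaits I/O.
    results = await asyncio.gather(*(fake_llm_call(i) for i in range(10)))
    elapsed = time.perf_counter() - start
    assert elapsed < 0.5  # would be ~1.0s if the calls ran one after another
    return results

results = asyncio.run(main())
```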
```python
# backend/app/core/database.py
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    create_async_engine,
    async_sessionmaker,
)

from app.core.settings import settings

# Create async engine
engine = create_async_engine(
    settings.database_url,
    echo=settings.is_development,  # Log SQL queries in dev
    pool_size=10,                  # Max connections in the pool
    max_overflow=20,               # Extra connections if pool is exhausted
    pool_pre_ping=True,            # Verify connections before using
)

# Session factory
SessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,  # Don't expire objects after commit
)


async def get_db() -> AsyncSession:
    """Dependency for injecting database sessions into endpoints."""
    async with SessionLocal() as session:
        try:
            yield session
        finally:
            await session.close()
```
Key configuration:
echo=settings.is_development: Log SQL queries in dev for debugging, silent in production
pool_size=10, max_overflow=20: Reuse connections (creating them is slow—50-100ms). Start with 30 max, adjust based on traffic.
pool_pre_ping=True: Verify connections before use to avoid “connection closed” errors
expire_on_commit=False: Don’t force DB round-trips after commit when we’re about to serialize and return
```python
from fastapi import APIRouter, Depends
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

from app.api.dependencies import get_session

router = APIRouter()


@router.get("/health")
async def health_check(db: AsyncSession = Depends(get_session)):
    """Health check with database connectivity test."""
    try:
        # SQLAlchemy 2.0 requires raw SQL strings to be wrapped in text()
        await db.execute(text("SELECT 1"))
        return {"status": "healthy", "database": "connected"}
    except Exception as e:
        return {"status": "unhealthy", "database": f"error: {str(e)}"}
```
FastAPI automatically calls get_session() before your endpoint runs, passes the session as the db parameter, and ensures it’s closed after the request completes. No manual session management, no leaked connections.
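Under the hood, a yield-based dependency is just an async generator: FastAPI advances it once before the endpoint runs and once after the response is sent, so the `finally` cleanup always executes. A self-contained sketch of that lifecycle (`FakeSession` stands in for `AsyncSession`; the routers import `get_session` from `app/api/dependencies.py`, which is assumed to wrap or re-export `get_db`):

```python
import asyncio

class FakeSession:
    """Stand-in for AsyncSession, tracking whether close() ran."""
    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True

async def get_session():
    # Same shape as get_db() above
    session = FakeSession()
    try:
        yield session          # the endpoint body runs while suspended here
    finally:
        await session.close()  # always runs, even if the endpoint raised

async def simulate_request() -> FakeSession:
    gen = get_session()
    session = await gen.__anext__()  # what FastAPI does before the endpoint
    # ... endpoint logic would use `session` here ...
    try:
        await gen.__anext__()        # what FastAPI does after the response
    except StopAsyncIteration:
        pass
    return session

session = asyncio.run(simulate_request())
```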
Alembic: Version Control for Your Database Schema
Alembic tracks database schema changes like git tracks code. It auto-generates migrations by comparing your ORM models to the database, handles rollbacks, and keeps your team’s databases in sync.
```python
# backend/alembic/env.py
from logging.config import fileConfig

from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config

from alembic import context

# Import your models' metadata
from app.persistence.models import Base
from app.core.settings import settings

# Alembic Config object
config = context.config

# Override database URL from settings
config.set_main_option("sqlalchemy.url", settings.database_url)

# Setup logging
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# This is your models' MetaData object
target_metadata = Base.metadata


def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode (generate SQL only)."""
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )
    with context.begin_transaction():
        context.run_migrations()


def do_run_migrations(connection: Connection) -> None:
    context.configure(connection=connection, target_metadata=target_metadata)
    with context.begin_transaction():
        context.run_migrations()


async def run_migrations_online() -> None:
    """Run migrations in 'online' mode (connect to database)."""
    connectable = async_engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)
    await connectable.dispose()


if context.is_offline_mode():
    run_migrations_offline()
else:
    import asyncio

    asyncio.run(run_migrations_online())
```
The key change: run_migrations_online() uses an async engine, matching our async SQLAlchemy setup.
```makefile
# Makefile (add to existing file)

# Database migrations
migrate:
	@echo "Running database migrations..."
	cd backend && uv run alembic upgrade head

migrate-create:
	@echo "Creating new migration..."
	@read -p "Migration message: " msg; \
	cd backend && uv run alembic revision --autogenerate -m "$$msg"

migrate-rollback:
	@echo "Rolling back last migration..."
	cd backend && uv run alembic downgrade -1
```
Your workflow:
Change ORM models
Run make migrate-create (generates migration)
Review the migration in alembic/versions/
Run make migrate (applies it)
If needed, run make migrate-rollback
Warning
Always review auto-generated migrations. Alembic is smart but not perfect—it sees column renames as drop + add, which loses data. Check the upgrade() and downgrade() functions before applying.
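For example, if a later migration renamed `Session.title`, the autogenerated file would drop and re-add the column, destroying data; the fix is to hand-edit it into a single `op.alter_column`. A sketch with illustrative names, not a migration from this series:

```python
# Hand-edited migration that preserves data across a column rename.
from alembic import op

def upgrade() -> None:
    # Autogenerated (destructive) version would be:
    #   op.drop_column("sessions", "title")
    #   op.add_column("sessions", sa.Column("name", sa.String(500)))
    # Data-preserving replacement:
    op.alter_column("sessions", "title", new_column_name="name")

def downgrade() -> None:
    op.alter_column("sessions", "name", new_column_name="title")
```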
```python
# backend/app/persistence/models.py
from datetime import datetime

from sqlalchemy import String, Integer, Text, DateTime, ForeignKey, Index
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship


class Base(DeclarativeBase):
    """Base class for all ORM models."""
    pass


class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    auth0_id: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    created_at: Mapped[datetime] = mapped_column(
        DateTime, default=datetime.utcnow, nullable=False
    )

    # Relationship: one user has many sessions
    sessions: Mapped[list["Session"]] = relationship(
        "Session", back_populates="user", cascade="all, delete-orphan"
    )

    def __repr__(self) -> str:
        return f"<User(id={self.id}, email={self.email})>"


class Session(Base):
    __tablename__ = "sessions"

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    user_id: Mapped[int] = mapped_column(
        Integer, ForeignKey("users.id", ondelete="CASCADE"), nullable=False
    )
    title: Mapped[str] = mapped_column(String(500), nullable=False)
    created_at: Mapped[datetime] = mapped_column(
        DateTime, default=datetime.utcnow, nullable=False
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False
    )

    # Relationships
    user: Mapped["User"] = relationship("User", back_populates="sessions")
    messages: Mapped[list["Message"]] = relationship(
        "Message", back_populates="session", cascade="all, delete-orphan"
    )

    # Index for fast user session lookups
    __table_args__ = (Index("idx_user_created", "user_id", "created_at"),)

    def __repr__(self) -> str:
        return f"<Session(id={self.id}, title={self.title[:30]})>"


class Message(Base):
    __tablename__ = "messages"

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    session_id: Mapped[int] = mapped_column(
        Integer, ForeignKey("sessions.id", ondelete="CASCADE"), nullable=False
    )
    role: Mapped[str] = mapped_column(String(50), nullable=False)  # "user" or "assistant"
    content: Mapped[str] = mapped_column(Text, nullable=False)
    tokens: Mapped[int] = mapped_column(Integer, default=0)  # For usage tracking
    created_at: Mapped[datetime] = mapped_column(
        DateTime, default=datetime.utcnow, nullable=False
    )

    # Relationship
    session: Mapped["Session"] = relationship("Session", back_populates="messages")

    # Index for fast session message lookups
    __table_args__ = (Index("idx_session_created", "session_id", "created_at"),)

    def __repr__(self) -> str:
        return f"<Message(id={self.id}, role={self.role}, content={self.content[:30]})>"
```
Key design choices:
auth0_id: Links to Auth0 accounts (no passwords stored locally)
cascade="all, delete-orphan": Delete user → delete their sessions → delete all messages
Index("idx_user_created", ...): Fast queries for “list user’s sessions by date”
tokens on Message: Track usage per message for billing (Part 6)
updated_at on Session: Auto-updates for “recently active” sorting
Note
We’re using SQLAlchemy 2.0’s Mapped[] syntax for better type inference. mypy catches non-existent fields at compile time, not runtime.
```bash
make migrate-create
# Message: "add user session message tables"

# Review the generated migration in alembic/versions/
cat backend/alembic/versions/*_add_user_session_message_tables.py

# Apply it
make migrate
```
Connect to Postgres and list the tables to confirm the migration ran:

```text
          List of relations
 Schema |      Name       | Type  |  Owner
--------+-----------------+-------+----------
 public | alembic_version | table | postgres
 public | messages        | table | postgres
 public | sessions        | table | postgres
 public | users           | table | postgres
```
We have ORM models, but we don’t want SQLAlchemy queries scattered throughout our codebase. The Repository pattern gives us a clean interface for data access.
The problem with direct ORM usage:
Without repositories, every endpoint that needs a user by email writes the same two-line SQLAlchemy query inline.
```python
# backend/app/persistence/repositories/user_repo.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.persistence.models import User


class UserRepository:
    """Repository for User model operations."""

    async def create(self, db: AsyncSession, auth0_id: str, email: str) -> User:
        """Create a new user."""
        user = User(auth0_id=auth0_id, email=email)
        db.add(user)
        await db.commit()
        await db.refresh(user)  # Load generated ID
        return user

    async def get_by_id(self, db: AsyncSession, user_id: int) -> User | None:
        """Get user by ID."""
        result = await db.execute(select(User).where(User.id == user_id))
        return result.scalar_one_or_none()

    async def get_by_auth0_id(self, db: AsyncSession, auth0_id: str) -> User | None:
        """Get user by Auth0 ID."""
        result = await db.execute(select(User).where(User.auth0_id == auth0_id))
        return result.scalar_one_or_none()

    async def get_by_email(self, db: AsyncSession, email: str) -> User | None:
        """Get user by email."""
        result = await db.execute(select(User).where(User.email == email))
        return result.scalar_one_or_none()

    async def list_all(self, db: AsyncSession, skip: int = 0, limit: int = 100) -> list[User]:
        """List all users with pagination."""
        result = await db.execute(select(User).offset(skip).limit(limit))
        return list(result.scalars().all())


# Singleton instance
user_repo = UserRepository()
```
Notice the pattern:
Every method takes db: AsyncSession as the first parameter (dependency injection)
Methods return domain objects (User), not raw database rows
We use scalar_one_or_none() for single results, scalars().all() for lists
Pagination is built-in with offset and limit
The user_repo singleton instance gets imported wherever we need user operations. In tests, we can mock it with a single line.
```python
# backend/app/persistence/repositories/session_repo.py
from sqlalchemy import select, desc
from sqlalchemy.ext.asyncio import AsyncSession

from app.persistence.models import Session


class SessionRepository:
    """Repository for Session model operations."""

    async def create(self, db: AsyncSession, user_id: int, title: str) -> Session:
        """Create a new session."""
        session = Session(user_id=user_id, title=title)
        db.add(session)
        await db.commit()
        await db.refresh(session)
        return session

    async def get_by_id(self, db: AsyncSession, session_id: int) -> Session | None:
        """Get session by ID."""
        result = await db.execute(select(Session).where(Session.id == session_id))
        return result.scalar_one_or_none()

    async def list_by_user(
        self, db: AsyncSession, user_id: int, skip: int = 0, limit: int = 50
    ) -> list[Session]:
        """List sessions for a user, sorted by most recent."""
        result = await db.execute(
            select(Session)
            .where(Session.user_id == user_id)
            .order_by(desc(Session.updated_at))
            .offset(skip)
            .limit(limit)
        )
        return list(result.scalars().all())

    async def update_title(self, db: AsyncSession, session_id: int, title: str) -> Session | None:
        """Update session title."""
        session = await self.get_by_id(db, session_id)
        if session:
            session.title = title
            await db.commit()
            await db.refresh(session)
        return session

    async def delete(self, db: AsyncSession, session_id: int) -> bool:
        """Delete a session."""
        session = await self.get_by_id(db, session_id)
        if session:
            await db.delete(session)
            await db.commit()
            return True
        return False


session_repo = SessionRepository()
```
```python
# backend/app/persistence/repositories/message_repo.py
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.persistence.models import Message


class MessageRepository:
    """Repository for Message model operations."""

    async def create(
        self,
        db: AsyncSession,
        session_id: int,
        role: str,
        content: str,
        tokens: int = 0,
    ) -> Message:
        """Create a new message."""
        message = Message(session_id=session_id, role=role, content=content, tokens=tokens)
        db.add(message)
        await db.commit()
        await db.refresh(message)
        return message

    async def get_by_id(self, db: AsyncSession, message_id: int) -> Message | None:
        """Get message by ID."""
        result = await db.execute(select(Message).where(Message.id == message_id))
        return result.scalar_one_or_none()

    async def list_by_session(
        self, db: AsyncSession, session_id: int, skip: int = 0, limit: int = 100
    ) -> list[Message]:
        """List messages for a session, sorted chronologically."""
        result = await db.execute(
            select(Message)
            .where(Message.session_id == session_id)
            .order_by(Message.created_at)  # Chronological order
            .offset(skip)
            .limit(limit)
        )
        return list(result.scalars().all())

    async def count_by_session(self, db: AsyncSession, session_id: int) -> int:
        """Count messages in a session."""
        result = await db.execute(
            select(func.count(Message.id)).where(Message.session_id == session_id)
        )
        return result.scalar() or 0


message_repo = MessageRepository()
```
Now instead of writing raw SQLAlchemy queries everywhere, we have a clean interface:
```python
# Get user by Auth0 ID
user = await user_repo.get_by_auth0_id(db, "auth0|123")

# List user's sessions
sessions = await session_repo.list_by_user(db, user_id=user.id)

# Get messages for a session
messages = await message_repo.list_by_session(db, session_id=sessions[0].id)
```
This is the power of repositories—clean, named operations instead of anonymous SQLAlchemy queries scattered everywhere.
Repositories handle data access, but where does business logic live? Not in API handlers (those are just HTTP routing) and not in repositories (those are just queries). We need a domain services layer.
What goes in a domain service?
Validation rules: “Session titles must be non-empty”
Authorization: “Users can only access their own sessions”
Orchestration: “When getting a session, also fetch the message count”
Business policies: “Users can have max 10 active sessions”
Domain services sit between the API and persistence layers. They enforce business rules without knowing about HTTP or database internals. This makes them easy to test—you can verify business logic without starting a server or database.
```python
# backend/app/domain/services/session_service.py
from sqlalchemy.ext.asyncio import AsyncSession

from app.persistence.repositories.session_repo import session_repo
from app.persistence.repositories.message_repo import message_repo
from app.domain.dtos import SessionDTO, SessionCreateDTO


class SessionService:
    """Business logic for session operations."""

    async def create_session(self, db: AsyncSession, user_id: int, title: str) -> SessionDTO:
        """Create a new session with validation."""
        # Validation
        if not title or len(title.strip()) == 0:
            raise ValueError("Session title cannot be empty")
        if len(title) > 500:
            raise ValueError("Session title too long (max 500 characters)")

        # Create session
        session = await session_repo.create(db, user_id=user_id, title=title.strip())

        # Convert to DTO
        return SessionDTO.from_orm(session, message_count=0)

    async def get_session(
        self, db: AsyncSession, session_id: int, user_id: int
    ) -> SessionDTO | None:
        """Get session with ownership check."""
        session = await session_repo.get_by_id(db, session_id)

        # Authorization check
        if session and session.user_id != user_id:
            raise PermissionError("Session does not belong to user")

        if not session:
            return None

        # Count messages
        message_count = await message_repo.count_by_session(db, session_id)
        return SessionDTO.from_orm(session, message_count=message_count)

    async def list_user_sessions(
        self, db: AsyncSession, user_id: int, skip: int = 0, limit: int = 50
    ) -> list[SessionDTO]:
        """List sessions for a user."""
        sessions = await session_repo.list_by_user(db, user_id, skip, limit)

        # Convert to DTOs with message counts
        dtos = []
        for session in sessions:
            count = await message_repo.count_by_session(db, session.id)
            dtos.append(SessionDTO.from_orm(session, message_count=count))
        return dtos

    async def delete_session(self, db: AsyncSession, session_id: int, user_id: int) -> bool:
        """Delete session with ownership check."""
        session = await session_repo.get_by_id(db, session_id)
        if not session:
            return False

        # Authorization check
        if session.user_id != user_id:
            raise PermissionError("Cannot delete another user's session")

        return await session_repo.delete(db, session_id)


session_service = SessionService()
```
Notice how the service layer orchestrates the application:
create_session() validates the title before saving
get_session() checks that the session belongs to the user (authorization)
list_user_sessions() fetches sessions and counts messages for each (orchestration)
delete_session() ensures users can only delete their own data
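Because these rules are plain Python, a unit test needs only a mocked repository. A sketch (restating `create_session`'s validation as a free function so the example is self-contained, with the assertions inlined rather than in a pytest file):

```python
import asyncio
from unittest.mock import AsyncMock

async def create_session(repo, db, user_id: int, title: str):
    """Same validation rules as SessionService.create_session."""
    if not title or len(title.strip()) == 0:
        raise ValueError("Session title cannot be empty")
    if len(title) > 500:
        raise ValueError("Session title too long (max 500 characters)")
    return await repo.create(db, user_id=user_id, title=title.strip())

repo = AsyncMock()
repo.create.return_value = {"id": 1, "title": "hello"}

# A valid title is trimmed and passed through to the repository...
created = asyncio.run(create_session(repo, db=None, user_id=1, title="  hello "))

# ...while a blank title is rejected before the repository is ever touched.
try:
    asyncio.run(create_session(repo, db=None, user_id=1, title="   "))
    raised = False
except ValueError:
    raised = True
```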
The service returns DTOs (Data Transfer Objects), not ORM models. This is a crucial separation.
Why DTOs matter:
When an endpoint returns a session, it shouldn’t return the raw SQLAlchemy model. ORM models contain database internals (SQLAlchemy metadata, lazy-loaded relationships, internal state). They’re not designed for serialization.
DTOs are the API contract. They define exactly what data clients receive, with validation and type safety built in:
```python
# backend/app/domain/dtos.py
from datetime import datetime
from typing import TYPE_CHECKING

from pydantic import BaseModel, Field

if TYPE_CHECKING:
    # Only for type checkers; keeps the domain layer import-free at runtime
    from app.persistence.models import Message, Session


class SessionDTO(BaseModel):
    """Data Transfer Object for Session."""

    id: int
    user_id: int
    title: str
    message_count: int
    created_at: datetime
    updated_at: datetime

    class Config:
        from_attributes = True  # Allow building from ORM attributes

    @classmethod
    def from_orm(cls, session: "Session", message_count: int) -> "SessionDTO":
        """Convert ORM model to DTO."""
        return cls(
            id=session.id,
            user_id=session.user_id,
            title=session.title,
            message_count=message_count,
            created_at=session.created_at,
            updated_at=session.updated_at,
        )


class SessionCreateDTO(BaseModel):
    """DTO for creating a session."""

    title: str = Field(..., min_length=1, max_length=500)


class MessageDTO(BaseModel):
    """Data Transfer Object for Message."""

    id: int
    session_id: int
    role: str
    content: str
    tokens: int
    created_at: datetime

    class Config:
        from_attributes = True

    @classmethod
    def from_orm(cls, message: "Message") -> "MessageDTO":
        return cls(
            id=message.id,
            session_id=message.session_id,
            role=message.role,
            content=message.content,
            tokens=message.tokens,
            created_at=message.created_at,
        )


class MessageCreateDTO(BaseModel):
    """DTO for creating a message."""

    role: str = Field(..., pattern="^(user|assistant)$")
    content: str = Field(..., min_length=1)
    tokens: int = Field(default=0, ge=0)
```
Serialization: Converts to JSON automatically with proper datetime formatting
Documentation: Shows up in OpenAPI docs with field descriptions and constraints
Type safety: mypy catches type mismatches at compile time
The separation between ORM models (database) and DTOs (API) might seem redundant, but it’s crucial. ORM models evolve with your database schema. DTOs are your API contract with clients. You can refactor the database without breaking API clients by keeping DTOs stable and mapping between them in services.
Now we wire everything together with FastAPI endpoints. These are thin HTTP handlers—5-10 lines each—that map HTTP requests to service calls and return JSON responses.
The pattern is consistent:
FastAPI validates the request using Pydantic DTOs
The endpoint calls a domain service method
The service returns a DTO (or raises an exception)
FastAPI serializes the DTO to JSON (or maps the exception to an HTTP error)
```python
# backend/app/api/sessions.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession

# Aliased so the route handler below can reuse the name get_session
# without shadowing the dependency
from app.api.dependencies import get_session as get_session_dep
from app.domain.services.session_service import session_service
from app.domain.dtos import SessionDTO, SessionCreateDTO

router = APIRouter(prefix="/api/sessions", tags=["sessions"])


@router.post("/", response_model=SessionDTO, status_code=status.HTTP_201_CREATED)
async def create_session(
    data: SessionCreateDTO,
    db: AsyncSession = Depends(get_session_dep),
    # TODO: Get user_id from authenticated user (Part 3)
    user_id: int = 1,  # Hardcoded for now
):
    """Create a new chat session."""
    try:
        return await session_service.create_session(db, user_id, data.title)
    except ValueError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))


@router.get("/", response_model=list[SessionDTO])
async def list_sessions(
    skip: int = 0,
    limit: int = 50,
    db: AsyncSession = Depends(get_session_dep),
    user_id: int = 1,  # TODO: From auth
):
    """List user's sessions."""
    return await session_service.list_user_sessions(db, user_id, skip, limit)


@router.get("/{session_id}", response_model=SessionDTO)
async def get_session(
    session_id: int,
    db: AsyncSession = Depends(get_session_dep),
    user_id: int = 1,  # TODO: From auth
):
    """Get a specific session."""
    try:
        session = await session_service.get_session(db, session_id, user_id)
        if not session:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="Session not found"
            )
        return session
    except PermissionError as e:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e))


@router.delete("/{session_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_session(
    session_id: int,
    db: AsyncSession = Depends(get_session_dep),
    user_id: int = 1,  # TODO: From auth
):
    """Delete a session."""
    try:
        deleted = await session_service.delete_session(db, session_id, user_id)
        if not deleted:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="Session not found"
            )
    except PermissionError as e:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e))
```
Each endpoint is minimal:
create_session(): Validates SessionCreateDTO, calls the service, returns 201 with the created session
list_sessions(): Fetches user’s sessions with pagination, returns a list
get_session(): Fetches one session, returns 404 if not found, 403 if wrong user
delete_session(): Deletes a session, returns 204 (no content) on success
Error handling is consistent—service methods raise Python exceptions (ValueError, PermissionError), and the API layer converts them to appropriate HTTP status codes (400, 403, 404).
Note the hardcoded user_id = 1 for now. In Part 3, we’ll replace this with the authenticated user from Auth0.
```python
# backend/app/api/messages.py
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.ext.asyncio import AsyncSession

from app.api.dependencies import get_session
from app.domain.services.message_service import message_service
from app.domain.dtos import MessageDTO, MessageCreateDTO

router = APIRouter(prefix="/api/messages", tags=["messages"])


@router.post("/", response_model=MessageDTO, status_code=status.HTTP_201_CREATED)
async def create_message(
    data: MessageCreateDTO,
    session_id: int = Query(..., description="Session ID"),
    db: AsyncSession = Depends(get_session),
    user_id: int = 1,  # TODO: From auth
):
    """Create a new message in a session."""
    try:
        return await message_service.create_message(db, session_id, user_id, data)
    except ValueError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
    except PermissionError as e:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e))


@router.get("/", response_model=list[MessageDTO])
async def list_messages(
    session_id: int = Query(..., description="Session ID"),
    skip: int = 0,
    limit: int = 100,
    db: AsyncSession = Depends(get_session),
    user_id: int = 1,  # TODO: From auth
):
    """List messages for a session."""
    try:
        return await message_service.list_session_messages(db, session_id, user_id, skip, limit)
    except PermissionError as e:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e))
```
```python
# backend/app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from app.core.settings import settings
from app.api import sessions, messages

app = FastAPI(
    title=settings.api_title,
    version=settings.api_version,
    description=settings.api_description,
)

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.cors_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Include routers
app.include_router(sessions.router)
app.include_router(messages.router)


@app.get("/health")
async def health():
    return {"status": "ok"}
```
We’re including both routers in the main app. FastAPI will combine all routes and generate a unified OpenAPI schema.
The CORS middleware configuration allows our React frontend (running on port 3000) to make requests to the backend (running on port 8000). In production, you’ll configure this more strictly with specific origins.
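A stricter production configuration might look like this (a fragment, not runnable on its own; the origin URL and method list are illustrative assumptions, not values from the series):

```python
# Lock CORS to known origins and only the methods the API actually serves
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://app.example.com"],  # exact origins, no "*"
    allow_credentials=True,
    allow_methods=["GET", "POST", "DELETE"],
    allow_headers=["Authorization", "Content-Type"],
)
```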
```bash
# Start services
make dev

# Create a session
curl -X POST "http://localhost:8000/api/sessions/" \
  -H "Content-Type: application/json" \
  -d '{"title": "Test Session"}'

# Create a message
curl -X POST "http://localhost:8000/api/messages/?session_id=1" \
  -H "Content-Type: application/json" \
  -d '{"role": "user", "content": "Hello, agent!"}'

# List messages
curl "http://localhost:8000/api/messages/?session_id=1"
```
Visit http://localhost:8000/docs to see interactive API documentation and test endpoints in the browser.
You have a working backend with proper separation of concerns. Database changes don’t cascade into API handlers. Business logic is testable without spinning up a database. API contracts are versioned and documented.
Tip
Checkpoint: Before moving on, verify everything works:
Run make migrate and check for errors
Visit http://localhost:8000/docs and see all endpoints documented
Create a session via curl or the Swagger UI
Check the database: docker exec -it infra-db-1 psql -U postgres -d agent_stack -c "SELECT * FROM sessions;"
Create a message for that session
List messages and verify they’re returned chronologically
Django is a phenomenal framework for content-heavy websites with admin panels. But for agent APIs, FastAPI is a better fit:
Async-first: Django bolted on async support in 3.1, but most of the ecosystem (middleware, ORM, third-party packages) is still synchronous. FastAPI was built for async from day one.
Performance: FastAPI is 2-3x faster for API workloads because it uses Starlette (async framework) and Pydantic (compiled validation). When you’re streaming agent responses, latency matters.
Automatic docs: Django REST Framework needs manual schema generation. FastAPI generates interactive OpenAPI docs automatically from type hints.
Lightweight: FastAPI is a micro-framework. Django includes admin panels, template engines, and features we don’t need for API-only apps.
Use Django if you need a full-featured admin UI or are building a traditional web app with server-rendered templates. For JSON APIs powering agent applications, FastAPI is cleaner and faster.
Q: Why the Repository pattern?
I’ve built projects both ways. Without repositories, every endpoint writes its own inline query (`select(User).where(User.email == email)` followed by `scalar_one_or_none()`), and that fragment gets repeated 10-20 times across the codebase. When you need to add caching or change the query, you’re hunting through dozens of files, and testing is painful because you can’t easily mock db.execute(). With repositories:
Tests mock one object (user_repo) instead of database internals
Queries live in one place making changes easy
Caching logic can be added to the repository without touching endpoints
Database swaps (Postgres to MongoDB) only touch the repository layer
The trade-off is more files and an extra layer of indirection. For small projects (< 5 models), you can skip repositories. For production agent systems where complexity grows, repositories pay off quickly.
Q: Why async?
For traditional web apps with quick database queries (< 50ms), synchronous code is fine. For agent applications, async is critical: while one request waits on a multi-second LLM call, Python handles other requests, so ten concurrent users finish in roughly the time of a single call instead of queuing behind each other.
The real-world impact is massive. I’ve seen production agent systems go from handling 10 concurrent users (sync) to 500+ concurrent users (async) on the same hardware. For interactive agent UIs where users expect instant feedback, this is the difference between usable and unusable.
Q: Why DTOs?
ORM models contain database internals that shouldn’t leak to your API:
Lazy loading: Accessing a relationship triggers a database query. If you serialize after closing the session, you get errors.
Circular references: User → Sessions → Messages → Session creates infinite loops when serializing to JSON.
Internal fields: SQLAlchemy adds fields like _sa_instance_state that shouldn’t be in API responses.
Schema coupling: Changing database structure breaks API clients.
DTOs solve this:
Explicit fields (only what clients need)
No database dependencies (serialize anytime)
Stable API contracts (database changes don’t break clients)
Type-safe validation (Pydantic catches errors at request time)
The trade-off is more classes to maintain. But for production APIs where stability matters, the separation is worth it.
Next: Part 3 - Authentication & Security (Coming Soon)
This is part of a series on building production-ready AI agent applications. All code is open source on GitHub.
Info
Enjoying this series? Star the GitHub repo, share it with your team, or leave a comment below. Questions and suggestions are welcome—open an issue on GitHub or use the “Suggest an Edit” link at the bottom of the page.