SQLAlchemy
Complete reference for SQLAlchemy, the Python SQL toolkit and ORM.
Overview
SQLAlchemy provides a full suite of well-known enterprise-level persistence patterns, designed for efficient and high-performing database access.
Key Features:
- Core: SQL expression language for building queries
- ORM: Object-relational mapping with declarative models
- Async: Full async/await support (introduced in SQLAlchemy 1.4, stable in 2.0+)
- Migrations: Alembic integration for schema changes
- Multiple DBs: PostgreSQL, MySQL, SQLite, Oracle, SQL Server
Installation
pip install sqlalchemy
# With async support (quote the extra so shells such as zsh do not expand the brackets)
pip install "sqlalchemy[asyncio]"
# With PostgreSQL async driver
pip install "sqlalchemy[asyncio]" asyncpg
# With Alembic migrations
pip install alembic
# With specific database drivers
pip install psycopg2-binary # PostgreSQL
pip install pymysql # MySQL
pip install aiosqlite # SQLite async
SQLAlchemy 2.0
This skill covers SQLAlchemy 2.0+ syntax, which is the current standard:
# SQLAlchemy 2.0 style (recommended)
from sqlalchemy import select
from sqlalchemy.orm import Session
stmt = select(User).where(User.name == "john")
result = session.execute(stmt)
users = result.scalars().all()
Engine and Connection
Creating Engine
from sqlalchemy import create_engine
# SQLite
engine = create_engine("sqlite:///database.db")
# PostgreSQL
engine = create_engine("postgresql://user:password@localhost:5432/mydb")
# PostgreSQL with psycopg2
engine = create_engine("postgresql+psycopg2://user:password@localhost/mydb")
# MySQL
engine = create_engine("mysql+pymysql://user:password@localhost:3306/mydb")
# Connection pool settings
engine = create_engine(
"postgresql://user:password@localhost/mydb",
pool_size=10, # Number of connections to keep
max_overflow=20, # Additional connections allowed
pool_timeout=30, # Seconds to wait for connection
pool_recycle=3600, # Recycle connections after 1 hour
echo=True, # Log SQL statements
echo_pool=True, # Log connection pool events
)
Async Engine
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
# Async engine
async_engine = create_async_engine(
"postgresql+asyncpg://user:password@localhost/mydb",
echo=True,
pool_size=10,
)
# Async session factory (async_sessionmaker is the 2.0 replacement for sessionmaker(class_=AsyncSession))
AsyncSessionLocal = async_sessionmaker(
async_engine,
expire_on_commit=False,
)
# Usage
async with AsyncSessionLocal() as session:
result = await session.execute(select(User))
users = result.scalars().all()
Connection URL Formats
# SQLite
"sqlite:///database.db" # Relative path
"sqlite:////absolute/path/to/db.db" # Absolute path
"sqlite:///:memory:" # In-memory database
# PostgreSQL
"postgresql://user:password@host:port/database"
"postgresql+asyncpg://user:password@host/database" # Async
# MySQL
"mysql+pymysql://user:password@host:port/database"
"mysql+aiomysql://user:password@host/database" # Async
# Oracle
"oracle+cx_oracle://user:password@host:port/?service_name=myservice"
# SQL Server
"mssql+pyodbc://user:password@host/database?driver=ODBC+Driver+17+for+SQL+Server"
Declarative Models
Basic Model
from sqlalchemy import Column, Integer, String, DateTime, Boolean, Text
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.sql import func
from datetime import datetime
class Base(DeclarativeBase):
"""Base class for all models."""
pass
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String(50), unique=True, nullable=False)
email = Column(String(255), unique=True, nullable=False)
password_hash = Column(String(255), nullable=False)
is_active = Column(Boolean, default=True)
is_superuser = Column(Boolean, default=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
def __repr__(self):
return f"<User(id={self.id}, username='{self.username}')>"
Column Types
from sqlalchemy import (
Column, Integer, BigInteger, SmallInteger,
String, Text, Unicode, UnicodeText,
Float, Double, Numeric, DECIMAL,
Boolean, Date, Time, DateTime,
JSON, LargeBinary, Enum,
ForeignKey, ForeignKeyConstraint,
Index, UniqueConstraint, CheckConstraint,
)
class Product(Base):
__tablename__ = "products"
# Integer types
id = Column(Integer, primary_key=True)
quantity = Column(Integer, default=0)
big_id = Column(BigInteger)
small_int = Column(SmallInteger)
# String types
name = Column(String(100), nullable=False)
description = Column(Text)
code = Column(Unicode(20)) # Unicode support
# Numeric types
price = Column(Numeric(10, 2)) # Precision, scale
weight = Column(Float)
# Boolean
is_available = Column(Boolean, default=True)
# Date/Time
created_at = Column(DateTime(timezone=True), server_default=func.now())
published_date = Column(Date)
open_time = Column(Time)
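# JSON
extra_metadata = Column("metadata", JSON) # Attribute name "metadata" is reserved by the Declarative API
tags = Column(JSON) # Use sqlalchemy.dialects.postgresql.JSONB for PostgreSQL JSONB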
# Binary
image_data = Column(LargeBinary)
# Enum
status = Column(Enum("draft", "published", "archived", name="product_status"))
Table Arguments
class Article(Base):
__tablename__ = "articles"
id = Column(Integer, primary_key=True)
slug = Column(String(100))
title = Column(String(200))
author_id = Column(Integer, ForeignKey("users.id"))
# Table-level constraints
__table_args__ = (
UniqueConstraint("slug", name="uq_article_slug"),
Index("ix_article_author", "author_id"),
CheckConstraint("length(title) > 0", name="ck_article_title"),
{"schema": "blog"}, # Schema name
)
Relationships
One-to-Many
from sqlalchemy.orm import relationship, Mapped, mapped_column
from typing import List, Optional
class Author(Base):
__tablename__ = "authors"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(100))
# One-to-many relationship
articles: Mapped[List["Article"]] = relationship(
back_populates="author",
cascade="all, delete-orphan",
)
class Article(Base):
__tablename__ = "articles"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(200))
author_id: Mapped[int] = mapped_column(ForeignKey("authors.id"))
# Relationship to parent
author: Mapped["Author"] = relationship(back_populates="articles")
Many-to-Many
# Association table
article_tags = Table(
"article_tags",
Base.metadata,
Column("article_id", Integer, ForeignKey("articles.id", ondelete="CASCADE"), primary_key=True),
Column("tag_id", Integer, ForeignKey("tags.id", ondelete="CASCADE"), primary_key=True),
Column("created_at", DateTime, server_default=func.now()),
)
class Article(Base):
__tablename__ = "articles"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(200))
tags: Mapped[List["Tag"]] = relationship(
secondary=article_tags,
back_populates="articles",
)
class Tag(Base):
__tablename__ = "tags"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(50), unique=True)
articles: Mapped[List["Article"]] = relationship(
secondary=article_tags,
back_populates="tags",
)
Association Object (Many-to-Many with Extra Fields)
class OrderItem(Base):
__tablename__ = "order_items"
order_id: Mapped[int] = mapped_column(ForeignKey("orders.id"), primary_key=True)
product_id: Mapped[int] = mapped_column(ForeignKey("products.id"), primary_key=True)
quantity: Mapped[int] = mapped_column(default=1)
unit_price: Mapped[Decimal] = mapped_column(Numeric(10, 2)) # Numeric yields decimal.Decimal; requires "from decimal import Decimal"
order: Mapped["Order"] = relationship(back_populates="items")
product: Mapped["Product"] = relationship(back_populates="order_items")
class Order(Base):
__tablename__ = "orders"
id: Mapped[int] = mapped_column(primary_key=True)
items: Mapped[List["OrderItem"]] = relationship(back_populates="order", cascade="all, delete-orphan")
class Product(Base):
__tablename__ = "products"
id: Mapped[int] = mapped_column(primary_key=True)
order_items: Mapped[List["OrderItem"]] = relationship(back_populates="product")
One-to-One
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
username: Mapped[str] = mapped_column(String(50), unique=True)
profile: Mapped["UserProfile"] = relationship(
back_populates="user",
uselist=False, # One-to-one
cascade="all, delete-orphan",
)
class UserProfile(Base):
__tablename__ = "user_profiles"
id: Mapped[int] = mapped_column(primary_key=True)
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), unique=True)
bio: Mapped[Optional[str]] = mapped_column(Text)
avatar_url: Mapped[Optional[str]] = mapped_column(String(255))
user: Mapped["User"] = relationship(back_populates="profile")
Self-Referential
class Category(Base):
__tablename__ = "categories"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(100))
parent_id: Mapped[Optional[int]] = mapped_column(ForeignKey("categories.id"))
# Self-referential relationship
parent: Mapped[Optional["Category"]] = relationship(
back_populates="children",
remote_side=[id],
)
children: Mapped[List["Category"]] = relationship(
back_populates="parent",
cascade="all, delete-orphan",
)
Relationship Options
class Article(Base):
# Lazy loading options
comments: Mapped[List["Comment"]] = relationship(
lazy="select", # Default: load on access
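# lazy="joined", # Eager load with JOIN
# lazy="selectin", # Eager load with a separate SELECT ... IN query
# lazy="subquery", # Eager load with separate query (legacy; prefer "selectin")
# lazy="dynamic", # Return query object for filtering (legacy; prefer "write_only" in 2.0)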
# lazy="noload", # Don't load
# lazy="raise", # Raise error on access
)
# Cascade options (pass a single cascade= value; alternatives are commented out)
items: Mapped[List["Item"]] = relationship(
cascade="all", # All operations except delete-orphan
# cascade="all, delete-orphan", # Also delete children removed from the parent
# cascade="save-update, merge", # Default: only these operations
)
# Order by
comments: Mapped[List["Comment"]] = relationship(
order_by="Comment.created_at.desc()",
)
Queries (SQLAlchemy 2.0 Style)
Basic Queries
from sqlalchemy import select
from sqlalchemy.orm import Session
# Select all
stmt = select(User)
users = session.execute(stmt).scalars().all()
# Select with where
stmt = select(User).where(User.is_active == True)
active_users = session.execute(stmt).scalars().all()
# Multiple conditions
stmt = select(User).where(
User.is_active == True,
User.created_at > "2024-01-01",
)
users = session.execute(stmt).scalars().all()
# Select specific columns
stmt = select(User.id, User.username, User.email)
result = session.execute(stmt)
for row in result:
print(f"ID: {row.id}, Username: {row.username}")
# Get by primary key
user = session.get(User, 1) # Returns None if not found
# Get one
stmt = select(User).where(User.username == "john")
user = session.execute(stmt).scalar_one_or_none() # None if not found
user = session.execute(stmt).scalar_one() # Raises if not found
Joins
# Simple join
stmt = select(Article).join(Author)
articles = session.execute(stmt).scalars().all()
# Join with condition
stmt = select(Article).join(Author, Article.author_id == Author.id)
# Multiple joins
stmt = select(Comment).join(Article).join(Author)
# Join with specific columns
stmt = select(Article.title, Author.name).join(Author)
result = session.execute(stmt)
for row in result:
print(f"{row.title} by {row.name}")
# Left outer join
from sqlalchemy.orm import outerjoin
stmt = select(Author, Article).outerjoin(Article)
# Join to alias (self-join)
from sqlalchemy.orm import aliased
Manager = aliased(Employee)
stmt = select(Employee, Manager).join(
Manager, Employee.manager_id == Manager.id
)
# Eager loading with joinedload
from sqlalchemy.orm import joinedload
stmt = select(Author).options(joinedload(Author.articles))
authors = session.execute(stmt).unique().scalars().all()
# Selectin load (separate query)
from sqlalchemy.orm import selectinload
stmt = select(Author).options(selectinload(Author.articles))
# Load only specific relationships
stmt = select(Author).options(
selectinload(Author.articles).selectinload(Article.tags)
)
Filtering
from sqlalchemy import and_, or_, not_, func, desc, asc
# Comparison operators
stmt = select(User).where(User.age > 18)
stmt = select(User).where(User.age >= 18)
stmt = select(User).where(User.age < 65)
stmt = select(User).where(User.name == "John")
stmt = select(User).where(User.name != "John")
# LIKE
stmt = select(User).where(User.name.like("%john%"))
stmt = select(User).where(User.name.ilike("%JOHN%")) # Case-insensitive
# IN
stmt = select(User).where(User.id.in_([1, 2, 3]))
stmt = select(User).where(User.status.in_(["active", "pending"]))
# NOT IN
stmt = select(User).where(User.id.not_in([1, 2, 3]))
# BETWEEN
stmt = select(User).where(User.age.between(18, 65))
# IS NULL / IS NOT NULL
stmt = select(User).where(User.deleted_at.is_(None))
stmt = select(User).where(User.deleted_at.is_not(None))
# AND / OR / NOT
stmt = select(User).where(
and_(User.is_active == True, User.age > 18)
)
stmt = select(User).where(
or_(User.role == "admin", User.role == "moderator")
)
stmt = select(User).where(
not_(User.is_banned)
)
# Chained filters
stmt = (
select(User)
.where(User.is_active == True)
.where(User.age >= 18)
.where(User.country == "US")
)
Ordering and Limiting
# Order by
stmt = select(User).order_by(User.created_at)
stmt = select(User).order_by(desc(User.created_at))
stmt = select(User).order_by(User.last_name, User.first_name)
# Limit and offset
stmt = select(User).limit(10)
stmt = select(User).offset(20).limit(10) # Pagination
# Pagination helper
def paginate(query, page: int, per_page: int = 20):
return query.offset((page - 1) * per_page).limit(per_page)
stmt = paginate(select(User), page=2)
Aggregation
from sqlalchemy import func, desc
# Aggregates are accessed through func; sqlalchemy does not export count/sum/avg/max/min
# Count
stmt = select(func.count()).select_from(User)
total = session.execute(stmt).scalar()
# Count with filter
stmt = select(func.count(User.id)).where(User.is_active == True)
active_count = session.execute(stmt).scalar()
# Sum, Avg, Min, Max
stmt = select(func.sum(Order.total))
stmt = select(func.avg(Product.price))
stmt = select(func.max(User.age))
stmt = select(func.min(Product.price))
# Group by
stmt = (
select(Author.name, func.count(Article.id))
.join(Article)
.group_by(Author.id)
.order_by(desc(func.count(Article.id)))
)
# Having
stmt = (
select(Author.name, func.count(Article.id).label("article_count"))
.join(Article)
.group_by(Author.id)
.having(func.count(Article.id) > 5)
)
Subqueries
# Scalar subquery
subq = (
select(func.avg(Product.price))
.where(Product.category_id == Category.id)
.scalar_subquery()
)
stmt = select(Category.name, subq.label("avg_price"))
# IN subquery
subq = select(Article.author_id).where(Article.views > 1000)
stmt = select(Author).where(Author.id.in_(subq))
# EXISTS
from sqlalchemy import exists
subq = select(Article.id).where(Article.author_id == Author.id)
stmt = select(Author).where(exists(subq))
# CTE (Common Table Expression)
cte = (
select(Author.name, func.count(Article.id).label("article_count"))
.join(Article)
.group_by(Author.id)
.cte("author_stats")
)
stmt = select(cte).where(cte.c.article_count > 10)
Sessions
Session Management
from sqlalchemy.orm import Session, sessionmaker
# Create session factory
SessionLocal = sessionmaker(bind=engine, expire_on_commit=False)
# Use session with context manager
with SessionLocal() as session:
user = session.execute(select(User)).scalar()
session.commit()
# Manual management
session = SessionLocal()
try:
user = session.execute(select(User)).scalar()
session.commit()
finally:
session.close()
# Dependency injection (FastAPI style)
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
Adding and Updating
# Add single object
user = User(username="john", email="john@example.com")
session.add(user)
session.commit()
session.refresh(user) # Refresh to get server-generated values
# Add multiple objects
users = [
User(username="user1", email="user1@example.com"),
User(username="user2", email="user2@example.com"),
]
session.add_all(users)
session.commit()
# Update object
user = session.get(User, 1)
user.email = "newemail@example.com"
session.commit()
# Update with query
from sqlalchemy import update
stmt = (
update(User)
.where(User.is_active == True)
.values(last_login=func.now())
)
session.execute(stmt)
session.commit()
# Update with returning
stmt = (
update(User)
.where(User.id == 1)
.values(email="new@example.com")
.returning(User)
)
result = session.execute(stmt)
updated_user = result.scalar_one()
session.commit()
Deleting
# Delete object
user = session.get(User, 1)
session.delete(user)
session.commit()
# Delete with query
from sqlalchemy import delete
stmt = delete(User).where(User.is_active == False)
session.execute(stmt)
session.commit()
# Delete with returning
stmt = (
delete(User)
.where(User.id == 1)
.returning(User.id, User.username)
)
result = session.execute(stmt)
deleted = result.fetchone()
session.commit()
Transactions
# Nested transaction (SAVEPOINT)
with session.begin_nested():
session.add(User(username="test"))
# Auto-rollback on exception
# Manual transaction control
session.begin()
try:
session.add(user)
session.commit()
except:
session.rollback()
raise
# Using begin() context manager
with session.begin():
session.add(user)
# Auto-commit or rollback
Bulk Operations
# Bulk insert (no events, no relationships)
from sqlalchemy import insert
session.execute(
insert(User),
[
{"username": "user1", "email": "user1@example.com"},
{"username": "user2", "email": "user2@example.com"},
]
)
session.commit()
# Bulk update
session.execute(
update(User)
.where(User.is_active == True)
.values(last_login=func.now())
)
# Legacy bulk methods (no ORM events; superseded in 2.0 by session.execute() with insert()/update())
session.bulk_insert_mappings(User, [
{"username": "user1", "email": "user1@example.com"},
{"username": "user2", "email": "user2@example.com"},
])
session.bulk_update_mappings(User, [
{"id": 1, "email": "new1@example.com"},
{"id": 2, "email": "new2@example.com"},
])
Async SQLAlchemy
Async Models and Session
from sqlalchemy.ext.asyncio import (
create_async_engine,
AsyncSession,
async_sessionmaker,
)
from sqlalchemy.orm import Mapped, mapped_column
# Async engine
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
# Async session factory
async_session = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
)
# Async context manager
async with async_session() as session:
async with session.begin():
result = await session.execute(select(User))
users = result.scalars().all()
Async Queries
from sqlalchemy import select
async def get_user(user_id: int) -> Optional[User]:
async with async_session() as session:
result = await session.execute(
select(User).where(User.id == user_id)
)
return result.scalar_one_or_none()
async def get_users_paginated(page: int, per_page: int) -> List[User]:
async with async_session() as session:
stmt = (
select(User)
.order_by(User.created_at.desc())
.offset((page - 1) * per_page)
.limit(per_page)
)
result = await session.execute(stmt)
return result.scalars().all()
async def create_user(username: str, email: str) -> User:
async with async_session() as session:
async with session.begin():
user = User(username=username, email=email)
session.add(user)
await session.flush()
await session.refresh(user)
return user
Async with FastAPI
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
app = FastAPI()
async def get_db():
async with async_session() as session:
try:
yield session
finally:
await session.close()
@app.get("/users/{user_id}")
async def get_user(
user_id: int,
db: AsyncSession = Depends(get_db)
):
result = await db.execute(
select(User).where(User.id == user_id)
)
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
@app.post("/users")
async def create_user(
username: str,
email: str,
db: AsyncSession = Depends(get_db)
):
user = User(username=username, email=email)
db.add(user)
await db.commit()
await db.refresh(user)
return user
Alembic Migrations
Setup
# Initialize Alembic
alembic init alembic
# Edit alembic.ini
sqlalchemy.url = postgresql://user:password@localhost/mydb
# Or use env.py for dynamic URL
# alembic/env.py
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
from myapp.models import Base # Import your models
config = context.config
fileConfig(config.config_file_name)
target_metadata = Base.metadata
def run_migrations_offline():
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata,
)
with context.begin_transaction():
context.run_migrations()
Creating Migrations
# Auto-generate migration from model changes
alembic revision --autogenerate -m "Add user table"
# Create empty migration
alembic revision -m "Add custom index"
# Apply migrations
alembic upgrade head
# Rollback one migration
alembic downgrade -1
# Rollback to specific revision
alembic downgrade abc123
# View history
alembic history
# Current revision
alembic current
Migration File
# alembic/versions/abc123_add_user_table.py
"""Add user table
Revision ID: abc123
Revises:
Create Date: 2024-01-15 10:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers
revision = 'abc123'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'users',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('username', sa.String(50), nullable=False),
sa.Column('email', sa.String(255), nullable=False),
sa.Column('created_at', sa.DateTime(), server_default=sa.func.now()),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('username'),
sa.UniqueConstraint('email'),
)
op.create_index('ix_users_username', 'users', ['username'])
def downgrade():
op.drop_index('ix_users_username', table_name='users')
op.drop_table('users')
Common Migration Operations
def upgrade():
# Create table
op.create_table(
'products',
sa.Column('id', sa.Integer(), primary_key=True),
sa.Column('name', sa.String(100), nullable=False),
)
# Add column
op.add_column('users', sa.Column('phone', sa.String(20)))
# Drop column
op.drop_column('users', 'phone')
# Alter column
op.alter_column(
'users',
'username',
existing_type=sa.String(50),
type_=sa.String(100),
nullable=True,
)
# Create index
op.create_index('ix_users_email', 'users', ['email'], unique=True)
# Drop index
op.drop_index('ix_users_email', table_name='users')
# Create foreign key
op.create_foreign_key(
'fk_articles_author',
'articles',
'users',
['author_id'],
['id'],
ondelete='CASCADE',
)
# Drop foreign key
op.drop_constraint('fk_articles_author', 'articles', type_='foreignkey')
# Execute raw SQL
op.execute("UPDATE users SET is_active = TRUE")
Best Practices
1. Use Mapped Types (SQLAlchemy 2.0)
# Good: Mapped types with type hints
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
username: Mapped[str] = mapped_column(String(50))
bio: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
articles: Mapped[List["Article"]] = relationship()
2. Session Scope
# Good: Use context managers
with SessionLocal() as session:
user = session.execute(select(User)).scalar()
session.commit()
# Bad: Forget to close
session = SessionLocal()
user = session.execute(select(User)).scalar()
# session.close() missing!
3. Eager Loading
# Good: Explicit eager loading
stmt = select(Author).options(selectinload(Author.articles))
# Bad: N+1 query problem
authors = session.execute(select(Author)).scalars().all()
for author in authors:
print(author.articles) # Triggers query for each author!
4. Use expire_on_commit=False
# Good: Can access attributes after commit
SessionLocal = sessionmaker(
bind=engine,
expire_on_commit=False, # Access attributes after commit
)
with SessionLocal() as session:
user = User(username="john")
session.add(user)
session.commit()
print(user.username) # Works with expire_on_commit=False
5. Connection Pooling
# Good: Configure pool for production
engine = create_engine(
DATABASE_URL,
pool_size=10,
max_overflow=20,
pool_pre_ping=True, # Check connection health
pool_recycle=3600, # Recycle old connections
)
Common Issues
Issue: Detached Instance
# Problem: Accessing relationship on detached object
with SessionLocal() as session:
user = session.get(User, 1)
# session closed, user is detached
print(user.articles) # DetachedInstanceError!
# Solution: Eager load or merge
with SessionLocal() as session:
user = session.execute(
select(User).options(selectinload(User.articles)).where(User.id == 1)
).scalar_one()
# Now user.articles is loaded
Issue: Flush vs Commit
# flush() - Send SQL to database, don't commit transaction
session.add(user)
session.flush() # Get user.id from database
profile = UserProfile(user_id=user.id)
session.add(profile)
session.commit() # Commit both
# commit() - Flush and commit transaction
session.add(user)
session.commit() # All changes persisted
Issue: Session Thread Safety
# Problem: Session is not thread-safe
# Each thread needs its own session
# Solution: Use scoped_session
from sqlalchemy.orm import scoped_session
Session = scoped_session(sessionmaker(bind=engine))
# In each thread:
session = Session()
# Use session...
Session.remove() # Clean up
References
- Official Documentation: https://docs.sqlalchemy.org/
- SQLAlchemy 2.0 Migration: https://docs.sqlalchemy.org/en/20/changelog/migration_20.html
- Alembic Documentation: https://alembic.sqlalchemy.org/
- Async Support: https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html