Agent Skills: SQLAlchemy

Python SQL toolkit and ORM with expressive query API, relationship mapping, async support, and Alembic migrations

UncategorizedID: CodeAtCode/oss-ai-skills/sqlalchemy

Install this agent skill to your local

pnpm dlx add-skill https://github.com/CodeAtCode/oss-ai-skills/tree/HEAD/frameworks/sqlalchemy

Skill Files

Browse the full folder contents for sqlalchemy.

Download Skill

Loading file tree…

frameworks/sqlalchemy/SKILL.md

Skill Metadata

Name
sqlalchemy
Description
"Python SQL toolkit and ORM with expressive query API, relationship mapping, async support, and Alembic migrations"

SQLAlchemy

Complete reference for Python SQL toolkit and ORM.

Overview

SQLAlchemy provides a full suite of well-known enterprise-level persistence patterns, designed for efficient and high-performing database access.

Key Features:

  • Core: SQL expression language for building queries
  • ORM: Object-relational mapping with declarative models
  • Async: Full async/await support (SQLAlchemy 2.0+)
  • Migrations: Alembic integration for schema changes
  • Multiple DBs: PostgreSQL, MySQL, SQLite, Oracle, SQL Server

Installation

pip install sqlalchemy

# With async support
pip install sqlalchemy[asyncio]

# With PostgreSQL async driver
pip install sqlalchemy[asyncio] asyncpg

# With Alembic migrations
pip install alembic

# With specific database drivers
pip install psycopg2-binary  # PostgreSQL
pip install pymysql          # MySQL
pip install aiosqlite        # SQLite async

SQLAlchemy 2.0

This skill covers SQLAlchemy 2.0+ syntax which is the current standard:

# SQLAlchemy 2.0 style (recommended)
from sqlalchemy import select
from sqlalchemy.orm import Session

stmt = select(User).where(User.name == "john")
result = session.execute(stmt)
users = result.scalars().all()

Engine and Connection

Creating Engine

from sqlalchemy import create_engine

# SQLite
engine = create_engine("sqlite:///database.db")

# PostgreSQL
engine = create_engine("postgresql://user:password@localhost:5432/mydb")

# PostgreSQL with psycopg2
engine = create_engine("postgresql+psycopg2://user:password@localhost/mydb")

# MySQL
engine = create_engine("mysql+pymysql://user:password@localhost:3306/mydb")

# Connection pool settings
engine = create_engine(
    "postgresql://user:password@localhost/mydb",
    pool_size=10,           # Number of connections to keep
    max_overflow=20,        # Additional connections allowed
    pool_timeout=30,        # Seconds to wait for connection
    pool_recycle=3600,      # Recycle connections after 1 hour
    echo=True,              # Log SQL statements
    echo_pool=True,         # Log connection pool events
)

Async Engine

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

# Async engine
async_engine = create_async_engine(
    "postgresql+asyncpg://user:password@localhost/mydb",
    echo=True,
    pool_size=10,
)

# Async session factory
AsyncSessionLocal = sessionmaker(
    async_engine,
    class_=AsyncSession,
    expire_on_commit=False,
)

# Usage
async with AsyncSessionLocal() as session:
    result = await session.execute(select(User))
    users = result.scalars().all()

Connection URL Formats

# SQLite
"sqlite:///database.db"              # Relative path
"sqlite:////absolute/path/to/db.db"  # Absolute path
"sqlite:///:memory:"                 # In-memory database

# PostgreSQL
"postgresql://user:password@host:port/database"
"postgresql+asyncpg://user:password@host/database"  # Async

# MySQL
"mysql+pymysql://user:password@host:port/database"
"mysql+aiomysql://user:password@host/database"  # Async

# Oracle
"oracle+cx_oracle://user:password@host:port/?service_name=myservice"

# SQL Server
"mssql+pyodbc://user:password@host/database?driver=ODBC+Driver+17+for+SQL+Server"

Declarative Models

Basic Model

from sqlalchemy import Column, Integer, String, DateTime, Boolean, Text
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.sql import func
from datetime import datetime

class Base(DeclarativeBase):
    """Base class for all models."""
    pass

class User(Base):
    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True, index=True)
    username = Column(String(50), unique=True, nullable=False)
    email = Column(String(255), unique=True, nullable=False)
    password_hash = Column(String(255), nullable=False)
    is_active = Column(Boolean, default=True)
    is_superuser = Column(Boolean, default=False)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())
    
    def __repr__(self):
        return f"<User(id={self.id}, username='{self.username}')>"

Column Types

from sqlalchemy import (
    Column, Integer, BigInteger, SmallInteger,
    String, Text, Unicode, UnicodeText,
    Float, Double, Numeric, Decimal,
    Boolean, Date, Time, DateTime,
    JSON, LargeBinary, Enum,
    ForeignKey, ForeignKeyConstraint,
    Index, UniqueConstraint, CheckConstraint,
)

class Product(Base):
    __tablename__ = "products"
    
    # Integer types
    id = Column(Integer, primary_key=True)
    quantity = Column(Integer, default=0)
    big_id = Column(BigInteger)
    small_int = Column(SmallInteger)
    
    # String types
    name = Column(String(100), nullable=False)
    description = Column(Text)
    code = Column(Unicode(20))  # Unicode support
    
    # Numeric types
    price = Column(Numeric(10, 2))  # Precision, scale
    weight = Column(Float)
    
    # Boolean
    is_available = Column(Boolean, default=True)
    
    # Date/Time
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    published_date = Column(Date)
    open_time = Column(Time)
    
    # JSON
    metadata = Column(JSON)
    tags = Column(JSON)  # PostgreSQL JSONB
    
    # Binary
    image_data = Column(LargeBinary)
    
    # Enum
    status = Column(Enum("draft", "published", "archived", name="product_status"))

Table Arguments

class Article(Base):
    __tablename__ = "articles"
    
    id = Column(Integer, primary_key=True)
    slug = Column(String(100))
    title = Column(String(200))
    author_id = Column(Integer, ForeignKey("users.id"))
    
    # Table-level constraints
    __table_args__ = (
        UniqueConstraint("slug", name="uq_article_slug"),
        Index("ix_article_author", "author_id"),
        CheckConstraint("length(title) > 0", name="ck_article_title"),
        {"schema": "blog"},  # Schema name
    )

Relationships

One-to-Many

from sqlalchemy.orm import relationship, Mapped, mapped_column
from typing import List, Optional

class Author(Base):
    __tablename__ = "authors"
    
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    
    # One-to-many relationship
    articles: Mapped[List["Article"]] = relationship(
        back_populates="author",
        cascade="all, delete-orphan",
    )

class Article(Base):
    __tablename__ = "articles"
    
    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    author_id: Mapped[int] = mapped_column(ForeignKey("authors.id"))
    
    # Relationship to parent
    author: Mapped["Author"] = relationship(back_populates="articles")

Many-to-Many

# Association table
article_tags = Table(
    "article_tags",
    Base.metadata,
    Column("article_id", Integer, ForeignKey("articles.id", ondelete="CASCADE"), primary_key=True),
    Column("tag_id", Integer, ForeignKey("tags.id", ondelete="CASCADE"), primary_key=True),
    Column("created_at", DateTime, server_default=func.now()),
)

class Article(Base):
    __tablename__ = "articles"
    
    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    
    tags: Mapped[List["Tag"]] = relationship(
        secondary=article_tags,
        back_populates="articles",
    )

class Tag(Base):
    __tablename__ = "tags"
    
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50), unique=True)
    
    articles: Mapped[List["Article"]] = relationship(
        secondary=article_tags,
        back_populates="tags",
    )

Association Object (Many-to-Many with Extra Fields)

class OrderItem(Base):
    __tablename__ = "order_items"
    
    order_id: Mapped[int] = mapped_column(ForeignKey("orders.id"), primary_key=True)
    product_id: Mapped[int] = mapped_column(ForeignKey("products.id"), primary_key=True)
    quantity: Mapped[int] = mapped_column(default=1)
    unit_price: Mapped[float] = mapped_column(Numeric(10, 2))
    
    order: Mapped["Order"] = relationship(back_populates="items")
    product: Mapped["Product"] = relationship(back_populates="order_items")

class Order(Base):
    __tablename__ = "orders"
    
    id: Mapped[int] = mapped_column(primary_key=True)
    items: Mapped[List["OrderItem"]] = relationship(back_populates="order", cascade="all, delete-orphan")

class Product(Base):
    __tablename__ = "products"
    
    id: Mapped[int] = mapped_column(primary_key=True)
    order_items: Mapped[List["OrderItem"]] = relationship(back_populates="product")

One-to-One

class User(Base):
    __tablename__ = "users"
    
    id: Mapped[int] = mapped_column(primary_key=True)
    username: Mapped[str] = mapped_column(String(50), unique=True)
    
    profile: Mapped["UserProfile"] = relationship(
        back_populates="user",
        uselist=False,  # One-to-one
        cascade="all, delete-orphan",
    )

class UserProfile(Base):
    __tablename__ = "user_profiles"
    
    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), unique=True)
    bio: Mapped[Optional[str]] = mapped_column(Text)
    avatar_url: Mapped[Optional[str]] = mapped_column(String(255))
    
    user: Mapped["User"] = relationship(back_populates="profile")

Self-Referential

class Category(Base):
    __tablename__ = "categories"
    
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    parent_id: Mapped[Optional[int]] = mapped_column(ForeignKey("categories.id"))
    
    # Self-referential relationship
    parent: Mapped[Optional["Category"]] = relationship(
        back_populates="children",
        remote_side=[id],
    )
    children: Mapped[List["Category"]] = relationship(
        back_populates="parent",
        cascade="all, delete-orphan",
    )

Relationship Options

class Article(Base):
    # Lazy loading options
    comments: Mapped[List["Comment"]] = relationship(
        lazy="select",      # Default: load on access
        # lazy="joined",    # Eager load with JOIN
        # lazy="subquery",  # Eager load with separate query
        # lazy="dynamic",   # Return query object (for filtering)
        # lazy="noload",    # Don't load
        # lazy="raise",     # Raise error on access
    )
    
    # Cascade options
    items: Mapped[List["Item"]] = relationship(
        cascade="all",              # All operations
        cascade="all, delete-orphan",  # Delete children when parent deleted
        cascade="save-update, merge",  # Only these operations
    )
    
    # Order by
    comments: Mapped[List["Comment"]] = relationship(
        order_by="Comment.created_at.desc()",
    )

Queries (SQLAlchemy 2.0 Style)

Basic Queries

from sqlalchemy import select
from sqlalchemy.orm import Session

# Select all
stmt = select(User)
users = session.execute(stmt).scalars().all()

# Select with where
stmt = select(User).where(User.is_active == True)
active_users = session.execute(stmt).scalars().all()

# Multiple conditions
stmt = select(User).where(
    User.is_active == True,
    User.created_at > "2024-01-01",
)
users = session.execute(stmt).scalars().all()

# Select specific columns
stmt = select(User.id, User.username, User.email)
result = session.execute(stmt)
for row in result:
    print(f"ID: {row.id}, Username: {row.username}")

# Get by primary key
user = session.get(User, 1)  # Returns None if not found

# Get one
stmt = select(User).where(User.username == "john")
user = session.execute(stmt).scalar_one_or_none()  # None if not found
user = session.execute(stmt).scalar_one()  # Raises if not found

Joins

# Simple join
stmt = select(Article).join(Author)
articles = session.execute(stmt).scalars().all()

# Join with condition
stmt = select(Article).join(Author, Article.author_id == Author.id)

# Multiple joins
stmt = select(Comment).join(Article).join(Author)

# Join with specific columns
stmt = select(Article.title, Author.name).join(Author)
result = session.execute(stmt)
for row in result:
    print(f"{row.title} by {row.name}")

# Left outer join
from sqlalchemy.orm import outerjoin
stmt = select(Author, Article).outerjoin(Article)

# Join to alias (self-join)
from sqlalchemy.orm import aliased
Manager = aliased(Employee)
stmt = select(Employee, Manager).join(
    Manager, Employee.manager_id == Manager.id
)

# Eager loading with joinedload
from sqlalchemy.orm import joinedload
stmt = select(Author).options(joinedload(Author.articles))
authors = session.execute(stmt).unique().scalars().all()

# Selectin load (separate query)
from sqlalchemy.orm import selectinload
stmt = select(Author).options(selectinload(Author.articles))

# Load only specific relationships
stmt = select(Author).options(
    selectinload(Author.articles).selectinload(Article.tags)
)

Filtering

from sqlalchemy import and_, or_, not_, func, desc, asc

# Comparison operators
stmt = select(User).where(User.age > 18)
stmt = select(User).where(User.age >= 18)
stmt = select(User).where(User.age < 65)
stmt = select(User).where(User.name == "John")
stmt = select(User).where(User.name != "John")

# LIKE
stmt = select(User).where(User.name.like("%john%"))
stmt = select(User).where(User.name.ilike("%JOHN%"))  # Case-insensitive

# IN
stmt = select(User).where(User.id.in_([1, 2, 3]))
stmt = select(User).where(User.status.in_(["active", "pending"]))

# NOT IN
stmt = select(User).where(User.id.not_in([1, 2, 3]))

# BETWEEN
stmt = select(User).where(User.age.between(18, 65))

# IS NULL / IS NOT NULL
stmt = select(User).where(User.deleted_at.is_(None))
stmt = select(User).where(User.deleted_at.is_not(None))

# AND / OR / NOT
stmt = select(User).where(
    and_(User.is_active == True, User.age > 18)
)
stmt = select(User).where(
    or_(User.role == "admin", User.role == "moderator")
)
stmt = select(User).where(
    not_(User.is_banned)
)

# Chained filters
stmt = (
    select(User)
    .where(User.is_active == True)
    .where(User.age >= 18)
    .where(User.country == "US")
)

Ordering and Limiting

# Order by
stmt = select(User).order_by(User.created_at)
stmt = select(User).order_by(desc(User.created_at))
stmt = select(User).order_by(User.last_name, User.first_name)

# Limit and offset
stmt = select(User).limit(10)
stmt = select(User).offset(20).limit(10)  # Pagination

# Pagination helper
def paginate(query, page: int, per_page: int = 20):
    return query.offset((page - 1) * per_page).limit(per_page)

stmt = paginate(select(User), page=2)

Aggregation

from sqlalchemy import func, count, sum, avg, max, min

# Count
stmt = select(count()).select_from(User)
total = session.execute(stmt).scalar()

# Count with filter
stmt = select(count(User.id)).where(User.is_active == True)
active_count = session.execute(stmt).scalar()

# Sum, Avg, Min, Max
stmt = select(sum(Order.total))
stmt = select(avg(Product.price))
stmt = select(max(User.age))
stmt = select(min(Product.price))

# Group by
stmt = (
    select(Author.name, count(Article.id))
    .join(Article)
    .group_by(Author.id)
    .order_by(desc(count(Article.id)))
)

# Having
stmt = (
    select(Author.name, count(Article.id).label("article_count"))
    .join(Article)
    .group_by(Author.id)
    .having(count(Article.id) > 5)
)

Subqueries

# Scalar subquery
subq = (
    select(func.avg(Product.price))
    .where(Product.category_id == Category.id)
    .scalar_subquery()
)
stmt = select(Category.name, subq.label("avg_price"))

# IN subquery
subq = select(Article.author_id).where(Article.views > 1000)
stmt = select(Author).where(Author.id.in_(subq))

# EXISTS
from sqlalchemy import exists
subq = select(Article.id).where(Article.author_id == Author.id)
stmt = select(Author).where(exists(subq))

# CTE (Common Table Expression)
cte = (
    select(Author.name, count(Article.id).label("article_count"))
    .join(Article)
    .group_by(Author.id)
    .cte("author_stats")
)
stmt = select(cte).where(cte.c.article_count > 10)

Sessions

Session Management

from sqlalchemy.orm import Session, sessionmaker

# Create session factory
SessionLocal = sessionmaker(bind=engine, expire_on_commit=False)

# Use session with context manager
with SessionLocal() as session:
    user = session.execute(select(User)).scalar()
    session.commit()

# Manual management
session = SessionLocal()
try:
    user = session.execute(select(User)).scalar()
    session.commit()
finally:
    session.close()

# Dependency injection (FastAPI style)
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

Adding and Updating

# Add single object
user = User(username="john", email="john@example.com")
session.add(user)
session.commit()
session.refresh(user)  # Refresh to get server-generated values

# Add multiple objects
users = [
    User(username="user1", email="user1@example.com"),
    User(username="user2", email="user2@example.com"),
]
session.add_all(users)
session.commit()

# Update object
user = session.get(User, 1)
user.email = "newemail@example.com"
session.commit()

# Update with query
from sqlalchemy import update
stmt = (
    update(User)
    .where(User.is_active == True)
    .values(last_login=func.now())
)
session.execute(stmt)
session.commit()

# Update with returning
stmt = (
    update(User)
    .where(User.id == 1)
    .values(email="new@example.com")
    .returning(User)
)
result = session.execute(stmt)
updated_user = result.scalar_one()
session.commit()

Deleting

# Delete object
user = session.get(User, 1)
session.delete(user)
session.commit()

# Delete with query
from sqlalchemy import delete
stmt = delete(User).where(User.is_active == False)
session.execute(stmt)
session.commit()

# Delete with returning
stmt = (
    delete(User)
    .where(User.id == 1)
    .returning(User.id, User.username)
)
result = session.execute(stmt)
deleted = result.fetchone()
session.commit()

Transactions

# Nested transaction (SAVEPOINT)
with session.begin_nested():
    session.add(User(username="test"))
    # Auto-rollback on exception

# Manual transaction control
session.begin()
try:
    session.add(user)
    session.commit()
except:
    session.rollback()
    raise

# Using begin() context manager
with session.begin():
    session.add(user)
    # Auto-commit or rollback

Bulk Operations

# Bulk insert (no events, no relationships)
session.execute(
    insert(User),
    [
        {"username": "user1", "email": "user1@example.com"},
        {"username": "user2", "email": "user2@example.com"},
    ]
)
session.commit()

# Bulk update
session.execute(
    update(User)
    .where(User.is_active == True)
    .values(last_login=func.now())
)

# ORM bulk operations (with events)
session.bulk_insert_mappings(User, [
    {"username": "user1", "email": "user1@example.com"},
    {"username": "user2", "email": "user2@example.com"},
])

session.bulk_update_mappings(User, [
    {"id": 1, "email": "new1@example.com"},
    {"id": 2, "email": "new2@example.com"},
])

Async SQLAlchemy

Async Models and Session

from sqlalchemy.ext.asyncio import (
    create_async_engine,
    AsyncSession,
    async_sessionmaker,
)
from sqlalchemy.orm import Mapped, mapped_column

# Async engine
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")

# Async session factory
async_session = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
)

# Async context manager
async with async_session() as session:
    async with session.begin():
        result = await session.execute(select(User))
        users = result.scalars().all()

Async Queries

from sqlalchemy import select

async def get_user(user_id: int) -> Optional[User]:
    async with async_session() as session:
        result = await session.execute(
            select(User).where(User.id == user_id)
        )
        return result.scalar_one_or_none()

async def get_users_paginated(page: int, per_page: int) -> List[User]:
    async with async_session() as session:
        stmt = (
            select(User)
            .order_by(User.created_at.desc())
            .offset((page - 1) * per_page)
            .limit(per_page)
        )
        result = await session.execute(stmt)
        return result.scalars().all()

async def create_user(username: str, email: str) -> User:
    async with async_session() as session:
        async with session.begin():
            user = User(username=username, email=email)
            session.add(user)
            await session.flush()
            await session.refresh(user)
            return user

Async with FastAPI

from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select

app = FastAPI()

async def get_db():
    async with async_session() as session:
        try:
            yield session
        finally:
            await session.close()

@app.get("/users/{user_id}")
async def get_user(
    user_id: int,
    db: AsyncSession = Depends(get_db)
):
    result = await db.execute(
        select(User).where(User.id == user_id)
    )
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

@app.post("/users")
async def create_user(
    username: str,
    email: str,
    db: AsyncSession = Depends(get_db)
):
    user = User(username=username, email=email)
    db.add(user)
    await db.commit()
    await db.refresh(user)
    return user

Alembic Migrations

Setup

# Initialize Alembic
alembic init alembic

# Edit alembic.ini
sqlalchemy.url = postgresql://user:password@localhost/mydb

# Or use env.py for dynamic URL
# alembic/env.py
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
from myapp.models import Base  # Import your models

config = context.config
fileConfig(config.config_file_name)
target_metadata = Base.metadata

def run_migrations_offline():
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )
    with context.begin_transaction():
        context.run_migrations()

def run_migrations_online():
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
        )
        with context.begin_transaction():
            context.run_migrations()

Creating Migrations

# Auto-generate migration from model changes
alembic revision --autogenerate -m "Add user table"

# Create empty migration
alembic revision -m "Add custom index"

# Apply migrations
alembic upgrade head

# Rollback one migration
alembic downgrade -1

# Rollback to specific revision
alembic downgrade abc123

# View history
alembic history

# Current revision
alembic current

Migration File

# alembic/versions/abc123_add_user_table.py
"""Add user table

Revision ID: abc123
Revises: 
Create Date: 2024-01-15 10:00:00.000000
"""
from alembic import op
import sqlalchemy as sa

# revision identifiers
revision = 'abc123'
down_revision = None
branch_labels = None
depends_on = None

def upgrade():
    op.create_table(
        'users',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('username', sa.String(50), nullable=False),
        sa.Column('email', sa.String(255), nullable=False),
        sa.Column('created_at', sa.DateTime(), server_default=sa.func.now()),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('username'),
        sa.UniqueConstraint('email'),
    )
    op.create_index('ix_users_username', 'users', ['username'])

def downgrade():
    op.drop_index('ix_users_username', table_name='users')
    op.drop_table('users')

Common Migration Operations

def upgrade():
    # Create table
    op.create_table(
        'products',
        sa.Column('id', sa.Integer(), primary_key=True),
        sa.Column('name', sa.String(100), nullable=False),
    )
    
    # Add column
    op.add_column('users', sa.Column('phone', sa.String(20)))
    
    # Drop column
    op.drop_column('users', 'phone')
    
    # Alter column
    op.alter_column(
        'users',
        'username',
        existing_type=sa.String(50),
        type_=sa.String(100),
        nullable=True,
    )
    
    # Create index
    op.create_index('ix_users_email', 'users', ['email'], unique=True)
    
    # Drop index
    op.drop_index('ix_users_email', table_name='users')
    
    # Create foreign key
    op.create_foreign_key(
        'fk_articles_author',
        'articles',
        'users',
        ['author_id'],
        ['id'],
        ondelete='CASCADE',
    )
    
    # Drop foreign key
    op.drop_constraint('fk_articles_author', 'articles', type_='foreignkey')
    
    # Execute raw SQL
    op.execute("UPDATE users SET is_active = TRUE")

Best Practices

1. Use Mapped Types (SQLAlchemy 2.0)

# Good: Mapped types with type hints
class User(Base):
    __tablename__ = "users"
    
    id: Mapped[int] = mapped_column(primary_key=True)
    username: Mapped[str] = mapped_column(String(50))
    bio: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
    articles: Mapped[List["Article"]] = relationship()

2. Session Scope

# Good: Use context managers
with SessionLocal() as session:
    user = session.execute(select(User)).scalar()
    session.commit()

# Bad: Forget to close
session = SessionLocal()
user = session.execute(select(User)).scalar()
# session.close() missing!

3. Eager Loading

# Good: Explicit eager loading
stmt = select(Author).options(selectinload(Author.articles))

# Bad: N+1 query problem
authors = session.execute(select(Author)).scalars().all()
for author in authors:
    print(author.articles)  # Triggers query for each author!

4. Use expire_on_commit=False

# Good: Can access attributes after commit
SessionLocal = sessionmaker(
    bind=engine,
    expire_on_commit=False,  # Access attributes after commit
)

with SessionLocal() as session:
    user = User(username="john")
    session.add(user)
    session.commit()
    print(user.username)  # Works with expire_on_commit=False

5. Connection Pooling

# Good: Configure pool for production
engine = create_engine(
    DATABASE_URL,
    pool_size=10,
    max_overflow=20,
    pool_pre_ping=True,  # Check connection health
    pool_recycle=3600,   # Recycle old connections
)

Common Issues

Issue: Detached Instance

# Problem: Accessing relationship on detached object
with SessionLocal() as session:
    user = session.get(User, 1)
# session closed, user is detached
print(user.articles)  # DetachedInstanceError!

# Solution: Eager load or merge
with SessionLocal() as session:
    user = session.execute(
        select(User).options(selectinload(User.articles)).where(User.id == 1)
    ).scalar_one()
    # Now user.articles is loaded

Issue: Flush vs Commit

# flush() - Send SQL to database, don't commit transaction
session.add(user)
session.flush()  # Get user.id from database
profile = UserProfile(user_id=user.id)
session.add(profile)
session.commit()  # Commit both

# commit() - Flush and commit transaction
session.add(user)
session.commit()  # All changes persisted

Issue: Session Thread Safety

# Problem: Session is not thread-safe
# Each thread needs its own session

# Solution: Use scoped_session
from sqlalchemy.orm import scoped_session

Session = scoped_session(sessionmaker(bind=engine))

# In each thread:
session = Session()
# Use session...
Session.remove()  # Clean up

Advanced Query Patterns

Aggregation

from sqlalchemy import func, select

# Count
stmt = select(func.count(User.id))
total = session.scalar(stmt)

# Sum / Average
stmt = select(func.sum(Order.amount), func.avg(Order.amount))
total, avg = session.execute(stmt).one()

# Group by
stmt = (
    select(User.name, func.count(Post.id).label("post_count"))
    .join(Post)
    .group_by(User.name)
    .order_by(func.count(Post.id).desc())
)
for name, count in session.execute(stmt):
    print(f"{name}: {count} posts")

Subqueries

from sqlalchemy import subquery

# Subquery: users with more than 5 posts
post_count_sq = (
    select(Post.author_id, func.count(Post.id).label("cnt"))
    .group_by(Post.author_id)
    .subquery()
)

stmt = (
    select(User)
    .join(post_count_sq, User.id == post_count_sq.c.author_id)
    .where(post_count_sq.c.cnt > 5)
)
active_users = session.scalars(stmt).all()

CTEs (Common Table Expressions)

from sqlalchemy import CTE

# Recursive CTE: organizational hierarchy
org_cte = select(Employee).where(Employee.manager_id.is_(None)).cte(name="org", recursive=True)
mgr = org_cte.alias("mgr")
stmt = (
    select(mgr)
    .join(org_cte, mgr.c.manager_id == org_cte.c.id)
)
# Non-recursive CTE
active_cte = (
    select(User.id, User.name)
    .where(User.is_active.is_(True))
    .cte("active_users")
)
stmt = select(Order).join(active_cte, Order.user_id == active_cte.c.id)

Window Functions

from sqlalchemy import over

# Row number, rank, dense rank
stmt = (
    select(
        User.name,
        User.salary,
        func.row_number().over(order_by=User.salary.desc()).label("rn"),
        func.rank().over(order_by=User.salary.desc()).label("rank"),
        func.dense_rank().over(order_by=User.salary.desc()).label("drank"),
        func.sum(User.salary).over(partition_by=User.dept).label("dept_total"),
    )
    .order_by(User.salary.desc())
)
for row in session.execute(stmt):
    print(f"{row.name}: ${row.salary} (rank: {row.rank})")

Transaction Management

Nested Transactions with Savepoints

from sqlalchemy import begin_nested

# Using savepoints for partial rollback
with Session(engine) as session:
    session.begin()
    session.add(User(name="Alice"))

    # Create a savepoint
    nested = session.begin_nested()
    try:
        session.add(Order(user_id=1, amount=100))
        # If this fails, only the nested transaction rolls back
        nested.commit()
    except Exception:
        nested.rollback()
        # Alice is still in the session, only Order was rolled back

    session.commit()

Read-Only Transactions

from sqlalchemy import text

with engine.connect() as conn:
    conn.execute(text("SET TRANSACTION READ ONLY"))
    result = conn.execute(select(User))
    # Any write attempt will raise an error

Session Lifecycle Best Practices

# Pattern 1: Context manager (recommended)
with Session(engine) as session:
    session.add(user)
    session.commit()
# Session automatically closed

# Pattern 2: Async session
from sqlalchemy.ext.asyncio import AsyncSession

async with AsyncSession(async_engine) as session:
    async with session.begin():
        session.add(user)
    # Auto-committed and closed

Bulk Operations

Bulk Inserts

from sqlalchemy import insert

# Core bulk insert (fastest)
stmt = insert(User).values([
    {"name": "Alice", "email": "alice@example.com"},
    {"name": "Bob", "email": "bob@example.com"},
    {"name": "Charlie", "email": "charlie@example.com"},
])
session.execute(stmt)
session.commit()

# ORM bulk insert (slower, but triggers events)
session.add_all([
    User(name="Alice", email="alice@example.com"),
    User(name="Bob", email="bob@example.com"),
])
session.commit()

Bulk Updates

from sqlalchemy import update

# Core bulk update
stmt = (
    update(User)
    .where(User.is_active.is_(True))
    .values(last_login=func.now())
)
session.execute(stmt)
session.commit()

# Bulk update with binding
stmt = update(User).where(User.name == "old_name").values(name="new_name")
session.execute(stmt)
session.commit()

Bulk Deletes

from sqlalchemy import delete

stmt = delete(User).where(User.last_login < func.now() - text("interval '90 days'"))
result = session.execute(stmt)
session.commit()
print(f"Deleted {result.rowcount} inactive users")

Performance Tips for Large Datasets

# Use yield_per for large result sets
for user in session.scalars(select(User)).yield_per(100):
    process(user)

# Use server-side cursors with stream()
for row in session.stream(select(LargeTable)):
    process(row)

# Batch inserts with executemany
session.execute(insert(User), [
    {"name": f"User {i}", "email": f"user{i}@example.com"}
    for i in range(10000)
], execution_options={"max_rows": 1000})
session.commit()

Alembic Migrations Workflow

Initial Setup

# Install alembic
pip install alembic

# Initialize in project
alembic init alembic

# Edit alembic/env.py to import your Base metadata
# from myapp.models import Base
# target_metadata = Base.metadata

autogenerate Configuration

# alembic/env.py
from sqlalchemy import engine_from_config
from myapp.models import Base

target_metadata = Base.metadata

def run_migrations_online():
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix="sqlalchemy.",
    )
    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
            compare_type=True,       # Detect column type changes
            compare_server_default=True,  # Detect default value changes
        )
        with context.begin_transaction():
            context.run_migrations()

Common Migration Commands

# Create a new migration (auto-detect changes)
alembic revision --autogenerate -m "add user table"

# Apply all pending migrations
alembic upgrade head

# Rollback last migration
alembic downgrade -1

# Rollback to specific revision
alembic downgrade abc123

# Show current revision
alembic current

# Show migration history
alembic history

# Generate SQL without applying
alembic upgrade head --sql

Custom Migration Operations

"""add user email column

Revision ID: xyz789
"""
from alembic import op
import sqlalchemy as sa

def upgrade():
    # Add column
    op.add_column("users", sa.Column("email", sa.String(255), nullable=True))

    # Populate with data
    op.execute("UPDATE users SET email = name || '@example.com' WHERE email IS NULL")

    # Make non-nullable
    op.alter_column("users", "email", nullable=False)

    # Add unique constraint
    op.create_unique_constraint("uq_users_email", "users", ["email"])

def downgrade():
    op.drop_constraint("uq_users_email", "users", type_="unique")
    op.drop_column("users", "email")

Common Issues & Debugging

DetachedInstanceError

# Problem: Accessing attributes after session closed
with Session(engine) as session:
    user = session.scalar(select(User).limit(1))
# print(user.name)  # DetachedInstanceError!

# Fix 1: Expire objects on commit = False
session = Session(engine, expire_on_commit=False)

# Fix 2: Access within session context
with Session(engine) as session:
    user = session.scalar(select(User).limit(1))
    name = user.name  # OK, still attached

Lazy Loading Outside Sessions

# Problem: Accessing relationships after session closed
with Session(engine) as session:
    user = session.scalar(select(User).limit(1))
# print(user.posts)  # Error! Relationship not loaded

# Fix: Use eager loading
stmt = select(User).options(selectinload(User.posts))
user = session.scalar(stmt)
print(user.posts)  # OK, already loaded

N+1 Query Problem

# BAD: N+1 queries (1 for users + N for each user's posts)
users = session.scalars(select(User)).all()
for user in users:
    print(len(user.posts))  # Triggers 1 query per user!

# GOOD: Single query with joined eager loading
from sqlalchemy.orm import joinedload
stmt = select(User).options(joinedload(User.posts))
users = session.scalars(stmt).unique().all()

# GOOD: Two queries with selective loading
from sqlalchemy.orm import selectinload
stmt = select(User).options(selectinload(User.posts))
users = session.scalars(stmt).all()

Session Leak Detection

# Enable session tracking for debugging
from sqlalchemy import event

@event.listens_for(Session, "after_commit")
def log_commit(session, context):
    logger.info(f"Session committed: {id(session)}")

@event.listens_for(Session, "after_rollback")
def log_rollback(session, context):
    logger.warning(f"Session rolled back: {id(session)}")

# Use weak_instance_map to track detached instances
# Always use context managers to prevent leaks:
with Session(engine) as session:
    # work...
    session.commit()
# Guaranteed cleanup

References

  • Official Documentation: https://docs.sqlalchemy.org/
  • SQLAlchemy 2.0 Overview: https://docs.sqlalchemy.org/en/20/changelog/migration_20.html
  • SQLAlchemy 2.0 Tutorial: https://docs.sqlalchemy.org/en/20/tutorial/
  • Alembic Documentation: https://alembic.sqlalchemy.org/
  • Alembic Tutorial: https://alembic.sqlalchemy.org/en/latest/tutorial.html
  • Async Support: https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html
  • ORM Querying Guide: https://docs.sqlalchemy.org/en/20/orm/queryguide/
  • Core Expression Language: https://docs.sqlalchemy.org/en/20/core/expression_api.html