Data Migration Scripts
Overview
Create robust, safe, and reversible data migration scripts for database schema changes and data transformations with minimal downtime.
When to Use
- Database schema changes
- Adding/removing/modifying columns
- Migrating between database systems
- Data transformations and cleanup
- Splitting or merging tables
- Changing data types
- Adding indexes and constraints
- Backfilling data
- Multi-tenant data migrations
Migration Principles
- Reversible - Every migration should have a rollback
- Idempotent - Safe to run multiple times (see the guard sketch after this list)
- Atomic - All-or-nothing execution
- Tested - Test on production-like data
- Monitored - Track progress and errors
- Documented - Clear purpose and side effects
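For example, an idempotent and reversible migration guards its schema changes so that a partially applied or repeated run does not fail. A minimal Knex sketch (the table and column names are illustrative):

import { Knex } from "knex";

export async function up(knex: Knex): Promise<void> {
  // Skip creation if an earlier, partially applied run already created the table
  if (!(await knex.schema.hasTable("user_preferences"))) {
    await knex.schema.createTable("user_preferences", (table) => {
      table.uuid("id").primary();
      table.jsonb("preferences").defaultTo("{}");
    });
  }
  // Only add the column when it is missing, so re-running is safe
  if (!(await knex.schema.hasColumn("users", "email_verified"))) {
    await knex.schema.table("users", (table) => {
      table.boolean("email_verified").defaultTo(false);
    });
  }
}

export async function down(knex: Knex): Promise<void> {
  // The rollback is guarded the same way, so it can also be re-run safely
  if (await knex.schema.hasColumn("users", "email_verified")) {
    await knex.schema.table("users", (table) => {
      table.dropColumn("email_verified");
    });
  }
  await knex.schema.dropTableIfExists("user_preferences");
}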
Implementation Examples
1. Knex.js Migrations (Node.js)
import { Knex } from "knex";
// migrations/20240101000000_add_user_preferences.ts
export async function up(knex: Knex): Promise<void> {
// Create new table
await knex.schema.createTable("user_preferences", (table) => {
table.uuid("id").primary().defaultTo(knex.raw("gen_random_uuid()"));
table
.uuid("user_id")
.notNullable()
.references("id")
.inTable("users")
.onDelete("CASCADE");
table.jsonb("preferences").defaultTo("{}");
table.timestamp("created_at").defaultTo(knex.fn.now());
table.timestamp("updated_at").defaultTo(knex.fn.now());
table.index("user_id");
});
// Migrate existing data
await knex.raw(`
INSERT INTO user_preferences (user_id, preferences)
SELECT id, jsonb_build_object(
'theme', COALESCE(theme, 'light'),
'notifications', COALESCE(notifications_enabled, true)
)
FROM users
WHERE theme IS NOT NULL OR notifications_enabled IS NOT NULL
`);
const [{ count }] = await knex("user_preferences").count("* as count");
console.log(`Migrated user preferences for ${count} users`);
}
export async function down(knex: Knex): Promise<void> {
// Restore data to original table
await knex.raw(`
UPDATE users u
SET
theme = (p.preferences->>'theme'),
notifications_enabled = (p.preferences->>'notifications')::boolean
FROM user_preferences p
WHERE u.id = p.user_id
`);
// Drop new table
await knex.schema.dropTableIfExists("user_preferences");
}
// migrations/20240102000000_add_email_verification.ts
export async function up(knex: Knex): Promise<void> {
// Add new columns
await knex.schema.table("users", (table) => {
table.boolean("email_verified").defaultTo(false);
table.timestamp("email_verified_at").nullable();
table.string("verification_token").nullable();
});
// Backfill verified status for existing users
await knex("users")
.where("created_at", "<", knex.raw("NOW() - INTERVAL '30 days'"))
.update({
email_verified: true,
email_verified_at: knex.fn.now(),
});
// Add index
await knex.schema.table("users", (table) => {
table.index("verification_token");
});
}
export async function down(knex: Knex): Promise<void> {
await knex.schema.table("users", (table) => {
table.dropIndex("verification_token");
table.dropColumn("email_verified");
table.dropColumn("email_verified_at");
table.dropColumn("verification_token");
});
}
2. Alembic Migrations (Python/SQLAlchemy)
"""Add user roles and permissions
Revision ID: a1b2c3d4e5f6
Revises: previous_revision
Create Date: 2024-01-01 00:00:00
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers
revision = 'a1b2c3d4e5f6'
down_revision = 'previous_revision'
branch_labels = None
depends_on = None
def upgrade():
    # Create roles table
    op.create_table(
        'roles',
        sa.Column('id', sa.Integer(), primary_key=True),
        sa.Column('name', sa.String(50), unique=True, nullable=False),
        sa.Column('description', sa.Text()),
        sa.Column('created_at', sa.DateTime(), server_default=sa.func.now()),
    )

    # Create user_roles junction table
    op.create_table(
        'user_roles',
        sa.Column('user_id', sa.Integer(), sa.ForeignKey('users.id', ondelete='CASCADE')),
        sa.Column('role_id', sa.Integer(), sa.ForeignKey('roles.id', ondelete='CASCADE')),
        sa.Column('assigned_at', sa.DateTime(), server_default=sa.func.now()),
        sa.PrimaryKeyConstraint('user_id', 'role_id')
    )

    # Create indexes
    op.create_index('idx_user_roles_user_id', 'user_roles', ['user_id'])
    op.create_index('idx_user_roles_role_id', 'user_roles', ['role_id'])

    # Insert default roles
    op.execute("""
        INSERT INTO roles (name, description) VALUES
        ('admin', 'Administrator with full access'),
        ('user', 'Standard user'),
        ('guest', 'Guest with limited access')
    """)

    # Migrate existing users to default role
    op.execute("""
        INSERT INTO user_roles (user_id, role_id)
        SELECT u.id, r.id
        FROM users u
        CROSS JOIN roles r
        WHERE r.name = 'user'
    """)

def downgrade():
    # Drop tables in reverse order
    op.drop_index('idx_user_roles_role_id', 'user_roles')
    op.drop_index('idx_user_roles_user_id', 'user_roles')
    op.drop_table('user_roles')
    op.drop_table('roles')
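These revisions are applied with `alembic upgrade head` and rolled back with `alembic downgrade -1` (or an explicit revision id); exercising the downgrade on a staging copy verifies the rollback path before release.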
3. Large Data Migration with Batching
import { Knex } from "knex";
interface MigrationProgress {
total: number;
processed: number;
errors: number;
startTime: number;
}
class LargeDataMigration {
private batchSize = 1000;
private progress: MigrationProgress = {
total: 0,
processed: 0,
errors: 0,
startTime: Date.now(),
};
async migrate(knex: Knex): Promise<void> {
console.log("Starting large data migration...");
// Get total count
const result = await knex("old_table").count("* as count").first();
this.progress.total = parseInt((result?.count as string) || "0");
console.log(`Total records to migrate: ${this.progress.total}`);
// Process in batches
let offset = 0;
while (offset < this.progress.total) {
await this.processBatch(knex, offset);
offset += this.batchSize;
// Log progress
this.logProgress();
// Small delay to avoid overwhelming the database
await this.delay(100);
}
console.log("Migration complete!");
this.logProgress();
}
private async processBatch(knex: Knex, offset: number): Promise<void> {
const trx = await knex.transaction();
try {
// Fetch batch
const records = await trx("old_table")
.select("*")
.orderBy("id") // deterministic order so offset paging neither skips nor repeats rows
.limit(this.batchSize)
.offset(offset);
// Transform and insert
const transformed = records.map((record) => this.transformRecord(record));
if (transformed.length > 0) {
await trx("new_table").insert(transformed).onConflict("id").merge(); // Upsert
}
await trx.commit();
this.progress.processed += records.length;
} catch (error) {
await trx.rollback();
console.error(`Batch failed at offset ${offset}:`, error);
this.progress.errors += this.batchSize;
// Re-throw to abort the migration; log and continue here instead if partial batches are acceptable
throw error;
}
}
private transformRecord(record: any): any {
return {
id: record.id,
user_id: record.userId,
data: JSON.stringify(record.legacyData),
created_at: record.createdAt,
updated_at: new Date(),
};
}
private logProgress(): void {
const percent = (
(this.progress.processed / this.progress.total) *
100
).toFixed(2);
const elapsed = Date.now() - this.progress.startTime;
const rate = this.progress.processed / (elapsed / 1000);
console.log(
`Progress: ${this.progress.processed}/${this.progress.total} (${percent}%) ` +
`Errors: ${this.progress.errors} ` +
`Rate: ${rate.toFixed(2)} records/sec`,
);
}
private delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
}
// Usage in migration
export async function up(knex: Knex): Promise<void> {
const migration = new LargeDataMigration();
await migration.migrate(knex);
}
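OFFSET-based paging rescans every skipped row on each batch and slows down as the offset grows. When the source table has a monotonically increasing primary key, a keyset (cursor) variant of the batch loop is usually faster on very large tables. A minimal sketch under that assumption (the function name is illustrative, and the per-record transformation from transformRecord() is omitted for brevity):

import { Knex } from "knex";

async function migrateWithKeyset(knex: Knex): Promise<void> {
  const batchSize = 1000;
  let lastId = 0; // assumes a numeric, monotonically increasing primary key

  while (true) {
    // Each batch resumes where the previous one ended instead of scanning past an offset
    const records = await knex("old_table")
      .select("*")
      .where("id", ">", lastId)
      .orderBy("id")
      .limit(batchSize);
    if (records.length === 0) break;

    await knex("new_table").insert(records).onConflict("id").merge();
    lastId = records[records.length - 1].id;
  }
}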
4. Zero-Downtime Migration Pattern
// Phase 1: Add new column (nullable)
export async function up_phase1(knex: Knex): Promise<void> {
await knex.schema.table("users", (table) => {
table.string("email_new").nullable();
});
console.log("Phase 1: Added new column");
}
// Phase 2: Backfill data
export async function up_phase2(knex: Knex): Promise<void> {
const batchSize = 1000;
let processed = 0;
while (true) {
// PostgreSQL does not support UPDATE ... LIMIT, so select a batch of ids first
const batch = await knex("users")
.select("id")
.whereNull("email_new")
.whereNotNull("email")
.orderBy("id")
.limit(batchSize);
if (batch.length === 0) break;
await knex("users")
.whereIn("id", batch.map((row) => row.id))
.update({ email_new: knex.raw("email") });
processed += batch.length;
console.log(`Backfilled ${processed} records`);
await new Promise((resolve) => setTimeout(resolve, 100));
}
console.log(`Phase 2: Backfilled ${processed} total records`);
}
// Phase 3: Add constraint
export async function up_phase3(knex: Knex): Promise<void> {
await knex.schema.alterTable("users", (table) => {
table.string("email_new").notNullable().alter();
table.unique("email_new");
});
console.log("Phase 3: Added constraints");
}
// Phase 4: Drop old column
export async function up_phase4(knex: Knex): Promise<void> {
await knex.schema.table("users", (table) => {
table.dropColumn("email");
});
await knex.schema.table("users", (table) => {
table.renameColumn("email_new", "email");
});
console.log("Phase 4: Completed migration");
}
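Between phase 1 and phase 4 the application has to keep both columns in sync, otherwise rows written during the backfill window lose their values when the old column is dropped. A minimal sketch of that dual-write in the application layer (the updateUserEmail helper is hypothetical, not part of the migrations above):

import { Knex } from "knex";

async function updateUserEmail(knex: Knex, userId: string, email: string): Promise<void> {
  // Write both columns until phase 4 removes the old one;
  // reads keep using the old column until the cut-over completes.
  await knex("users").where({ id: userId }).update({ email, email_new: email });
}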
5. Migration Validation
class MigrationValidator {
async validate(knex: Knex, migration: string): Promise<boolean> {
console.log(`Validating migration: ${migration}`);
const checks = [
this.checkDataIntegrity(knex),
this.checkConstraints(knex),
this.checkIndexes(knex),
this.checkRowCounts(knex),
];
const results = await Promise.all(checks);
const passed = results.every((r) => r);
if (passed) {
console.log("✓ All validation checks passed");
} else {
console.error("✗ Validation failed");
}
return passed;
}
private async checkDataIntegrity(knex: Knex): Promise<boolean> {
// Check for orphaned records
const orphaned = await knex("user_roles")
.leftJoin("users", "user_roles.user_id", "users.id")
.whereNull("users.id")
.count("* as count")
.first();
const count = parseInt((orphaned?.count as string) || "0");
if (count > 0) {
console.error(`Found ${count} orphaned user_roles records`);
return false;
}
console.log("✓ Data integrity check passed");
return true;
}
private async checkConstraints(knex: Knex): Promise<boolean> {
// Verify constraints exist
const result = await knex.raw(`
SELECT COUNT(*) as count
FROM information_schema.table_constraints
WHERE table_name = 'users'
AND constraint_type = 'UNIQUE'
AND constraint_name LIKE '%email%'
`);
const hasConstraint = result.rows[0].count > 0;
if (!hasConstraint) {
console.error("Email unique constraint missing");
return false;
}
console.log("✓ Constraints check passed");
return true;
}
private async checkIndexes(knex: Knex): Promise<boolean> {
// Verify indexes exist
const result = await knex.raw(`
SELECT indexname
FROM pg_indexes
WHERE tablename = 'users'
AND indexname LIKE '%email%'
`);
if (result.rows.length === 0) {
console.error("Email index missing");
return false;
}
console.log("✓ Indexes check passed");
return true;
}
private async checkRowCounts(knex: Knex): Promise<boolean> {
const [oldCount, newCount] = await Promise.all([
knex("users").count("* as count").first(),
knex("user_preferences").count("* as count").first(),
]);
const old = parseInt((oldCount?.count as string) || "0");
const new_ = parseInt((newCount?.count as string) || "0");
if (Math.abs(old - new_) > old * 0.01) {
console.error(`Row count mismatch: ${old} vs ${new_}`);
return false;
}
console.log("✓ Row counts check passed");
return true;
}
}
// Usage
export async function up(knex: Knex): Promise<void> {
// Run the schema/data changes (performMigration is a placeholder for the work above)
await performMigration(knex);
// Validate
const validator = new MigrationValidator();
const valid = await validator.validate(knex, "add_user_preferences");
if (!valid) {
throw new Error("Migration validation failed");
}
}
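Because Knex runs each migration inside a transaction by default (on databases with transactional DDL, such as PostgreSQL), throwing here rolls the whole migration back instead of leaving a half-applied schema behind.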
6. Cross-Database Migration
from sqlalchemy import create_engine, func, MetaData, Table
from sqlalchemy.orm import sessionmaker
import logging

logger = logging.getLogger(__name__)

class CrossDatabaseMigration:
    def __init__(self, source_url: str, target_url: str):
        self.source_engine = create_engine(source_url)
        self.target_engine = create_engine(target_url)
        self.source_session = sessionmaker(bind=self.source_engine)()
        self.target_session = sessionmaker(bind=self.target_engine)()

    def migrate_table(self, table_name: str, batch_size: int = 1000):
        """Migrate table from source to target database."""
        logger.info(f"Starting migration of table: {table_name}")

        # Get table metadata
        metadata = MetaData()
        source_table = Table(
            table_name,
            metadata,
            autoload_with=self.source_engine
        )

        # Get total count
        total = self.source_session.execute(
            source_table.select().with_only_columns(func.count())
        ).scalar()
        logger.info(f"Total records to migrate: {total}")

        # Migrate in batches, ordered by primary key so paging is deterministic
        offset = 0
        while offset < total:
            # Fetch batch from source
            results = self.source_session.execute(
                source_table.select()
                .order_by(*source_table.primary_key.columns)
                .limit(batch_size)
                .offset(offset)
            ).fetchall()
            if not results:
                break

            # Transform and insert to target
            rows = [dict(row._mapping) for row in results]
            transformed = [self.transform_row(row) for row in rows]
            self.target_session.execute(
                source_table.insert(),
                transformed
            )
            self.target_session.commit()

            offset += batch_size
            logger.info(f"Migrated {offset}/{total} records")

        logger.info(f"Completed migration of {table_name}")

    def transform_row(self, row: dict) -> dict:
        """Transform row data if needed."""
        # Apply any transformations
        return row

    def cleanup(self):
        """Close connections."""
        self.source_session.close()
        self.target_session.close()
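Note that this copies rows only: the target table must already exist (for example created from the same MetaData via create_all against the target engine), and cleanup() is best called in a finally block so both sessions are closed even when a batch fails.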
Best Practices
✅ DO
- Always write both `up` and `down` migrations
- Test migrations on production-like data
- Use transactions for atomic operations
- Process large datasets in batches
- Add indexes after data insertion
- Validate data after migration
- Log progress and errors
- Use feature flags for application code changes
- Back up database before running migrations
- Test rollback procedures (see the round-trip sketch after these lists)
- Document migration side effects
- Version control all migrations
- Use idempotent operations
❌ DON'T
- Run untested migrations on production
- Make breaking changes without backwards compatibility
- Process millions of rows in single transaction
- Skip rollback implementation
- Ignore migration failures
- Modify old migrations
- Delete data without backups
- Run migrations manually in production
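One way to honor "Test rollback procedures" is a round-trip test that applies the latest migration, rolls it back, and applies it again against a disposable, production-like database. A minimal sketch using Knex's migration API (the connection details and function name are assumptions):

import knexFactory from "knex";

async function testMigrationRoundTrip(): Promise<void> {
  const knex = knexFactory({
    client: "pg",
    connection: process.env.TEST_DATABASE_URL, // disposable, production-like database
  });
  try {
    await knex.migrate.latest(); // up
    await knex.migrate.rollback(); // down
    await knex.migrate.latest(); // up again proves the rollback left a clean state
    console.log("Migration round trip succeeded");
  } finally {
    await knex.destroy();
  }
}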
Migration Checklist
- [ ] Migration has both up and down
- [ ] Tested on production-like dataset
- [ ] Transactions used appropriately
- [ ] Large datasets processed in batches
- [ ] Indexes added after data insertion
- [ ] Data validation included
- [ ] Progress logging implemented
- [ ] Error handling included
- [ ] Rollback tested
- [ ] Documentation written
- [ ] Backup taken
- [ ] Team reviewed