ETL Core Patterns
Reliability patterns for production data pipelines.
Idempotency Patterns
# Pattern 1: Delete-then-insert (simple, works for small datasets)
def load_daily_data(date: str, df: pd.DataFrame) -> None:
    """Replace every ``daily_metrics`` row for *date* with the rows in *df*.

    The DELETE and the append run on the same connection inside one
    ``engine.begin()`` block, so they are part of a single transaction and
    re-running the load for the same date replaces rather than duplicates.
    """
    purge_day = text("DELETE FROM daily_metrics WHERE date = :date")
    with engine.begin() as connection:
        connection.execute(purge_day, {"date": date})
        df.to_sql('daily_metrics', connection, if_exists='append', index=False)
# Pattern 2: UPSERT (better for large datasets)
def upsert_records(df: pd.DataFrame) -> None:
    """Insert or update the rows of *df* into ``MyTable`` in batches of 1000.

    Rows are matched on ``id``. On conflict every other column of *df* is
    overwritten with the incoming value (``EXCLUDED.<col>``).

    NOTE(review): assumes ``insert`` is a dialect insert that provides
    ``on_conflict_*`` and ``.excluded`` (e.g. PostgreSQL's); nothing here
    commits ``session`` — presumably the caller does.
    """
    # Every column except the conflict key gets refreshed on conflict.
    update_cols = [col for col in df.columns if col != 'id']
    for batch in chunked(df.to_dict('records'), 1000):
        stmt = insert(MyTable).values(batch)
        if update_cols:
            stmt = stmt.on_conflict_do_update(
                index_elements=['id'],
                set_={col: stmt.excluded[col] for col in update_cols},
            )
        else:
            # Only the key column is present: an empty SET list is not valid
            # SQL, so leave existing rows untouched instead.
            stmt = stmt.on_conflict_do_nothing(index_elements=['id'])
        session.execute(stmt)
# Pattern 3: Source hash for change detection
def extract_with_hash(df: pd.DataFrame) -> pd.DataFrame:
hash_cols = ['id', 'name', 'value', 'updated_at']
df['_row_hash'] = pd.util.hash_pandas_object(df[hash_cols])
return df
Checkpointing
import json
from pathlib import Path
class Checkpoint:
def __init__(self, path: str):
self.path = Path(path)
self.state = self._load()
def _load(self) -> dict:
if self.path.exists():
return json.loads(self.path.read_text())
return {}
def save(self) -> None:
self.path.write_text(json.dumps(self.state, default=str))
def get_last_processed(self, key: str) -> str | None:
return self.state.get(key)
def set_last_processed(self, key: str, value: str) -> None:
self.state[key] = value
self.save()
# Usage
checkpoint = Checkpoint('.etl_checkpoint.json')
last_id = checkpoint.get_last_processed('users_sync')
for batch in fetch_users_since(last_id):
    if not batch:
        # Nothing to process and no id to record (batch[-1] would raise).
        continue
    process(batch)
    # Checkpoint only after process() returns: if the run dies mid-batch,
    # the stored id still points before this batch.
    checkpoint.set_last_processed('users_sync', batch[-1]['id'])
Error Handling
from dataclasses import dataclass
@dataclass
class FailedRecord:
    """One input record whose transformation raised, kept for inspection or saving."""

    # Identifier read from the record's 'id' key ('unknown' when the key is
    # absent), as populated by ETLProcessor.process_batch.
    source_id: str
    # str() of the exception that was raised.
    error: str
    # The untransformed input record, stored as-is.
    raw_data: dict
    # Moment the failure was captured (datetime.now() in ETLProcessor).
    timestamp: datetime
class ETLProcessor:
    """Transform record batches, collecting per-record failures instead of aborting.

    ``transform`` is not defined on this class; NOTE(review): it is presumably
    supplied by a subclass. Without it every record fails with an
    AttributeError that is captured like any other failure.
    """

    def __init__(self):
        self.failed_records: list[FailedRecord] = []

    def process_batch(self, records: list[dict]) -> list[dict]:
        """Transform each record and return the successes in input order.

        A record whose ``transform`` raises any ``Exception`` is appended to
        ``self.failed_records`` and skipped, so one bad row cannot sink the
        whole batch.
        """
        processed = []
        for record in records:
            try:
                processed.append(self.transform(record))
            except Exception as e:
                self.failed_records.append(FailedRecord(
                    # str(): ids are often ints while the fallback is the
                    # string 'unknown'; a mixed int/str column cannot be
                    # written to Parquet by save_failures().
                    source_id=str(record.get('id', 'unknown')),
                    error=str(e),
                    raw_data=record,
                    timestamp=datetime.now(),
                ))
        return processed

    def save_failures(self, path: str) -> None:
        """Write collected failures to a timestamped Parquet file in directory *path*.

        Does nothing when there are no failures; creates the directory if it
        is missing. ``failed_records`` is not cleared, so a later call writes
        the same records again.
        """
        from pathlib import Path

        if not self.failed_records:
            return
        out_dir = Path(path)
        out_dir.mkdir(parents=True, exist_ok=True)
        df = pd.DataFrame([vars(r) for r in self.failed_records])
        df.to_parquet(out_dir / f"failures_{datetime.now():%Y%m%d_%H%M%S}.parquet")
# Dead letter queue pattern
def process_with_dlq(records: list[dict], dlq_table: str) -> None:
    """Process records one by one, diverting failures to a dead letter queue.

    Any ``Exception`` raised by ``process`` is caught; the offending record and
    the exception text go to ``save_to_dlq`` and the loop continues.
    """
    for rec in records:
        try:
            process(rec)
        except Exception as err:
            # Park the bad record rather than abandoning the rest of the input.
            reason = str(err)
            save_to_dlq(dlq_table, rec, reason)
Chunked Processing
from typing import Iterable, Iterator, TypeVar
T = TypeVar('T')


def chunked(iterable: Iterable[T], size: int) -> Iterator[list[T]]:
    """Yield successive lists of up to *size* items from *iterable*.

    Every chunk holds exactly *size* items except possibly the last, which
    holds the remainder; an empty iterable yields nothing. Accepts any
    iterable (lists, generators, ``df.to_dict('records')``) and consumes it
    lazily.

    Raises:
        ValueError: if *size* is less than 1 (raised when iteration starts).
    """
    if size < 1:
        # Without this check size=0 would silently act like size=1,
        # because len(batch) >= 0 is always true.
        raise ValueError(f"chunk size must be >= 1, got {size}")
    batch: list[T] = []
    for item in iterable:
        batch.append(item)
        if len(batch) >= size:
            yield batch
            batch = []
    if batch:
        yield batch
# Memory-efficient file processing
def process_large_csv(path: str, chunk_size: int = 50_000) -> None:
    """Stream a CSV through ``transform``/``load`` in chunks of *chunk_size* rows.

    The file is read with ``pd.read_csv(chunksize=...)``, so roughly one chunk
    and its transformed copy are held in memory at a time; each transformed
    chunk is passed to ``load`` with ``mode='append'``.
    """
    import gc  # needed for the explicit collect below

    for i, chunk in enumerate(pd.read_csv(path, chunksize=chunk_size)):
        print(f"Processing chunk {i}: {len(chunk)} rows")
        transformed = transform(chunk)
        load(transformed, mode='append')
        # Drop both references before the reader builds the next chunk so the
        # previous chunk's memory can be released first.
        del chunk, transformed
        gc.collect()
Retry Logic
import time
from functools import wraps
def retry(
    max_attempts: int = 3,
    delay: float = 1.0,
    backoff: float = 2.0,
    exceptions: tuple[type[BaseException], ...] = (Exception,),
):
    """Decorator for retrying failed operations with exponential backoff.

    The wrapped call is attempted up to *max_attempts* times. After each
    failed attempt (except the last) it sleeps, starting at *delay* seconds
    and multiplying the wait by *backoff* each time. If the final attempt
    fails, its exception propagates unchanged.

    Args:
        max_attempts: Total attempts including the first; must be >= 1.
        delay: Seconds to wait after the first failure.
        backoff: Multiplier applied to the wait after each failure.
        exceptions: Exception types that trigger a retry. Anything else
            propagates immediately, so e.g. programming errors need not be
            retried. Defaults to ``(Exception,)``.

    Raises:
        ValueError: at decoration time if *max_attempts* < 1 (otherwise the
            function would never be called and there would be nothing to raise).
    """
    if max_attempts < 1:
        raise ValueError(f"max_attempts must be >= 1, got {max_attempts}")

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            current_delay = delay
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_attempts:
                        # Out of attempts: re-raise with the original traceback.
                        raise
                    print(f"Attempt {attempt} failed: {e}. Retrying in {current_delay}s")
                    time.sleep(current_delay)
                    current_delay *= backoff
        return wrapper
    return decorator
@retry(max_attempts=3, delay=1.0, backoff=2.0)
def fetch_from_api(url: str) -> dict:
    """GET *url* and return the decoded JSON body.

    Uses a 30-second request timeout. Non-2xx responses raise through
    ``raise_for_status()``; the ``retry`` decorator then re-attempts the call
    (3 attempts total, waiting 1s and then 2s between them).

    NOTE(review): 4xx responses are retried too, which rarely helps.
    """
    resp = requests.get(url, timeout=30)
    resp.raise_for_status()
    payload = resp.json()
    return payload
Logging Best Practices
import structlog
logger = structlog.get_logger()


def process_with_logging(batch_id: str, records: list[dict]) -> None:
    """Run ``process`` on a batch, emitting structured start/finish/failure events.

    Every event carries ``batch_id`` and ``record_count``. A failure is logged
    with its traceback and then re-raised to the caller.
    """
    log = logger.bind(batch_id=batch_id, record_count=len(records))
    log.info("batch_started")
    try:
        result = process(records)
        log.info("batch_completed",
                 processed=result.processed_count,
                 failed=result.failed_count)
    except Exception as e:
        # exception() rather than error(): it also attaches exc_info, so the
        # traceback of the failing frame reaches the logs, not just str(e).
        log.exception("batch_failed", error=str(e))
        raise
Data Migration Scripts
Create production-safe migration scripts with rollback and validation.
Migration Template (PostgreSQL)
-- Migration: 20240115_add_customer_tier
-- Description: Add column and backfill from order history
-- PRE-MIGRATION CHECKS
DO $$
BEGIN
    -- EXISTS stops at the first row; COUNT(*) = 0 scans the whole table just
    -- to learn whether it is empty.
    IF NOT EXISTS (SELECT 1 FROM customers) THEN
        RAISE EXCEPTION 'No customers found - aborting migration';
    END IF;
END $$;
-- Baseline for the post-migration validation. TEMP tables are visible only to
-- the current session, so the whole script must run on one connection.
CREATE TEMP TABLE migration_baseline AS
SELECT COUNT(*) AS total_customers, NOW() AS snapshot_time
FROM customers;
-- FORWARD MIGRATION
-- Run this section outside an explicit transaction block. The backfill
-- COMMITs between batches, which PostgreSQL (11+) only allows in a DO block
-- executed at top level; inside BEGIN ... COMMIT it fails with
-- "invalid transaction termination".

-- Step 1: add the column (nullable, no default).
ALTER TABLE customers ADD COLUMN IF NOT EXISTS tier VARCHAR(20);

-- Step 2: batch backfill, committing after every batch so locks and
-- transaction size stay small.
DO $$
DECLARE
    batch_size INT := 10000;
    affected INT;
BEGIN
    LOOP
        WITH batch AS (
            SELECT id FROM customers
            WHERE tier IS NULL
            LIMIT batch_size
            FOR UPDATE SKIP LOCKED
        ), batch_orders AS (
            -- Count orders only for customers in this batch. COUNT(*) yields 0
            -- for a customer with no orders, so every selected row gets a tier
            -- ('bronze') and is not picked again. An inner join to orders
            -- would leave such customers NULL forever, and a batch made only
            -- of them would end the loop before the backfill is complete.
            SELECT b.id,
                   (SELECT COUNT(*) FROM orders o WHERE o.customer_id = b.id) AS total_orders
            FROM batch b
        )
        UPDATE customers c
        SET tier = CASE
            WHEN bo.total_orders >= 100 THEN 'platinum'
            WHEN bo.total_orders >= 50 THEN 'gold'
            WHEN bo.total_orders >= 10 THEN 'silver'
            ELSE 'bronze'
        END
        FROM batch_orders bo
        WHERE c.id = bo.id;

        GET DIAGNOSTICS affected = ROW_COUNT;
        EXIT WHEN affected = 0;
        -- PL/pgSQL starts a new transaction automatically after COMMIT; a
        -- "BEGIN;" here would be a syntax error (BEGIN opens a block).
        COMMIT;
    END LOOP;
END $$;

-- Step 3: default for new rows and the allowed-values constraint.
BEGIN;
ALTER TABLE customers ALTER COLUMN tier SET DEFAULT 'bronze';
ALTER TABLE customers ADD CONSTRAINT chk_tier
    CHECK (tier IN ('bronze', 'silver', 'gold', 'platinum'));
COMMIT;
-- POST-MIGRATION VALIDATION
DO $$
DECLARE
    -- COUNT(*) returns bigint; an INT variable raises "integer out of range"
    -- on very large tables.
    before_count BIGINT;
    after_count BIGINT;
    untiered BIGINT;
BEGIN
    SELECT total_customers INTO before_count FROM migration_baseline;
    SELECT COUNT(*) INTO after_count FROM customers;
    -- NOTE(review): on a table receiving concurrent inserts/deletes this
    -- comparison can fail even when the migration itself is correct.
    IF before_count != after_count THEN
        RAISE EXCEPTION 'Row count mismatch: before=%, after=%', before_count, after_count;
    END IF;
    -- The backfill is expected to have given every customer a tier.
    SELECT COUNT(*) INTO untiered FROM customers WHERE tier IS NULL;
    IF untiered > 0 THEN
        RAISE EXCEPTION 'Backfill incomplete: % customers still have NULL tier', untiered;
    END IF;
END $$;
Rollback Script
-- Safe to re-run, including after a partially applied forward migration:
-- each statement is conditional on the object existing.
BEGIN;
ALTER TABLE customers DROP CONSTRAINT IF EXISTS chk_tier;
-- Dropping the column also removes its DEFAULT. A separate
-- "ALTER COLUMN tier DROP DEFAULT" has no IF EXISTS form and errors when the
-- column was never added.
ALTER TABLE customers DROP COLUMN IF EXISTS tier;
COMMIT;
Python Migration Framework
@dataclass
class Migration:
    """A reversible database change, executed by MigrationRunner.run."""

    # Identifier for the migration.
    id: str
    # Human-readable summary of the change.
    description: str
    # Applies the change; MigrationRunner.run calls it as up(conn).
    up: Callable[..., None]
    # Reverts the change; MigrationRunner.run calls down(conn) when up() or
    # validation raises.
    down: Callable[..., None]
    # Checks the result; called as validate(conn, baseline). A falsy return
    # makes MigrationRunner.run raise ValueError("Validation failed").
    validate: Callable[..., bool]
class MigrationRunner:
    """Apply a Migration inside a single ``engine.begin()`` transaction."""

    def __init__(self, engine):
        self.engine = engine

    def _capture_baseline(self, conn) -> dict:
        """Return pre-migration facts handed to ``Migration.validate``.

        The default captures nothing. Override it (e.g. to record row counts)
        when a migration's ``validate`` needs something to compare against.
        """
        return {}

    def run(self, migration: Migration, dry_run: bool = False) -> bool:
        """Apply *migration*, validate it, and return True on success.

        All work happens inside one ``engine.begin()`` block, so an exception
        escaping it makes the transaction roll back. With ``dry_run=True``
        only the baseline is captured and nothing is applied.

        Raises:
            ValueError: if ``migration.validate`` returns a falsy value.
            Exception: whatever ``migration.up`` raised. That original error
                is what propagates even if the compensating
                ``migration.down`` call fails as well.
        """
        with self.engine.begin() as conn:
            baseline = self._capture_baseline(conn)
            if dry_run:
                return True
            try:
                migration.up(conn)
                if not migration.validate(conn, baseline):
                    raise ValueError("Validation failed")
                return True
            except Exception as exc:
                # down() runs on the same transaction. After a database error
                # that transaction may reject further statements (PostgreSQL
                # does), so down() can fail too; don't let that secondary
                # error replace the one that explains what went wrong.
                try:
                    migration.down(conn)
                except Exception as down_exc:
                    if hasattr(exc, "add_note"):  # Python 3.11+
                        exc.add_note(f"migration.down() also failed: {down_exc!r}")
                raise
Safe Migration Patterns
-- Adding a column: nullable first, backfill, then constraint
-- (On large tables run the UPDATE in batches - see the batch-update pattern
-- below - rather than as one statement.)
ALTER TABLE t ADD COLUMN new_col TYPE;
UPDATE t SET new_col = compute_value();
ALTER TABLE t ALTER COLUMN new_col SET NOT NULL;
-- Renaming a column: add new, copy, drop old
-- This is the expand/contract form. Between the copy and the DROP the
-- application must write to (or be switched to) new_name; otherwise writes
-- to old_name made after the UPDATE are lost when old_name is dropped.
ALTER TABLE t ADD COLUMN new_name TYPE;
UPDATE t SET new_name = old_name;
ALTER TABLE t DROP COLUMN old_name;
-- Changing column type: add new, migrate, swap
-- Same caveat: rows written to col between the UPDATE and the DROP are not
-- copied unless the application dual-writes or something keeps col_new in sync.
ALTER TABLE t ADD COLUMN col_new NEWTYPE;
UPDATE t SET col_new = col::NEWTYPE;
ALTER TABLE t DROP COLUMN col;
ALTER TABLE t RENAME COLUMN col_new TO col;
-- Large table batch updates
-- Run at top level, not inside BEGIN ... COMMIT: the per-batch COMMIT needs
-- PostgreSQL 11+ and a DO block outside an explicit transaction block.
DO $$
DECLARE
    batch_size INT := 10000;
    total_updated BIGINT := 0;
    rows_affected INT;  -- target of GET DIAGNOSTICS; must be declared
BEGIN
    LOOP
        WITH batch AS (
            SELECT id FROM large_table
            WHERE needs_update = true
            LIMIT batch_size
        )
        -- The SET list must clear needs_update (or otherwise take the row out
        -- of the batch query); if it doesn't, the same rows are selected again
        -- every iteration and the loop never terminates.
        UPDATE large_table SET ..., needs_update = false
        WHERE id IN (SELECT id FROM batch);
        GET DIAGNOSTICS rows_affected = ROW_COUNT;
        total_updated := total_updated + rows_affected;
        EXIT WHEN rows_affected = 0;
        -- Commit each batch so row locks are released and finished work is
        -- kept if the run is interrupted; without it the loop is one transaction.
        COMMIT;
        PERFORM pg_sleep(0.1);  -- throttle between batches
    END LOOP;
    RAISE NOTICE 'Updated % rows in total', total_updated;
END $$;
Migration Safety Checklist
Before: tested on staging, rollback tested, backup taken, maintenance window scheduled. After: validation queries passed, application health checked, performance normal.