Agent Skills: ETL Core Patterns

Core ETL reliability patterns including idempotency, checkpointing, error handling, chunking, retry logic, and logging.

UncategorizedID: majesticlabs-dev/majestic-marketplace/etl-core-patterns

Install this agent skill to your local

pnpm dlx add-skill https://github.com/majesticlabs-dev/majestic-marketplace/tree/HEAD/plugins/majestic-data/skills/etl-core-patterns

Skill Files

Browse the full folder contents for etl-core-patterns.

Download Skill

Loading file tree…

plugins/majestic-data/skills/etl-core-patterns/SKILL.md

Skill Metadata

Name
etl-core-patterns
Description
Core ETL reliability patterns including idempotency, checkpointing, error handling, chunking, retry logic, and logging.

ETL Core Patterns

Reliability patterns for production data pipelines.

Idempotency Patterns

# Pattern 1: Delete-then-insert (simple, works for small datasets)
def load_daily_data(date: str, df: pd.DataFrame) -> None:
    with engine.begin() as conn:
        conn.execute(
            text("DELETE FROM daily_metrics WHERE date = :date"),
            {"date": date}
        )
        df.to_sql('daily_metrics', conn, if_exists='append', index=False)

# Pattern 2: UPSERT (better for large datasets)
def upsert_records(df: pd.DataFrame) -> None:
    for batch in chunked(df.to_dict('records'), 1000):
        stmt = insert(MyTable).values(batch)
        stmt = stmt.on_conflict_do_update(
            index_elements=['id'],
            set_={col: stmt.excluded[col] for col in update_cols}
        )
        session.execute(stmt)

# Pattern 3: Source hash for change detection
def extract_with_hash(df: pd.DataFrame) -> pd.DataFrame:
    hash_cols = ['id', 'name', 'value', 'updated_at']
    df['_row_hash'] = pd.util.hash_pandas_object(df[hash_cols])
    return df

Checkpointing

import json
from pathlib import Path

class Checkpoint:
    def __init__(self, path: str):
        self.path = Path(path)
        self.state = self._load()

    def _load(self) -> dict:
        if self.path.exists():
            return json.loads(self.path.read_text())
        return {}

    def save(self) -> None:
        self.path.write_text(json.dumps(self.state, default=str))

    def get_last_processed(self, key: str) -> str | None:
        return self.state.get(key)

    def set_last_processed(self, key: str, value: str) -> None:
        self.state[key] = value
        self.save()

# Usage
checkpoint = Checkpoint('.etl_checkpoint.json')
last_id = checkpoint.get_last_processed('users_sync')

for batch in fetch_users_since(last_id):
    process(batch)
    checkpoint.set_last_processed('users_sync', batch[-1]['id'])

Error Handling

from dataclasses import dataclass

@dataclass
class FailedRecord:
    source_id: str
    error: str
    raw_data: dict
    timestamp: datetime

class ETLProcessor:
    def __init__(self):
        self.failed_records: list[FailedRecord] = []

    def process_batch(self, records: list[dict]) -> list[dict]:
        processed = []
        for record in records:
            try:
                processed.append(self.transform(record))
            except Exception as e:
                self.failed_records.append(FailedRecord(
                    source_id=record.get('id', 'unknown'),
                    error=str(e),
                    raw_data=record,
                    timestamp=datetime.now()
                ))
        return processed

    def save_failures(self, path: str) -> None:
        if self.failed_records:
            df = pd.DataFrame([vars(r) for r in self.failed_records])
            df.to_parquet(f"{path}/failures_{datetime.now():%Y%m%d_%H%M%S}.parquet")

# Dead letter queue pattern
def process_with_dlq(records: list[dict], dlq_table: str) -> None:
    for record in records:
        try:
            process(record)
        except Exception as e:
            save_to_dlq(dlq_table, record, str(e))

Chunked Processing

from typing import Iterator, TypeVar

T = TypeVar('T')

def chunked(iterable: Iterator[T], size: int) -> Iterator[list[T]]:
    """Yield successive chunks from iterable."""
    batch = []
    for item in iterable:
        batch.append(item)
        if len(batch) >= size:
            yield batch
            batch = []
    if batch:
        yield batch

# Memory-efficient file processing
def process_large_csv(path: str, chunk_size: int = 50_000) -> None:
    for i, chunk in enumerate(pd.read_csv(path, chunksize=chunk_size)):
        print(f"Processing chunk {i}: {len(chunk)} rows")
        transformed = transform(chunk)
        load(transformed, mode='append')
        del chunk, transformed  # Explicit memory cleanup
        gc.collect()

Retry Logic

import time
from functools import wraps

def retry(max_attempts: int = 3, delay: float = 1.0, backoff: float = 2.0):
    """Decorator for retrying failed operations."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            current_delay = delay

            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    if attempt < max_attempts - 1:
                        print(f"Attempt {attempt + 1} failed: {e}. Retrying in {current_delay}s")
                        time.sleep(current_delay)
                        current_delay *= backoff

            raise last_exception
        return wrapper
    return decorator

@retry(max_attempts=3, delay=1.0, backoff=2.0)
def fetch_from_api(url: str) -> dict:
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    return response.json()

Logging Best Practices

import structlog

logger = structlog.get_logger()

def process_with_logging(batch_id: str, records: list[dict]) -> None:
    log = logger.bind(batch_id=batch_id, record_count=len(records))

    log.info("batch_started")

    try:
        result = process(records)
        log.info("batch_completed",
                 processed=result.processed_count,
                 failed=result.failed_count)
    except Exception as e:
        log.error("batch_failed", error=str(e))
        raise

Data Migration Scripts

Create production-safe migration scripts with rollback and validation.

Migration Template (PostgreSQL)

-- Migration: 20240115_add_customer_tier
-- Description: Add column and backfill from order history

-- PRE-MIGRATION CHECKS
DO $$
BEGIN
    IF (SELECT COUNT(*) FROM customers) = 0 THEN
        RAISE EXCEPTION 'No customers found - aborting migration';
    END IF;
END $$;

CREATE TEMP TABLE migration_baseline AS
SELECT COUNT(*) as total_customers, NOW() as snapshot_time
FROM customers;

-- FORWARD MIGRATION
BEGIN;

ALTER TABLE customers ADD COLUMN IF NOT EXISTS tier VARCHAR(20);

-- Batch backfill
DO $$
DECLARE
    batch_size INT := 10000;
    affected INT;
BEGIN
    LOOP
        WITH batch AS (
            SELECT id FROM customers
            WHERE tier IS NULL
            LIMIT batch_size
            FOR UPDATE SKIP LOCKED
        )
        UPDATE customers c
        SET tier = CASE
            WHEN total_orders >= 100 THEN 'platinum'
            WHEN total_orders >= 50 THEN 'gold'
            WHEN total_orders >= 10 THEN 'silver'
            ELSE 'bronze'
        END
        FROM (
            SELECT customer_id, COUNT(*) as total_orders
            FROM orders GROUP BY customer_id
        ) o
        WHERE c.id = o.customer_id AND c.id IN (SELECT id FROM batch);

        GET DIAGNOSTICS affected = ROW_COUNT;
        EXIT WHEN affected = 0;
        COMMIT;
        BEGIN;
    END LOOP;
END $$;

ALTER TABLE customers ALTER COLUMN tier SET DEFAULT 'bronze';
ALTER TABLE customers ADD CONSTRAINT chk_tier
    CHECK (tier IN ('bronze', 'silver', 'gold', 'platinum'));
COMMIT;

-- POST-MIGRATION VALIDATION
DO $$
DECLARE
    before_count INT;
    after_count INT;
BEGIN
    SELECT total_customers INTO before_count FROM migration_baseline;
    SELECT COUNT(*) INTO after_count FROM customers;
    IF before_count != after_count THEN
        RAISE EXCEPTION 'Row count mismatch: before=%, after=%', before_count, after_count;
    END IF;
END $$;

Rollback Script

BEGIN;
ALTER TABLE customers DROP CONSTRAINT IF EXISTS chk_tier;
ALTER TABLE customers ALTER COLUMN tier DROP DEFAULT;
ALTER TABLE customers DROP COLUMN IF EXISTS tier;
COMMIT;

Python Migration Framework

@dataclass
class Migration:
    id: str
    description: str
    up: Callable
    down: Callable
    validate: Callable

class MigrationRunner:
    def __init__(self, engine):
        self.engine = engine

    def run(self, migration: Migration, dry_run: bool = False) -> bool:
        with self.engine.begin() as conn:
            baseline = self._capture_baseline(conn)
            if dry_run:
                return True
            try:
                migration.up(conn)
                if not migration.validate(conn, baseline):
                    raise ValueError("Validation failed")
                return True
            except Exception as e:
                migration.down(conn)
                raise

Safe Migration Patterns

-- Adding a column: nullable first, backfill, then constraint
ALTER TABLE t ADD COLUMN new_col TYPE;
UPDATE t SET new_col = compute_value();
ALTER TABLE t ALTER COLUMN new_col SET NOT NULL;

-- Renaming a column: add new, copy, drop old
ALTER TABLE t ADD COLUMN new_name TYPE;
UPDATE t SET new_name = old_name;
ALTER TABLE t DROP COLUMN old_name;

-- Changing column type: add new, migrate, swap
ALTER TABLE t ADD COLUMN col_new NEWTYPE;
UPDATE t SET col_new = col::NEWTYPE;
ALTER TABLE t DROP COLUMN col;
ALTER TABLE t RENAME COLUMN col_new TO col;

-- Large table batch updates
DO $$
DECLARE
    batch_size INT := 10000;
    total_updated INT := 0;
BEGIN
    LOOP
        WITH batch AS (
            SELECT id FROM large_table
            WHERE needs_update = true
            LIMIT batch_size
        )
        UPDATE large_table SET ...
        WHERE id IN (SELECT id FROM batch);
        GET DIAGNOSTICS rows_affected = ROW_COUNT;
        total_updated := total_updated + rows_affected;
        EXIT WHEN rows_affected = 0;
        PERFORM pg_sleep(0.1);
    END LOOP;
END $$;

Migration Safety Checklist

Before: tested on staging, rollback tested, backup taken, maintenance window scheduled. After: validation queries passed, application health checked, performance normal.