Agent Skills: Senior Data Engineer

Expert data engineering covering data pipelines, ETL/ELT, data warehousing, streaming, and data quality.

UncategorizedID: borghei/claude-skills/senior-data-engineer

Install this agent skill to your local

pnpm dlx add-skill https://github.com/borghei/Claude-Skills/tree/HEAD/engineering/senior-data-engineer

Skill Files

Browse the full folder contents for senior-data-engineer.

Download Skill

Loading file tree…

engineering/senior-data-engineer/SKILL.md

Skill Metadata

Name
senior-data-engineer
Description
>

Senior Data Engineer

The agent generates pipeline configurations (Airflow, Prefect, Dagster), validates data quality with profiling and anomaly detection, and optimizes SQL/Spark performance with actionable recommendations.


Quick Start

# Generate an Airflow DAG for incremental PostgreSQL -> Snowflake
python scripts/pipeline_orchestrator.py generate \
  --type airflow --source postgres --destination snowflake \
  --tables orders,customers --mode incremental --schedule "0 5 * * *"

# Validate data quality against a schema
python scripts/data_quality_validator.py validate data.csv \
  --schema schema.json --detect-anomalies --json

# Profile a dataset
python scripts/data_quality_validator.py profile data.csv --json

# Optimize a slow SQL query
python scripts/etl_performance_optimizer.py analyze-sql query.sql \
  --warehouse snowflake --json

# Estimate query cost
python scripts/etl_performance_optimizer.py estimate-cost query.sql \
  --warehouse bigquery --stats data_stats.json --json

Tools Overview

| Tool | Subcommands | Purpose | |------|-------------|---------| | pipeline_orchestrator.py | generate, validate, template | Generate Airflow/Prefect/Dagster pipeline code, validate DAGs | | data_quality_validator.py | validate, profile, generate-suite, contract, schema | Schema validation, profiling, anomaly detection, Great Expectations | | etl_performance_optimizer.py | analyze-sql, analyze-spark, optimize-partition, estimate-cost, template | SQL/Spark optimization, partition strategy, cost estimation |

All subcommands support --json for machine-readable output and --output for file writing.


Workflow 1: Batch ETL Pipeline (PostgreSQL -> dbt -> Snowflake)

Step 1 -- Generate extraction config.

python scripts/pipeline_orchestrator.py generate \
  --type airflow --source postgres --tables orders,customers,products \
  --mode incremental --watermark updated_at --output dags/extract_source.py

Step 2 -- Create dbt staging model.

-- models/staging/stg_orders.sql
WITH source AS (
    SELECT * FROM {{ source('postgres', 'orders') }}
)
SELECT order_id, customer_id, order_date, total_amount, status, _extracted_at
FROM source
WHERE order_date >= DATEADD(day, -3, CURRENT_DATE)

Step 3 -- Create incremental mart model.

-- models/marts/fct_orders.sql
{{ config(materialized='incremental', unique_key='order_id', cluster_by=['order_date']) }}

SELECT o.order_id, o.customer_id, c.customer_segment, o.order_date, o.total_amount, o.status
FROM {{ ref('stg_orders') }} o
LEFT JOIN {{ ref('dim_customers') }} c ON o.customer_id = c.customer_id
{% if is_incremental() %}
WHERE o._extracted_at > (SELECT MAX(_extracted_at) FROM {{ this }})
{% endif %}

Step 4 -- Wire into Airflow DAG.

with DAG('daily_etl', schedule_interval='0 5 * * *', catchup=False, tags=['etl']) as dag:
    extract = BashOperator(task_id='extract', bash_command='python scripts/extract.py --date {{ ds }}')
    transform = BashOperator(task_id='dbt_run', bash_command='dbt run --select marts.*')
    test = BashOperator(task_id='dbt_test', bash_command='dbt test --select marts.*')
    extract >> transform >> test

Step 5 -- Validate.

python scripts/data_quality_validator.py validate --table fct_orders --checks all --output report.json

Validation checkpoint: DAG runs end-to-end. Data quality report shows 0 failures on uniqueness, completeness, and freshness.


Workflow 2: Real-Time Streaming (Kafka -> Spark -> Delta Lake)

Step 1 -- Define event schema and Kafka topic.

kafka-topics.sh --create --bootstrap-server localhost:9092 \
  --topic user-events --partitions 12 --replication-factor 3 \
  --config retention.ms=604800000

Step 2 -- Implement Spark Structured Streaming.

events_df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "user-events") \
    .option("startingOffsets", "latest").load()

parsed_df = events_df.select(from_json(col("value").cast("string"), schema).alias("data")).select("data.*")

aggregated_df = parsed_df \
    .withWatermark("event_timestamp", "10 minutes") \
    .groupBy(window(col("event_timestamp"), "5 minutes"), col("event_type")) \
    .agg(count("*").alias("event_count"), approx_count_distinct("user_id").alias("unique_users"))

aggregated_df.writeStream.format("delta").outputMode("append") \
    .option("checkpointLocation", "/checkpoints/user-events") \
    .trigger(processingTime="1 minute").start()

Step 3 -- Handle errors with dead letter queue.

def process_with_dlq(batch_df, batch_id):
    valid_df = batch_df.filter(col("event_id").isNotNull())
    invalid_df = batch_df.filter(col("event_id").isNull())
    valid_df.write.format("delta").mode("append").save("/data/lake/user_events")
    if invalid_df.count() > 0:
        invalid_df.withColumn("error_reason", lit("missing_event_id")) \
            .write.format("delta").mode("append").save("/data/lake/dlq/user_events")

Validation checkpoint: Consumer lag stays under threshold. DLQ table has < 0.1% of total events.


Workflow 3: Data Quality Framework

Step 1 -- Generate a Great Expectations suite from data.

python scripts/data_quality_validator.py generate-suite data.csv --output expectations.json

Step 2 -- Validate against a data contract.

# contracts/orders_contract.yaml
contract:
  name: orders_data_contract
  version: "1.0.0"
schema:
  properties:
    order_id: { type: string, format: uuid }
    total_amount: { type: decimal, minimum: 0 }
    status: { type: string, enum: [pending, confirmed, shipped, delivered, cancelled] }
sla:
  freshness: { max_delay_hours: 1 }
  completeness: { min_percentage: 99.9 }
  accuracy: { duplicate_tolerance: 0.01 }
python scripts/data_quality_validator.py contract data.csv --contract orders_contract.yaml --json

Step 3 -- Add dbt tests for ongoing validation.

models:
  - name: fct_orders
    columns:
      - name: order_id
        tests: [unique, not_null]
      - name: total_amount
        tests:
          - not_null
          - dbt_utils.accepted_range: { min_value: 0, max_value: 1000000 }

Validation checkpoint: Quality score >= 95%. Zero duplicates. Freshness under SLA threshold.


Architecture Decision Framework

| Question | Batch | Streaming | |----------|-------|-----------| | Latency requirement | Hours to days | Seconds to minutes | | Processing complexity | Complex transforms, ML | Simple aggregations | | Cost sensitivity | More cost-effective | Higher infra cost | | Error handling | Easy reprocessing | Requires careful DLQ design |

Decision tree:

Real-time insight needed?
  Yes -> Exactly-once needed?
    Yes -> Kafka + Flink/Spark Structured Streaming
    No  -> Kafka + consumer groups
  No  -> Daily volume > 1TB?
    Yes -> Spark/Databricks
    No  -> dbt + warehouse compute

| Feature | Warehouse (Snowflake/BigQuery) | Lakehouse (Delta/Iceberg) | |---------|-------------------------------|---------------------------| | Best for | BI, SQL analytics | ML, unstructured data | | Storage cost | Higher (proprietary) | Lower (open formats) | | Flexibility | Schema-on-write | Schema-on-read |


Anti-Patterns

  1. Full table reload on every run -- use incremental loads with watermark columns.
  2. No dead letter queue -- failed records silently dropped. Always route failures to a DLQ.
  3. Timezone mismatch -- normalize all timestamps to UTC at extraction.
  4. Missing freshness checks -- add dbt source freshness before transforms start.
  5. Skipping schema drift detection -- use mergeSchema option or data contracts to catch new columns.

Troubleshooting

| Problem | Cause | Solution | |---------|-------|----------| | Pipeline silently produces zero rows | Timezone mismatch on watermark column | Normalize to UTC; add row-count assertion | | Spark shuffle 10x slower than expected | Data skew on join key | Salt the key or broadcast the smaller table | | Airflow shows "no tasks to run" | Circular dependency or import error | airflow dags list-import-errors; fix import | | dbt succeeds but dashboards stale | Source freshness not checked | Add dbt source freshness as prerequisite task | | Kafka consumer lag grows unbounded | Throughput < producer rate | Increase partitions, scale consumers, batch max.poll.records | | Quality validator false-positive anomalies | Z-score threshold too tight | Raise threshold or switch to IQR mode |


References

| Guide | Path | |-------|------| | Pipeline Architecture | references/data_pipeline_architecture.md | | Data Modeling Patterns | references/data_modeling_patterns.md | | DataOps Best Practices | references/dataops_best_practices.md |


Integration Points

| Skill | Integration | |-------|-------------| | senior-data-scientist | Feature engineering consumes curated mart data | | senior-ml-engineer | ML pipelines depend on feature store tables | | senior-devops | CI/CD for dbt, Airflow deployment, container orchestration | | senior-architect | Architecture reviews for lakehouse vs warehouse decisions | | code-reviewer | Pipeline code reviews for DAGs, dbt models, Spark jobs |


Last Updated: April 2026 Version: 1.1.0