# Senior Data Engineer
The agent generates pipeline configurations (Airflow, Prefect, Dagster), validates data quality with profiling and anomaly detection, and optimizes SQL/Spark performance with actionable recommendations.
## Quick Start
```bash
# Generate an Airflow DAG for incremental PostgreSQL -> Snowflake
python scripts/pipeline_orchestrator.py generate \
  --type airflow --source postgres --destination snowflake \
  --tables orders,customers --mode incremental --schedule "0 5 * * *"

# Validate data quality against a schema
python scripts/data_quality_validator.py validate data.csv \
  --schema schema.json --detect-anomalies --json

# Profile a dataset
python scripts/data_quality_validator.py profile data.csv --json

# Optimize a slow SQL query
python scripts/etl_performance_optimizer.py analyze-sql query.sql \
  --warehouse snowflake --json

# Estimate query cost
python scripts/etl_performance_optimizer.py estimate-cost query.sql \
  --warehouse bigquery --stats data_stats.json --json
```
## Tools Overview
| Tool | Subcommands | Purpose |
|------|-------------|---------|
| pipeline_orchestrator.py | generate, validate, template | Generate Airflow/Prefect/Dagster pipeline code, validate DAGs |
| data_quality_validator.py | validate, profile, generate-suite, contract, schema | Schema validation, profiling, anomaly detection, Great Expectations |
| etl_performance_optimizer.py | analyze-sql, analyze-spark, optimize-partition, estimate-cost, template | SQL/Spark optimization, partition strategy, cost estimation |
All subcommands support `--json` for machine-readable output and `--output` for writing results to a file.
## Workflow 1: Batch ETL Pipeline (PostgreSQL -> dbt -> Snowflake)
Step 1 -- Generate extraction config.
```bash
python scripts/pipeline_orchestrator.py generate \
  --type airflow --source postgres --tables orders,customers,products \
  --mode incremental --watermark updated_at --output dags/extract_source.py
```
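Incremental mode hinges on a watermark column: each run pulls only rows whose `updated_at` exceeds the last stored watermark. A minimal sketch of the pattern, using SQLite and illustrative table/column names (`orders`, `updated_at`), not the code the generator actually emits:

```python
import sqlite3


def extract_incremental(conn, last_watermark):
    """Pull only rows newer than the stored watermark (avoids full reloads).

    Returns the changed rows plus the new watermark to persist for the
    next run. Table/column names here are illustrative.
    """
    rows = conn.execute(
        "SELECT order_id, updated_at FROM orders WHERE updated_at > ? "
        "ORDER BY updated_at",
        (last_watermark,),
    ).fetchall()
    # Advance the watermark to the max seen; keep it unchanged on empty batches.
    new_watermark = rows[-1][1] if rows else last_watermark
    return rows, new_watermark
```

Persisting `new_watermark` (e.g. in an Airflow Variable or a state table) is what makes reruns idempotent.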
Step 2 -- Create dbt staging model.
```sql
-- models/staging/stg_orders.sql
WITH source AS (
    SELECT * FROM {{ source('postgres', 'orders') }}
)
SELECT order_id, customer_id, order_date, total_amount, status, _extracted_at
FROM source
WHERE order_date >= DATEADD(day, -3, CURRENT_DATE)
```
Step 3 -- Create incremental mart model.
```sql
-- models/marts/fct_orders.sql
{{ config(materialized='incremental', unique_key='order_id', cluster_by=['order_date']) }}
SELECT o.order_id, o.customer_id, c.customer_segment, o.order_date, o.total_amount, o.status
FROM {{ ref('stg_orders') }} o
LEFT JOIN {{ ref('dim_customers') }} c ON o.customer_id = c.customer_id
{% if is_incremental() %}
WHERE o._extracted_at > (SELECT MAX(_extracted_at) FROM {{ this }})
{% endif %}
```
Step 4 -- Wire into Airflow DAG.
```python
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG('daily_etl', schedule_interval='0 5 * * *',
         start_date=datetime(2024, 1, 1),  # required by Airflow; adjust to your go-live date
         catchup=False, tags=['etl']) as dag:
    extract = BashOperator(task_id='extract', bash_command='python scripts/extract.py --date {{ ds }}')
    transform = BashOperator(task_id='dbt_run', bash_command='dbt run --select marts.*')
    test = BashOperator(task_id='dbt_test', bash_command='dbt test --select marts.*')
    extract >> transform >> test
```
Step 5 -- Validate.
```bash
python scripts/data_quality_validator.py validate --table fct_orders --checks all --output report.json
```
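A small gate over the resulting report can enforce the checkpoint in CI. The report layout assumed here (`{check: {"failures": n}}`) is illustrative, not the validator's actual output format:

```python
def quality_gate(report, required_checks=("uniqueness", "completeness", "freshness")):
    """Return True only if every required check reports zero failures.

    Missing checks fail closed: .get() defaults to 1 failure, so a report
    that omits a required check blocks the pipeline rather than passing it.
    """
    return all(report.get(c, {}).get("failures", 1) == 0 for c in required_checks)
```

Wiring this into the DAG as a short-circuit task keeps bad data out of downstream marts.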
Validation checkpoint: DAG runs end-to-end. Data quality report shows 0 failures on uniqueness, completeness, and freshness.
## Workflow 2: Real-Time Streaming (Kafka -> Spark -> Delta Lake)
Step 1 -- Define event schema and Kafka topic.
```bash
kafka-topics.sh --create --bootstrap-server localhost:9092 \
  --topic user-events --partitions 12 --replication-factor 3 \
  --config retention.ms=604800000
```
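Magic numbers like `retention.ms=604800000` are easy to misread; a one-line helper documents that it is exactly 7 days:

```python
def retention_ms(days: int) -> int:
    """Convert a retention period in days to Kafka's retention.ms unit."""
    # days * hours * minutes * seconds * milliseconds
    return days * 24 * 60 * 60 * 1000
```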
Step 2 -- Implement Spark Structured Streaming.
```python
from pyspark.sql.functions import (
    approx_count_distinct, col, count, from_json, window,
)

# `schema` is the StructType describing the JSON event payload (Step 1)
events_df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "user-events") \
    .option("startingOffsets", "latest").load()

parsed_df = events_df.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")

aggregated_df = parsed_df \
    .withWatermark("event_timestamp", "10 minutes") \
    .groupBy(window(col("event_timestamp"), "5 minutes"), col("event_type")) \
    .agg(count("*").alias("event_count"),
         approx_count_distinct("user_id").alias("unique_users"))

aggregated_df.writeStream.format("delta").outputMode("append") \
    .option("checkpointLocation", "/checkpoints/user-events") \
    .trigger(processingTime="1 minute").start()
```
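Spark's windowed aggregation can be illustrated without a cluster. This toy buckets `(timestamp, event_type)` pairs into fixed 5-minute windows; it deliberately omits watermarking and incremental state, which Spark handles for you:

```python
from collections import defaultdict
from datetime import datetime, timedelta


def tumbling_window_counts(events, window_minutes=5):
    """Toy stand-in for Spark's window(): bucket events into fixed windows.

    `events` is a list of (timestamp, event_type) pairs; returns a dict
    keyed by (window_start, event_type) with event counts.
    """
    size = timedelta(minutes=window_minutes)
    counts = defaultdict(int)
    for ts, event_type in events:
        # Floor the timestamp to the start of its window.
        window_start = datetime.min + ((ts - datetime.min) // size) * size
        counts[(window_start, event_type)] += 1
    return dict(counts)
```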
Step 3 -- Handle errors with dead letter queue.
```python
from pyspark.sql.functions import col, lit

def process_with_dlq(batch_df, batch_id):
    valid_df = batch_df.filter(col("event_id").isNotNull())
    invalid_df = batch_df.filter(col("event_id").isNull())
    valid_df.write.format("delta").mode("append").save("/data/lake/user_events")
    if invalid_df.count() > 0:
        invalid_df.withColumn("error_reason", lit("missing_event_id")) \
            .write.format("delta").mode("append").save("/data/lake/dlq/user_events")

# Register with: parsed_df.writeStream.foreachBatch(process_with_dlq).start()
```
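The same routing logic, framework-free, with the batch as a plain list of dicts. A sketch of the split itself, not of Spark's execution model:

```python
def split_for_dlq(batch):
    """Partition a batch of event dicts into (valid, dead-lettered) lists.

    Mirrors the foreachBatch logic: records without an event_id get an
    error_reason attached and go to the DLQ instead of being dropped.
    """
    valid, dlq = [], []
    for record in batch:
        if record.get("event_id") is not None:
            valid.append(record)
        else:
            dlq.append({**record, "error_reason": "missing_event_id"})
    return valid, dlq
```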
Validation checkpoint: Consumer lag stays under threshold. DLQ table has < 0.1% of total events.
## Workflow 3: Data Quality Framework
Step 1 -- Generate a Great Expectations suite from data.
```bash
python scripts/data_quality_validator.py generate-suite data.csv --output expectations.json
```
Step 2 -- Validate against a data contract.
```yaml
# contracts/orders_contract.yaml
contract:
  name: orders_data_contract
  version: "1.0.0"
  schema:
    properties:
      order_id: { type: string, format: uuid }
      total_amount: { type: decimal, minimum: 0 }
      status: { type: string, enum: [pending, confirmed, shipped, delivered, cancelled] }
  sla:
    freshness: { max_delay_hours: 1 }
    completeness: { min_percentage: 99.9 }
    accuracy: { duplicate_tolerance: 0.01 }
```
```bash
python scripts/data_quality_validator.py contract data.csv --contract orders_contract.yaml --json
```
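The enforcement logic behind the `minimum` and `enum` rules above can be sketched in plain Python. This is a toy, not the validator's actual engine; a real contract engine would also cover types, formats, and SLA checks:

```python
def check_contract(record, properties):
    """Return a list of violations for one record against contract properties.

    Supports only the `minimum` and `enum` rules used in the contract above.
    """
    violations = []
    for field, rules in properties.items():
        value = record.get(field)
        if value is None:
            violations.append(f"{field}: missing")
            continue
        if "minimum" in rules and value < rules["minimum"]:
            violations.append(f"{field}: below minimum {rules['minimum']}")
        if "enum" in rules and value not in rules["enum"]:
            violations.append(f"{field}: not in allowed values")
    return violations
```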
Step 3 -- Add dbt tests for ongoing validation.
```yaml
models:
  - name: fct_orders
    columns:
      - name: order_id
        tests: [unique, not_null]
      - name: total_amount
        tests:
          - not_null
          - dbt_utils.accepted_range: { min_value: 0, max_value: 1000000 }
```
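A minimal Python analogue of these three tests makes the intent concrete. A sketch, not dbt's implementation:

```python
def run_column_tests(values, min_value=0, max_value=1_000_000):
    """Pure-Python analogue of the dbt tests above: not_null, unique,
    accepted_range. Returns a dict of test name -> passed bool.
    """
    non_null = [v for v in values if v is not None]
    return {
        "not_null": len(non_null) == len(values),
        "unique": len(set(values)) == len(values),
        # Like dbt's accepted_range, only non-null values are range-checked.
        "accepted_range": all(min_value <= v <= max_value for v in non_null),
    }
```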
Validation checkpoint: Quality score >= 95%. Zero duplicates. Freshness under SLA threshold.
## Architecture Decision Framework
| Question | Batch | Streaming |
|----------|-------|-----------|
| Latency requirement | Hours to days | Seconds to minutes |
| Processing complexity | Complex transforms, ML | Simple aggregations |
| Cost sensitivity | More cost-effective | Higher infra cost |
| Error handling | Easy reprocessing | Requires careful DLQ design |
Decision tree:
```
Real-time insight needed?
  Yes -> Exactly-once needed?
    Yes -> Kafka + Flink/Spark Structured Streaming
    No  -> Kafka + consumer groups
  No  -> Daily volume > 1TB?
    Yes -> Spark/Databricks
    No  -> dbt + warehouse compute
```
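The tree can also be encoded directly, which makes the decision testable and reviewable. Labels are illustrative, not output of any script in this skill:

```python
def choose_stack(needs_realtime, needs_exactly_once=False, daily_tb=0.0):
    """Encode the architecture decision tree above as a function."""
    if needs_realtime:
        if needs_exactly_once:
            return "kafka + flink/spark structured streaming"
        return "kafka + consumer groups"
    if daily_tb > 1.0:
        return "spark/databricks"
    return "dbt + warehouse compute"
```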
| Feature | Warehouse (Snowflake/BigQuery) | Lakehouse (Delta/Iceberg) |
|---------|-------------------------------|---------------------------|
| Best for | BI, SQL analytics | ML, unstructured data |
| Storage cost | Higher (proprietary) | Lower (open formats) |
| Flexibility | Schema-on-write | Schema-on-read |
## Anti-Patterns
- Full table reload on every run -- use incremental loads with watermark columns.
- No dead letter queue -- failed records silently dropped. Always route failures to a DLQ.
- Timezone mismatch -- normalize all timestamps to UTC at extraction.
- Missing freshness checks -- run `dbt source freshness` before transforms start.
- Skipping schema drift detection -- use the `mergeSchema` option or data contracts to catch new columns.
## Troubleshooting
| Problem | Cause | Solution |
|---------|-------|----------|
| Pipeline silently produces zero rows | Timezone mismatch on watermark column | Normalize to UTC; add row-count assertion |
| Spark shuffle 10x slower than expected | Data skew on join key | Salt the key or broadcast the smaller table |
| Airflow shows "no tasks to run" | Circular dependency or import error | Run `airflow dags list-import-errors`, then fix the import |
| dbt succeeds but dashboards stale | Source freshness not checked | Add `dbt source freshness` as a prerequisite task |
| Kafka consumer lag grows unbounded | Throughput < producer rate | Increase partitions, scale consumers, tune `max.poll.records` |
| Quality validator false-positive anomalies | Z-score threshold too tight | Raise threshold or switch to IQR mode |
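The z-score vs IQR tradeoff from the last row can be sketched in a few lines: IQR uses quartiles, so extreme outliers do not inflate the spread the way they inflate standard deviation:

```python
from statistics import mean, quantiles, stdev


def anomalies_zscore(values, threshold=3.0):
    """Flag values more than `threshold` standard deviations from the mean."""
    m, s = mean(values), stdev(values)
    if s == 0:
        return []
    return [v for v in values if abs(v - m) / s > threshold]


def anomalies_iqr(values, k=1.5):
    """Flag values outside [Q1 - k*IQR, Q3 + k*IQR]; robust to outliers."""
    q1, _, q3 = quantiles(values, n=4)
    iqr = q3 - q1
    return [v for v in values if v < q1 - k * iqr or v > q3 + k * iqr]
```

An overly tight z-score threshold flags nearly everything, which is exactly the false-positive failure mode in the table.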
## References
| Guide | Path |
|-------|------|
| Pipeline Architecture | references/data_pipeline_architecture.md |
| Data Modeling Patterns | references/data_modeling_patterns.md |
| DataOps Best Practices | references/dataops_best_practices.md |
## Integration Points
| Skill | Integration |
|-------|-------------|
| senior-data-scientist | Feature engineering consumes curated mart data |
| senior-ml-engineer | ML pipelines depend on feature store tables |
| senior-devops | CI/CD for dbt, Airflow deployment, container orchestration |
| senior-architect | Architecture reviews for lakehouse vs warehouse decisions |
| code-reviewer | Pipeline code reviews for DAGs, dbt models, Spark jobs |
Last Updated: April 2026 · Version: 1.1.0