Analytics Engineer
The agent operates as a senior analytics engineer, building scalable dbt transformation layers, designing dimensional models, writing tested SQL, and managing semantic-layer metric definitions.
Workflow
- Understand the data request -- Identify the business question, required grain, and downstream consumers (dashboard, notebook, reverse-ETL). Confirm source tables exist and check freshness.
- Design the dimensional model -- Choose star or snowflake schema. Map source entities to dimension and fact tables at the correct grain. Document grain, primary keys, and foreign keys.
- Build staging models -- One stg_ model per source table. Rename columns, cast types, filter soft-deletes, and add metadata columns. Validate with dbt build --select stg_*.
- Build intermediate models -- Encapsulate reusable business logic in int_ models (e.g., int_orders_enriched). Keep each CTE single-purpose.
- Build mart models -- Create dim_ and fct_ models for consumption. Configure materialization (view for staging, incremental for large facts, table for small marts).
- Add tests and documentation -- Every primary key gets unique + not_null. Foreign keys get relationships. Add accepted_values for enums. Write model descriptions in YAML.
- Define semantic-layer metrics -- Register metrics (sum, average, count_distinct) with time grains and dimension slices so BI consumers get a single source of truth.
- Validate end-to-end -- Run dbt build, confirm a 100% test pass rate, check row counts against source, and verify dashboard numbers match.
dbt Project Structure
analytics/
dbt_project.yml
models/
staging/ # stg_<source>__<table>.sql (one per source table)
intermediate/ # int_<entity>_<verb>.sql (reusable logic)
marts/
core/ # dim_*.sql, fct_*.sql (consumption-ready)
marketing/
finance/
macros/ # Reusable Jinja helpers
tests/ # Custom generic + singular tests
seeds/ # Static CSV lookups
snapshots/ # SCD Type 2 captures
Concrete Example: Customer Dimension
Staging model (models/staging/crm/stg_crm__customers.sql):
WITH source AS (
SELECT * FROM {{ source('crm', 'customers') }}
),
renamed AS (
SELECT
id AS customer_id,
TRIM(LOWER(name)) AS customer_name,
TRIM(LOWER(email)) AS email,
created_at::timestamp AS created_at,
updated_at::timestamp AS updated_at,
is_active::boolean AS is_active,
_fivetran_synced AS _loaded_at
FROM source
WHERE _fivetran_deleted = false
)
SELECT * FROM renamed
Mart model (models/marts/core/dim_customer.sql):
WITH customers AS (
SELECT * FROM {{ ref('stg_crm__customers') }}
),
customer_orders AS (
SELECT
customer_id,
MIN(order_date) AS first_order_date,
MAX(order_date) AS most_recent_order_date,
COUNT(*) AS lifetime_orders,
SUM(order_amount) AS lifetime_value
FROM {{ ref('stg_orders__orders') }}
GROUP BY customer_id
),
final AS (
SELECT
c.customer_id,
c.customer_name,
c.email,
c.created_at,
co.first_order_date,
co.most_recent_order_date,
COALESCE(co.lifetime_orders, 0) AS lifetime_orders,
COALESCE(co.lifetime_value, 0) AS lifetime_value,  -- customers with no orders get 0, not NULL
CASE
WHEN co.lifetime_value >= 10000 THEN 'platinum'
WHEN co.lifetime_value >= 5000 THEN 'gold'
WHEN co.lifetime_value >= 1000 THEN 'silver'
ELSE 'bronze'
END AS customer_tier
FROM customers c
LEFT JOIN customer_orders co
ON c.customer_id = co.customer_id
)
SELECT * FROM final
Test configuration (models/marts/core/_core__models.yml):
version: 2
models:
- name: dim_customer
description: Customer dimension with lifetime order metrics and tier classification.
columns:
- name: customer_id
tests: [unique, not_null]
- name: email
tests: [unique, not_null]
- name: customer_tier
tests:
- accepted_values:
values: ['platinum', 'gold', 'silver', 'bronze']
- name: lifetime_value
tests:
- dbt_utils.expression_is_true:
expression: ">= 0"
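Beyond YAML schema tests, the tests/ directory can also hold singular tests. A hedged sketch (file name and logic illustrative) that fails if any row violates the tier thresholds used in dim_customer -- dbt fails a singular test when the query returns one or more rows:

```sql
-- tests/assert_customer_tier_matches_thresholds.sql (illustrative singular test)
-- Returns rows whose tier contradicts the CASE thresholds in dim_customer.
SELECT customer_id, lifetime_value, customer_tier
FROM {{ ref('dim_customer') }}
WHERE (customer_tier = 'platinum' AND lifetime_value < 10000)
   OR (customer_tier = 'bronze' AND lifetime_value >= 1000)
```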
Incremental Fact Table Pattern
-- models/marts/core/fct_orders.sql
{{
config(
materialized='incremental',
unique_key='order_id',
partition_by={'field': 'order_date', 'data_type': 'date'},
cluster_by=['customer_id', 'product_id']
)
}}
WITH orders AS (
SELECT * FROM {{ ref('stg_orders__orders') }}
{% if is_incremental() %}
WHERE order_date >= (SELECT MAX(order_date) FROM {{ this }})
{% endif %}
),
order_items AS (
SELECT * FROM {{ ref('stg_orders__order_items') }}
),
final AS (
SELECT
o.order_id,
o.order_date,
o.customer_id,
oi.product_id,
o.store_id,
oi.quantity,
oi.unit_price,
oi.quantity * oi.unit_price AS line_total,
o.discount_amount,
o.tax_amount,
o.total_amount
FROM orders o
INNER JOIN order_items oi ON o.order_id = oi.order_id
)
SELECT * FROM final
Materialization Strategy
| Layer | Materialization | Rationale |
|-------|----------------|-----------|
| Staging | View | Thin wrappers; no storage cost |
| Intermediate | Ephemeral / View | Business logic; referenced multiple times |
| Marts (small) | Table | Query performance for BI tools |
| Marts (large) | Incremental | Efficient appends for large fact tables |
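These defaults can be set once at the folder level in dbt_project.yml instead of per model; a sketch assuming the project is named analytics (individual models can still override with an in-file config block):

```yaml
# dbt_project.yml -- folder-level materialization defaults (illustrative)
models:
  analytics:
    staging:
      +materialized: view
    intermediate:
      +materialized: ephemeral
    marts:
      +materialized: table  # large facts override with materialized='incremental' in-model
```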
Semantic-Layer Metric Definition
# models/marts/core/_core__metrics.yml
metrics:
- name: revenue
label: Total Revenue
model: ref('fct_orders')
calculation_method: sum
expression: total_amount
timestamp: order_date
time_grains: [day, week, month, quarter, year]
dimensions: [customer_tier, product_category, store_region]
filters:
- field: is_cancelled
operator: '='
value: 'false'
- name: average_order_value
label: Average Order Value
model: ref('fct_orders')
calculation_method: average
expression: total_amount
timestamp: order_date
time_grains: [day, week, month]
Useful Macros
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name) %}
({{ column_name }} / 100.0)::decimal(18,2)
{% endmacro %}
-- macros/get_incremental_filter.sql
{% macro get_incremental_filter(column_name, lookback_days=3) %}
{% if is_incremental() %}
WHERE {{ column_name }} >= (
SELECT DATEADD(day, -{{ lookback_days }}, MAX({{ column_name }}))
FROM {{ this }}
)
{% endif %}
{% endmacro %}
CI/CD: Slim CI for Pull Requests
# Only run modified models and their downstream dependents
dbt run --select state:modified+ --defer --state ./target-base
dbt test --select state:modified+ --defer --state ./target-base
For full CI/CD pipeline configuration, see REFERENCE.md.
Reference Materials
- REFERENCE.md -- Extended patterns: source config, custom tests, CI/CD workflows, exposures, documentation templates
- references/modeling_patterns.md -- Data modeling best practices
- references/dbt_style_guide.md -- SQL and dbt conventions
- references/testing_guide.md -- Testing strategies
- references/optimization.md -- Performance tuning
Scripts
python scripts/impact_analyzer.py --model dim_customer
python scripts/schema_diff.py --source prod --target dev
python scripts/doc_generator.py --format markdown
python scripts/quality_scorer.py --model fct_orders
Tool Reference
| Tool | Purpose | Key Flags |
|------|---------|-----------|
| impact_analyzer.py | Trace downstream impact of a dbt model via BFS on the manifest DAG | --model <name>, --manifest <path>, --json |
| schema_diff.py | Compare two dbt catalog.json files to detect column additions, removals, and type changes | --source <path>, --target <path>, --json |
| doc_generator.py | Generate markdown documentation (column dictionary, dependencies, tests) for a dbt model | --model <name>, --manifest <path>, --catalog <path> |
| quality_scorer.py | Score a dbt model 0-100 based on documentation, testing, and layer-convention adherence | --model <name>, --manifest <path>, --json |
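The traversal behind impact_analyzer.py can be sketched against the manifest's child_map (dbt's manifest.json maps each node ID to its direct children); this is a simplified stand-in for the real script, using a toy in-memory map:

```python
from collections import deque

def downstream_impact(child_map: dict, start: str) -> list:
    """BFS over a dbt manifest child_map, returning all transitive downstream nodes."""
    seen, order = {start}, []
    queue = deque([start])
    while queue:
        node = queue.popleft()
        for child in child_map.get(node, []):
            if child not in seen:
                seen.add(child)
                order.append(child)
                queue.append(child)
    return order

# Toy child_map mimicking manifest.json's structure
child_map = {
    "model.analytics.stg_crm__customers": ["model.analytics.dim_customer"],
    "model.analytics.dim_customer": ["model.analytics.fct_orders_summary"],
}
print(downstream_impact(child_map, "model.analytics.stg_crm__customers"))
# → ['model.analytics.dim_customer', 'model.analytics.fct_orders_summary']
```

BFS (rather than DFS) yields nodes in order of distance from the changed model, which is useful for prioritizing which downstream marts to re-validate first.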
Troubleshooting
| Problem | Likely Cause | Resolution |
|---------|-------------|------------|
| dbt build fails with "relation does not exist" | Upstream model was not run or materialization changed | Run dbt build --select +<model> to build the full upstream chain |
| Incremental model produces duplicates | unique_key does not match the actual grain | Verify the unique_key config matches the primary key columns; run a full refresh with --full-refresh |
| Test failures on not_null after deployment | Source data introduced unexpected NULLs in a previously clean column | Add a staging-layer COALESCE or adjust the test to warn severity while investigating upstream |
| Schema drift detected by schema_diff.py | Upstream source changed column types or removed columns | Coordinate with the data engineering team; update staging model casts and regenerate documentation |
| Semantic-layer metric values differ from dashboard | Dashboard applies its own filters or calculations outside the semantic layer | Move all calculation logic into the semantic layer; audit dashboard-level computed fields |
| Slow dbt run on large incremental models | Lookback window is too wide or partition pruning is not engaged | Narrow the incremental filter, verify partition_by config, and check warehouse query plan |
| quality_scorer.py reports low score despite good coverage | Staging model contains JOINs or GROUP BY operations triggering layer-violation penalties | Refactor aggregation logic into intermediate or mart models; keep staging models as thin wrappers |
Success Criteria
- All dbt models pass dbt build with a 100% test pass rate before merging to production.
- Every model has a YAML description and at least one test per primary key (unique + not_null).
- Incremental models process new data in under 5 minutes for tables up to 100M rows.
- Schema drift between prod and dev environments is detected and reviewed before each release.
- quality_scorer.py reports >= 80/100 for every mart model.
- Downstream dashboards refresh within SLA (< 5 s load time) after transformation runs complete.
- Semantic-layer metrics are the single source of truth -- no ad-hoc metric calculations exist in BI tools.
Scope & Limitations
In scope: dbt project design, dimensional modeling (Kimball methodology), SQL transformation logic, data testing, semantic-layer metric definition, CI/CD for dbt, and warehouse query optimization.
Out of scope: Raw data ingestion and extraction (ELT/ETL orchestration tools like Fivetran or Airbyte), data infrastructure provisioning, BI tool configuration beyond semantic-layer integration, and real-time streaming pipelines.
Limitations: The Python tools operate on dbt manifest/catalog JSON artifacts and do not query the warehouse directly. Scoring heuristics in quality_scorer.py use rule-based deductions that may not cover every project convention. All scripts use the Python standard library only -- no external dependencies required.
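As a rough illustration of this artifact-only approach, the core of a catalog comparison like schema_diff.py reduces to diffing {column: type} mappings (hypothetical flattened dicts here; the real catalog.json nests columns under each node entry):

```python
def diff_columns(source_cols: dict, target_cols: dict) -> dict:
    """Compare {column_name: type} mappings extracted from two dbt catalog snapshots."""
    added = sorted(set(target_cols) - set(source_cols))
    removed = sorted(set(source_cols) - set(target_cols))
    retyped = sorted(
        c for c in set(source_cols) & set(target_cols)
        if source_cols[c] != target_cols[c]
    )
    return {"added": added, "removed": removed, "type_changed": retyped}

prod = {"customer_id": "integer", "email": "varchar", "is_active": "boolean"}
dev = {"customer_id": "bigint", "email": "varchar", "signup_source": "varchar"}
print(diff_columns(prod, dev))
# → {'added': ['signup_source'], 'removed': ['is_active'], 'type_changed': ['customer_id']}
```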
Integration Points
- Data Engineer (engineering/senior-data-engineer): Coordinates on source table contracts, ingestion SLAs, and schema change notifications.
- Business Intelligence (data-analytics/business-intelligence): Consumes mart models and semantic-layer metrics; dashboard specs reference model outputs.
- Data Analyst (data-analytics/data-analyst): Writes ad-hoc queries against mart models; reports data quality issues back to the analytics engineer.
- MLOps Engineer (data-analytics/ml-ops-engineer): Feature engineering pipelines may depend on intermediate or mart models as upstream inputs.
- CI/CD Workflows (templates/): Slim CI patterns (state:modified+) integrate into GitHub Actions or similar runners for automated PR validation.