Agent Skills: Databricks Core Workflow A: Delta Lake ETL

|

UncategorizedID: jeremylongshore/claude-code-plugins-plus-skills/databricks-core-workflow-a

Install this agent skill to your local

pnpm dlx add-skill https://github.com/jeremylongshore/claude-code-plugins-plus-skills/tree/HEAD/plugins/saas-packs/databricks-pack/skills/databricks-core-workflow-a

Skill Files

Browse the full folder contents for databricks-core-workflow-a.

Download Skill

Loading file tree…

plugins/saas-packs/databricks-pack/skills/databricks-core-workflow-a/SKILL.md

Skill Metadata

Name
databricks-core-workflow-a
Description
|

Databricks Core Workflow A: Delta Lake ETL

Overview

Build production Delta Lake ETL pipelines using the medallion architecture (Bronze > Silver > Gold). Uses Auto Loader (cloudFiles) for incremental ingestion, MERGE INTO for upserts, and Delta Live Tables for declarative pipelines.

Prerequisites

  • Completed databricks-install-auth setup
  • Unity Catalog enabled with catalogs/schemas created
  • Access to cloud storage for raw data (S3, ADLS, GCS)

Architecture

Raw Sources (S3/ADLS/GCS)
    │  Auto Loader (cloudFiles)
    ▼
Bronze (raw + metadata)
    │  Cleanse, deduplicate, type-cast
    ▼
Silver (conformed)
    │  Aggregate, join, feature engineer
    ▼
Gold (analytics-ready)

Instructions

Step 1: Bronze Layer — Raw Ingestion with Auto Loader

Auto Loader (cloudFiles format) incrementally processes new files as they arrive. It handles schema inference, evolution, and scales to millions of files.

from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, input_file_name, lit

spark = SparkSession.builder.getOrCreate()

# Streaming ingestion with Auto Loader
bronze_stream = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/checkpoints/bronze/orders/schema")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("s3://data-lake/raw/orders/")
)

# Add ingestion metadata
bronze_with_meta = (
    bronze_stream
    .withColumn("_ingested_at", current_timestamp())
    .withColumn("_source_file", input_file_name())
    .withColumn("_source_system", lit("orders-api"))
)

# Write to bronze Delta table
(bronze_with_meta.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoints/bronze/orders/data")
    .option("mergeSchema", "true")
    .toTable("prod_catalog.bronze.raw_orders"))

Step 2: Silver Layer — Cleansing and Deduplication

Read from Bronze, apply business logic, and MERGE INTO Silver with upsert semantics.

from pyspark.sql.functions import col, trim, lower, to_timestamp, sha2, concat_ws
from delta.tables import DeltaTable

# Read new records from bronze (batch mode for scheduled jobs)
bronze_df = spark.table("prod_catalog.bronze.raw_orders")

# Apply transformations
silver_df = (
    bronze_df
    .withColumn("order_id", col("order_id").cast("string"))
    .withColumn("customer_email", lower(trim(col("customer_email"))))
    .withColumn("order_date", to_timestamp(col("order_date"), "yyyy-MM-dd'T'HH:mm:ss"))
    .withColumn("amount", col("amount").cast("decimal(12,2)"))
    .withColumn("email_hash", sha2(col("customer_email"), 256))
    .filter(col("order_id").isNotNull())
    .dropDuplicates(["order_id"])
)

# Upsert into silver with MERGE
if spark.catalog.tableExists("prod_catalog.silver.orders"):
    target = DeltaTable.forName(spark, "prod_catalog.silver.orders")
    (target.alias("t")
        .merge(silver_df.alias("s"), "t.order_id = s.order_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())
else:
    silver_df.write.format("delta").saveAsTable("prod_catalog.silver.orders")

Step 3: Gold Layer — Business Aggregations

Aggregate Silver data into analytics-ready tables. Use partition-level overwrites for efficient updates.

from pyspark.sql.functions import sum as _sum, count, avg, date_trunc

# Daily order metrics
gold_metrics = (
    spark.table("prod_catalog.silver.orders")
    .withColumn("order_day", date_trunc("day", col("order_date")))
    .groupBy("order_day", "region")
    .agg(
        count("order_id").alias("total_orders"),
        _sum("amount").alias("total_revenue"),
        avg("amount").alias("avg_order_value"),
    )
)

# Overwrite only changed partitions
(gold_metrics.write
    .format("delta")
    .mode("overwrite")
    .option("replaceWhere", f"order_day >= '{target_date}'")
    .saveAsTable("prod_catalog.gold.daily_order_metrics"))

Step 4: Delta Table Maintenance

-- Compact small files (bin-packing)
OPTIMIZE prod_catalog.silver.orders;

-- Z-order for query performance on frequently filtered columns
OPTIMIZE prod_catalog.silver.orders ZORDER BY (order_date, region);

-- Or use Liquid Clustering (DBR 13.3+) — replaces partitioning + Z-order
ALTER TABLE prod_catalog.silver.orders CLUSTER BY (order_date, region);
OPTIMIZE prod_catalog.silver.orders;

-- Clean up old file versions (default: 7 days)
VACUUM prod_catalog.silver.orders RETAIN 168 HOURS;

-- Compute statistics for query optimizer
ANALYZE TABLE prod_catalog.silver.orders COMPUTE STATISTICS;

Step 5: Delta Live Tables (Declarative Pipeline)

DLT manages orchestration, data quality, lineage, and error handling automatically.

import dlt
from pyspark.sql.functions import col, current_timestamp

@dlt.table(
    comment="Raw orders from Auto Loader",
    table_properties={"quality": "bronze"},
)
def bronze_orders():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .load("s3://data-lake/raw/orders/")
        .withColumn("_ingested_at", current_timestamp())
    )

@dlt.table(comment="Cleansed orders")
@dlt.expect_or_drop("valid_order_id", "order_id IS NOT NULL")
@dlt.expect_or_drop("valid_amount", "amount > 0")
def silver_orders():
    return (
        dlt.read_stream("bronze_orders")
        .withColumn("amount", col("amount").cast("decimal(12,2)"))
        .dropDuplicates(["order_id"])
    )

@dlt.table(comment="Daily revenue metrics")
def gold_daily_revenue():
    return (
        dlt.read("silver_orders")
        .groupBy("region", "order_date")
        .agg({"amount": "sum", "order_id": "count"})
    )

Step 6: Schedule the Pipeline

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import (
    CreateJob, Task, NotebookTask, JobCluster, CronSchedule,
)
from databricks.sdk.service.compute import ClusterSpec, AutoScale

w = WorkspaceClient()

job = w.jobs.create(
    name="daily-orders-etl",
    tasks=[
        Task(task_key="bronze", job_cluster_key="etl",
             notebook_task=NotebookTask(notebook_path="/Repos/team/pipelines/bronze")),
        Task(task_key="silver", job_cluster_key="etl",
             notebook_task=NotebookTask(notebook_path="/Repos/team/pipelines/silver"),
             depends_on=[{"task_key": "bronze"}]),
        Task(task_key="gold", job_cluster_key="etl",
             notebook_task=NotebookTask(notebook_path="/Repos/team/pipelines/gold"),
             depends_on=[{"task_key": "silver"}]),
    ],
    job_clusters=[JobCluster(
        job_cluster_key="etl",
        new_cluster=ClusterSpec(
            spark_version="14.3.x-scala2.12",
            node_type_id="i3.xlarge",
            autoscale=AutoScale(min_workers=1, max_workers=4),
        ),
    )],
    schedule=CronSchedule(quartz_cron_expression="0 0 6 * * ?", timezone_id="UTC"),
    max_concurrent_runs=1,
)
print(f"Created job: {job.job_id}")

Output

  • Bronze layer with raw data, Auto Loader schema evolution, and ingestion metadata
  • Silver layer with cleansed, deduplicated, type-cast data via MERGE upserts
  • Gold layer with business-ready aggregations
  • Table maintenance schedule (OPTIMIZE, VACUUM, ANALYZE)
  • Optional DLT pipeline with built-in data quality expectations

Error Handling

| Error | Cause | Solution | |-------|-------|----------| | AnalysisException: mergeSchema | Source schema changed | Auto Loader handles this; for batch add .option("mergeSchema", "true") | | ConcurrentAppendException | Multiple jobs writing same table | Use MERGE with retry logic or serialize writes via max_concurrent_runs=1 | | Null primary key | Bad source data | Add @dlt.expect_or_drop or .filter(col("pk").isNotNull()) | | java.lang.OutOfMemoryError | Driver collecting large results | Never call .collect() on large data; use .write to keep distributed | | VACUUM below retention | Retention < 7 days | Set delta.deletedFileRetentionDuration = '168 hours' minimum |

Examples

Quick Pipeline Validation

-- Verify row counts flow through medallion layers
SELECT 'bronze' AS layer, COUNT(*) AS rows FROM prod_catalog.bronze.raw_orders
UNION ALL SELECT 'silver', COUNT(*) FROM prod_catalog.silver.orders
UNION ALL SELECT 'gold', COUNT(*) FROM prod_catalog.gold.daily_order_metrics;

Resources

Next Steps

For ML workflows, see databricks-core-workflow-b.