Agent Skills: Big Data & Distributed Computing

Apache Spark, Hadoop, distributed computing, and large-scale data processing for petabyte-scale workloads

apache-sparkhadoopdistributed-computingbig-datalarge-scale-data-processing
analyticsID: pluginagentmarketplace/custom-plugin-data-engineer/big-data

Skill Files

Browse the full folder contents for big-data.

Download Skill

Loading file tree…

skills/big-data/SKILL.md

Skill Metadata

Name
big-data
Description
Apache Spark, Hadoop, distributed computing, and large-scale data processing for petabyte-scale workloads

Big Data & Distributed Computing

Production-grade big data processing with Apache Spark, distributed systems patterns, and petabyte-scale data engineering.

Quick Start

# PySpark 3.5+ modern DataFrame API
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Initialize Spark with optimal settings
spark = (SparkSession.builder
    .appName("ProductionETL")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate())

# Efficient data loading with schema enforcement
from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType

schema = StructType([
    StructField("event_id", StringType(), False),
    StructField("user_id", LongType(), False),
    StructField("event_type", StringType(), False),
    StructField("timestamp", TimestampType(), False),
    StructField("properties", StringType(), True)
])

df = (spark.read
    .schema(schema)
    .parquet("s3://bucket/events/")
    .filter(F.col("timestamp") >= F.current_date() - 30))

# Complex aggregation with window functions
window_spec = Window.partitionBy("user_id").orderBy("timestamp")

result = (df
    .withColumn("event_rank", F.row_number().over(window_spec))
    .withColumn("session_id", F.sum(
        F.when(
            F.col("timestamp") - F.lag("timestamp").over(window_spec) > F.expr("INTERVAL 30 MINUTES"),
            1
        ).otherwise(0)
    ).over(window_spec))
    .groupBy("user_id", "session_id")
    .agg(
        F.count("*").alias("event_count"),
        F.min("timestamp").alias("session_start"),
        F.max("timestamp").alias("session_end")
    ))

result.write.mode("overwrite").parquet("s3://bucket/sessions/")

Core Concepts

1. Spark Architecture Deep Dive

┌─────────────────────────────────────────────────────────┐
│                    Driver Program                        │
│  ┌─────────────────────────────────────────────────┐   │
│  │              SparkContext/SparkSession           │   │
│  │  - Creates execution plan (DAG)                  │   │
│  │  - Coordinates with Cluster Manager              │   │
│  │  - Schedules tasks                               │   │
│  └─────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────┘
                           │
                           ▼
┌─────────────────────────────────────────────────────────┐
│              Cluster Manager (YARN/K8s/Standalone)      │
└─────────────────────────────────────────────────────────┘
                           │
          ┌────────────────┼────────────────┐
          ▼                ▼                ▼
    ┌──────────┐    ┌──────────┐    ┌──────────┐
    │ Executor │    │ Executor │    │ Executor │
    │ ┌──────┐ │    │ ┌──────┐ │    │ ┌──────┐ │
    │ │Task 1│ │    │ │Task 2│ │    │ │Task 3│ │
    │ │Task 4│ │    │ │Task 5│ │    │ │Task 6│ │
    │ └──────┘ │    │ └──────┘ │    │ └──────┘ │
    │  Cache   │    │  Cache   │    │  Cache   │
    └──────────┘    └──────────┘    └──────────┘

2. Partition Optimization

from pyspark.sql import functions as F

# Check current partitioning
print(f"Partitions: {df.rdd.getNumPartitions()}")

# Rule of thumb: 128MB per partition, 2-4 partitions per core
# For 100GB data on 10 executors with 4 cores each:
# 100GB / 128MB ≈ 800 partitions, or 40 cores * 4 = 160 partitions
# Use: 200-400 partitions

# Repartition by key (for joins)
df_repartitioned = df.repartition(200, "user_id")

# Coalesce (reduce partitions without shuffle)
df_coalesced = df.coalesce(100)

# Optimal write partitioning
df.repartition(F.year("date"), F.month("date")) \
  .write \
  .partitionBy("year", "month") \
  .mode("overwrite") \
  .parquet("s3://bucket/output/")

# Bucketing for repeated joins
df.write \
  .bucketBy(256, "user_id") \
  .sortBy("user_id") \
  .saveAsTable("bucketed_events")

3. Join Optimization Strategies

from pyspark.sql import functions as F

# Broadcast join (small table < 10MB default, configurable to 100MB)
small_df = spark.read.parquet("s3://bucket/dim_product/")  # 5MB
large_df = spark.read.parquet("s3://bucket/fact_sales/")   # 500GB

# Explicit broadcast hint
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "product_id")

# Increase broadcast threshold
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024)  # 100MB

# Sort-Merge Join (for large tables)
# Both tables sorted and partitioned by join key
users = spark.read.parquet("users/").repartition(200, "user_id").sortWithinPartitions("user_id")
orders = spark.read.parquet("orders/").repartition(200, "user_id").sortWithinPartitions("user_id")
result = users.join(orders, "user_id")

# Skewed join handling (salting technique)
# If user_id has skew (some users have millions of rows)
salt_range = 10
salted_users = (users
    .withColumn("salt", F.explode(F.array([F.lit(i) for i in range(salt_range)])))
    .withColumn("salted_key", F.concat("user_id", F.lit("_"), "salt")))

salted_orders = (orders
    .withColumn("salt", (F.rand() * salt_range).cast("int"))
    .withColumn("salted_key", F.concat("user_id", F.lit("_"), "salt")))

result = salted_users.join(salted_orders, "salted_key").drop("salt", "salted_key")

4. Caching & Persistence

from pyspark import StorageLevel

# Caching strategies
df.cache()  # MEMORY_AND_DISK by default in Spark 3.x
df.persist(StorageLevel.MEMORY_ONLY)  # Fastest, may recompute if evicted
df.persist(StorageLevel.MEMORY_AND_DISK_SER)  # Compressed, slower but less memory
df.persist(StorageLevel.DISK_ONLY)  # For very large intermediate datasets

# When to cache:
# - Reused DataFrames (used in multiple actions)
# - After expensive transformations (joins, aggregations)
# - Before iterative algorithms

# Cache usage pattern
expensive_df = (spark.read.parquet("s3://bucket/large/")
    .filter(F.col("status") == "active")
    .join(broadcast(dim_df), "dim_key")
    .groupBy("category")
    .agg(F.sum("amount").alias("total")))

expensive_df.cache()
expensive_df.count()  # Materialize cache

# Use cached DataFrame multiple times
top_categories = expensive_df.orderBy(F.desc("total")).limit(10)
summary = expensive_df.agg(F.avg("total"), F.max("total"))

# Release cache when done
expensive_df.unpersist()

5. Structured Streaming

from pyspark.sql import functions as F
from pyspark.sql.types import *

# Read from Kafka
kafka_df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
    .option("subscribe", "events")
    .option("startingOffsets", "latest")
    .option("maxOffsetsPerTrigger", 100000)
    .load())

# Parse JSON payload
event_schema = StructType([
    StructField("event_id", StringType()),
    StructField("user_id", LongType()),
    StructField("event_type", StringType()),
    StructField("timestamp", TimestampType())
])

parsed_df = (kafka_df
    .select(F.from_json(F.col("value").cast("string"), event_schema).alias("data"))
    .select("data.*")
    .withWatermark("timestamp", "10 minutes"))

# Windowed aggregation
windowed_counts = (parsed_df
    .groupBy(
        F.window("timestamp", "5 minutes", "1 minute"),
        "event_type"
    )
    .count())

# Write to Delta Lake with checkpointing
query = (windowed_counts.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "s3://bucket/checkpoints/events/")
    .trigger(processingTime="1 minute")
    .start("s3://bucket/streaming_output/"))

# Monitor stream
query.awaitTermination()

Tools & Technologies

| Tool | Purpose | Version (2025) | |------|---------|----------------| | Apache Spark | Distributed processing | 3.5+ | | Delta Lake | ACID transactions | 3.0+ | | Apache Iceberg | Table format | 1.4+ | | Apache Flink | Stream processing | 1.18+ | | Databricks | Managed Spark platform | Latest | | AWS EMR | Managed Hadoop/Spark | 7.0+ | | Trino | Interactive queries | 400+ | | dbt | Transform layer | 1.7+ |

Learning Path

Phase 1: Foundations (Weeks 1-3)

Week 1: Distributed computing concepts, MapReduce
Week 2: Spark architecture, RDDs, DataFrames
Week 3: Spark SQL, basic transformations

Phase 2: Intermediate (Weeks 4-6)

Week 4: Joins, aggregations, window functions
Week 5: Partitioning, bucketing, caching
Week 6: Performance tuning, Spark UI analysis

Phase 3: Advanced (Weeks 7-10)

Week 7: Structured Streaming
Week 8: Delta Lake / Iceberg table formats
Week 9: Cluster sizing, cost optimization
Week 10: Advanced optimizations (AQE, skew handling)

Phase 4: Production (Weeks 11-14)

Week 11: Deployment on EMR/Databricks
Week 12: Monitoring, alerting, debugging
Week 13: CI/CD for Spark jobs
Week 14: Multi-cluster architectures

Production Patterns

Delta Lake UPSERT (Merge)

from delta.tables import DeltaTable

# Incremental UPSERT pattern
delta_table = DeltaTable.forPath(spark, "s3://bucket/users/")
updates_df = spark.read.parquet("s3://bucket/updates/")

delta_table.alias("target").merge(
    updates_df.alias("source"),
    "target.user_id = source.user_id"
).whenMatchedUpdate(set={
    "email": "source.email",
    "updated_at": "source.updated_at"
}).whenNotMatchedInsertAll().execute()

# Optimize after merge
delta_table.optimize().executeCompaction()
delta_table.vacuum(retentionHours=168)  # 7 days

Cost-Effective Cluster Configuration

# spark-submit configuration for 1TB processing job
"""
spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --num-executors 50 \
    --executor-cores 4 \
    --executor-memory 16g \
    --driver-memory 8g \
    --conf spark.sql.adaptive.enabled=true \
    --conf spark.sql.adaptive.coalescePartitions.enabled=true \
    --conf spark.sql.shuffle.partitions=400 \
    --conf spark.dynamicAllocation.enabled=true \
    --conf spark.dynamicAllocation.minExecutors=10 \
    --conf spark.dynamicAllocation.maxExecutors=100 \
    --conf spark.speculation=true \
    job.py
"""

# Sizing guidelines:
# - Executor memory: 16-32GB (avoid GC overhead)
# - Executor cores: 4-5 (parallelism per executor)
# - Total cores: 2-4x data size in GB
# - Partitions: 2-4x total cores

Troubleshooting Guide

Common Failure Modes

| Issue | Symptoms | Root Cause | Fix | |-------|----------|------------|-----| | OOM Error | "Container killed by YARN" | Too much data per partition | Increase partitions, reduce broadcast | | Shuffle Spill | Slow stage, disk I/O | Insufficient memory | Increase spark.memory.fraction | | Skewed Tasks | One task much slower | Data skew on key | Use salting, AQE skew handling | | GC Overhead | "GC overhead limit exceeded" | Too many small objects | Use Kryo serialization, reduce UDFs | | Driver OOM | Driver crash | collect(), large broadcast | Avoid collect, stream results |

Debug Checklist

# 1. Check Spark UI (port 4040/18080)
# - Stages: Look for skewed tasks (max >> median)
# - Storage: Check cached data size
# - Environment: Verify configuration

# 2. Analyze execution plan
df.explain(mode="extended")

# 3. Check partition distribution
df.groupBy(F.spark_partition_id()).count().show()

# 4. Profile data skew
df.groupBy("key_column").count().orderBy(F.desc("count")).show(20)

# 5. Monitor job metrics
spark.sparkContext.setLogLevel("WARN")

# 6. Enable detailed metrics
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.eventLog.enabled", "true")

Reading Spark UI

Stage Analysis:
├── Duration: Total time for stage
├── Tasks: Number of parallel tasks
│   ├── Median: Typical task duration
│   ├── Max: Slowest task (check for skew)
│   └── Failed: Retry count
├── Input: Data read
├── Shuffle Read: Data from other stages
├── Shuffle Write: Data for downstream stages
└── Spill: Disk spill (indicates memory pressure)

Key Metrics:
├── GC Time > 10%: Memory issue
├── Shuffle Write > Input: Exploding join
├── Max/Median > 2x: Data skew
└── Spill > 0: Increase partitions or memory

Unit Test Template

import pytest
from pyspark.sql import SparkSession
from chispa.dataframe_comparer import assert_df_equality
import pyspark.sql.functions as F

@pytest.fixture(scope="session")
def spark():
    """Session-scoped Spark for tests."""
    return (SparkSession.builder
        .master("local[2]")
        .appName("UnitTests")
        .config("spark.sql.shuffle.partitions", 2)
        .getOrCreate())

@pytest.fixture
def sample_data(spark):
    return spark.createDataFrame([
        (1, "user1", 100.0),
        (2, "user2", 200.0),
        (3, "user1", 150.0),
    ], ["id", "user_id", "amount"])

class TestAggregations:

    def test_user_totals(self, spark, sample_data):
        # Arrange
        expected = spark.createDataFrame([
            ("user1", 250.0),
            ("user2", 200.0),
        ], ["user_id", "total"])

        # Act
        result = sample_data.groupBy("user_id").agg(
            F.sum("amount").alias("total")
        )

        # Assert
        assert_df_equality(result, expected, ignore_row_order=True)

    def test_handles_empty_dataframe(self, spark):
        # Arrange
        empty_df = spark.createDataFrame([], "id INT, amount DOUBLE")

        # Act
        result = empty_df.agg(F.sum("amount").alias("total")).collect()

        # Assert
        assert result[0]["total"] is None

    def test_window_functions(self, spark, sample_data):
        # Arrange
        from pyspark.sql.window import Window
        window = Window.partitionBy("user_id").orderBy("id")

        # Act
        result = sample_data.withColumn(
            "running_total",
            F.sum("amount").over(window)
        ).filter(F.col("user_id") == "user1")

        # Assert
        totals = [row["running_total"] for row in result.collect()]
        assert totals == [100.0, 250.0]

Best Practices

Performance

# ✅ DO: Use DataFrame API over RDD
df.filter(F.col("status") == "active")  # Catalyst optimized

# ❌ DON'T: Use RDD transformations
rdd.filter(lambda x: x["status"] == "active")  # No optimization

# ✅ DO: Use built-in functions
df.withColumn("upper_name", F.upper("name"))

# ❌ DON'T: Use Python UDFs (slow serialization)
@udf
def upper_name(name):
    return name.upper()

# ✅ DO: Broadcast small lookups
df.join(broadcast(small_df), "key")

# ✅ DO: Persist wisely
intermediate.cache()
intermediate.count()  # Force materialization
# ... use intermediate multiple times ...
intermediate.unpersist()

Code Organization

# ✅ DO: Chain transformations fluently
result = (df
    .filter(condition)
    .withColumn("new_col", F.expr("..."))
    .groupBy("key")
    .agg(F.sum("value")))

# ✅ DO: Use descriptive column aliases
.agg(
    F.count("*").alias("event_count"),
    F.avg("amount").alias("avg_amount")
)

# ✅ DO: Parameterize for reusability
def add_date_features(df, date_col):
    return (df
        .withColumn("year", F.year(date_col))
        .withColumn("month", F.month(date_col))
        .withColumn("day_of_week", F.dayofweek(date_col)))

Resources

Official Documentation

Performance Tuning

Books

  • "Learning Spark 2nd Edition" by Damji et al.
  • "Spark: The Definitive Guide" by Chambers & Zaharia
  • "High Performance Spark" by Karau & Warren

Next Skills

After mastering Big Data:

  • data-warehousing - Design dimensional models
  • mlops - Deploy ML at scale
  • streaming - Real-time with Flink/Kafka
  • cloud-platforms - AWS EMR, Databricks

Skill Certification Checklist:

  • [ ] Can optimize Spark jobs using EXPLAIN and Spark UI
  • [ ] Can implement efficient joins with broadcast and bucketing
  • [ ] Can handle data skew with salting techniques
  • [ ] Can build streaming pipelines with Structured Streaming
  • [ ] Can use Delta Lake for ACID operations