Agent Skills: Python Programming for Data Engineering

Master Python fundamentals, OOP, data structures, async programming, and production-grade scripting for data engineering

python · object-oriented-programming · data-structures · asyncio · data-engineering
development
ID: pluginagentmarketplace/custom-plugin-data-engineer/python-programming

Skill Files

Browse the full folder contents for python-programming.

Download Skill

Loading file tree…

skills/python-programming/SKILL.md

Skill Metadata

Name
python-programming
Description
Master Python fundamentals, OOP, data structures, async programming, and production-grade scripting for data engineering

Python Programming for Data Engineering

Production-grade Python development for building scalable data pipelines, ETL systems, and data-intensive applications.

Quick Start

# Modern Python 3.12+ data engineering setup
from dataclasses import dataclass
from typing import Generator
from collections.abc import Iterator
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

@dataclass
class DataRecord:
    """Record holding an id, a numeric value and a category label.

    The dataclass machinery does not enforce the annotated types at runtime;
    the only check performed is that ``value`` is not negative.

    Raises:
        ValueError: If ``value`` is less than zero when the instance is built.
    """
    id: int
    value: float
    category: str

    def __post_init__(self):
        # Runs right after the generated __init__ has stored the fields.
        is_negative = self.value < 0
        if is_negative:
            raise ValueError(f"Value must be non-negative, got {self.value}")

def process_records(records: Iterator[dict]) -> Generator[DataRecord, None, None]:
    """Lazily convert raw dict records into validated ``DataRecord`` objects.

    Records are consumed one at a time, so the input never has to be held in
    memory as a whole. A record that cannot be converted is logged at WARNING
    level and skipped; processing then continues with the next record.

    Args:
        records: Iterable of mappings. ``'id'`` and ``'value'`` are required;
            ``'category'`` is optional and defaults to ``'unknown'``.

    Yields:
        One ``DataRecord`` per convertible input record, in input order.
    """
    for idx, record in enumerate(records):
        try:
            parsed = DataRecord(
                id=record['id'],
                value=float(record['value']),
                category=record.get('category', 'unknown'),
            )
        except (KeyError, TypeError, ValueError) as e:
            # KeyError: a required key is missing.
            # TypeError: value is None / not float-convertible, or the record
            #            itself is not subscriptable.
            # ValueError: unparsable number, or DataRecord rejected the value.
            logger.warning(f"Skipping invalid record {idx}: {e}")
            continue
        # Yield outside the try so exceptions thrown in by the consumer are
        # not mistaken for a bad record and silently swallowed.
        yield parsed

# Usage
if __name__ == "__main__":
    # Demo run: one well-formed record whose value arrives as a string.
    sample_data = [{"id": 1, "value": "100.5", "category": "A"}]
    parsed_records = process_records(iter(sample_data))
    for record in parsed_records:
        logger.info(f"Processed: {record}")

Core Concepts

1. Type-Safe Data Structures (2024-2025 Standard)

from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import TypedDict, NotRequired, Literal

# TypedDict for JSON-like structures
class PipelineConfig(TypedDict):
    """Typed shape of a pipeline configuration mapping.

    Every key is required except ``retry_count``. TypedDict is a static
    typing construct: nothing here is validated at runtime.
    """
    source: str
    destination: str
    batch_size: int
    retry_count: NotRequired[int]  # optional key: may be omitted entirely
    mode: Literal["batch", "streaming"]  # only these two strings type-check

# Dataclass for domain objects
@dataclass(frozen=True, slots=True)
class ETLJob:
    """Immutable, memory-efficient job definition."""
    job_id: str
    created_at: datetime = field(default_factory=datetime.utcnow)
    config: dict = field(default_factory=dict)

    def to_dict(self) -> dict:
        return {"job_id": self.job_id, "created_at": self.created_at.isoformat()}

2. Generator Patterns for Large Data

from typing import Generator, Iterable
import csv
from pathlib import Path

def read_csv_chunks(
    file_path: Path,
    chunk_size: int = 10000
) -> Generator[list[dict], None, None]:
    """
    Read a UTF-8 CSV file lazily and yield its rows in fixed-size chunks.

    At most ``chunk_size`` rows are held in memory at a time, so the file
    size does not bound memory use.

    Args:
        file_path: Path of the CSV file; the first row is used as the header.
        chunk_size: Maximum number of rows per yielded chunk (must be >= 1).

    Yields:
        Lists of row dicts (header name -> cell string). Every chunk has
        ``chunk_size`` rows except possibly the last, which holds the rest.

    Raises:
        ValueError: If ``chunk_size`` is less than 1. Because this is a
            generator, the error surfaces on the first iteration.
    """
    # A non-positive size used to degrade silently into one-row chunks.
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")

    # newline='' lets the csv module handle embedded line breaks itself.
    with open(file_path, 'r', newline='', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        chunk = []
        for row in reader:
            chunk.append(row)
            if len(chunk) >= chunk_size:
                yield chunk
                chunk = []  # fresh list: the yielded chunk stays untouched
        if chunk:  # flush the final, partially filled chunk
            yield chunk

def transform_pipeline(
    records: Iterable[dict],
    transformers: list[callable]
) -> Generator[dict, None, None]:
    """Run every record through the transformers in order, lazily.

    Each transformer receives the previous transformer's output. A ``None``
    result drops the record and skips the remaining transformers for it.
    """
    def _run_steps(value):
        # Stop at the first step that filters the record out.
        for step in transformers:
            value = step(value)
            if value is None:
                return None
        return value

    for item in records:
        outcome = _run_steps(item)
        if outcome is None:
            continue
        yield outcome

3. Async Programming for I/O-Bound Tasks

import asyncio
import aiohttp
from typing import AsyncGenerator
import logging

logger = logging.getLogger(__name__)

async def fetch_with_retry(
    session: aiohttp.ClientSession,
    url: str,
    max_retries: int = 3,
    backoff_factor: float = 2.0
) -> dict | None:
    """
    GET ``url`` and return its decoded JSON body, retrying on failure.

    Client errors (including non-2xx statuses, via ``raise_for_status``) and
    timeouts are retried with exponential backoff: the wait before retry
    ``k`` (0-based) is ``backoff_factor ** k`` seconds.

    Args:
        session: Open aiohttp session used for the request.
        url: Address to fetch.
        max_retries: Total number of attempts to make.
        backoff_factor: Base of the exponential backoff.

    Returns:
        The parsed JSON payload, or ``None`` once every attempt has failed.
    """
    for attempt in range(max_retries):
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as resp:
                resp.raise_for_status()
                return await resp.json()
        # A total-timeout expiry raises asyncio.TimeoutError, which is not an
        # aiohttp.ClientError; catch it too so timeouts are retried rather
        # than escaping on the first slow response.
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            if attempt + 1 >= max_retries:
                # Last attempt: nothing left to wait for, so do not sleep.
                logger.warning(f"Attempt {attempt+1} failed for {url}: {e}. No retries left")
                break
            wait_time = backoff_factor ** attempt
            logger.warning(f"Attempt {attempt+1} failed for {url}: {e}. Retrying in {wait_time}s")
            await asyncio.sleep(wait_time)
    logger.error(f"All retries exhausted for {url}")
    return None

async def fetch_all_pages(
    base_url: str,
    page_count: int,
    concurrency_limit: int = 10
) -> AsyncGenerator[dict, None]:
    """Fetch pages ``0 .. page_count - 1`` concurrently and yield each result.

    A semaphore caps the number of requests in flight at
    ``concurrency_limit``. This bounds concurrency only; it does not limit
    requests per unit of time. Results are yielded in completion order, not
    page order.

    Args:
        base_url: Endpoint to query; ``?page=<n>`` is appended to it.
        page_count: Number of pages to request, starting at page 0.
        concurrency_limit: Maximum number of simultaneous requests.

    Yields:
        The payload of each page that was fetched successfully and is truthy.
    """
    semaphore = asyncio.Semaphore(concurrency_limit)

    async def bounded_fetch(session: aiohttp.ClientSession, url: str):
        async with semaphore:
            return await fetch_with_retry(session, url)

    async with aiohttp.ClientSession() as session:
        # Create real Task objects so they can be cancelled during cleanup.
        tasks = [
            asyncio.create_task(bounded_fetch(session, f"{base_url}?page={i}"))
            for i in range(page_count)
        ]
        try:
            for result in asyncio.as_completed(tasks):
                data = await result
                # Skips failed fetches (None) as well as empty payloads.
                if data:
                    yield data
        finally:
            # If the consumer stops early or an error escapes, in-flight
            # requests would otherwise keep running against a session that is
            # about to close. Cancel them and wait for them to settle.
            for task in tasks:
                if not task.done():
                    task.cancel()
            if tasks:
                await asyncio.gather(*tasks, return_exceptions=True)

4. Error Handling & Observability

import functools
import time
import logging
from typing import TypeVar, Callable, ParamSpec

P = ParamSpec('P')
R = TypeVar('R')

def with_retry(
    max_attempts: int = 3,
    exceptions: tuple = (Exception,),
    backoff_factor: float = 2.0
) -> Callable[[Callable[P, R]], Callable[P, R]]:
    """
    Decorator for automatic retry with exponential backoff.
    Use for flaky operations (network, database connections).
    """
    def decorator(func: Callable[P, R]) -> Callable[P, R]:
        @functools.wraps(func)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            last_exception = None
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    last_exception = e
                    wait_time = backoff_factor ** attempt
                    logging.warning(
                        f"{func.__name__} attempt {attempt+1} failed: {e}. "
                        f"Retrying in {wait_time}s"
                    )
                    time.sleep(wait_time)
            raise last_exception
        return wrapper
    return decorator

def log_execution_time(func: Callable[P, R]) -> Callable[P, R]:
    """Wrap ``func`` so that every call logs how long it took.

    A successful call is logged at INFO level. A call that raises is logged
    at ERROR level and the exception is re-raised unchanged.
    """
    @functools.wraps(func)
    def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
        started_at = time.perf_counter()

        def elapsed() -> float:
            # Seconds since the wrapped call began.
            return time.perf_counter() - started_at

        try:
            outcome = func(*args, **kwargs)
            logging.info(f"{func.__name__} completed in {elapsed():.3f}s")
            return outcome
        except Exception as exc:
            logging.error(f"{func.__name__} failed after {elapsed():.3f}s: {exc}")
            raise
    return wrapper

Tools & Technologies

| Tool | Purpose | Version (2025) |
|------|---------|----------------|
| Python | Core language | 3.12+ |
| uv | Package manager (replaces pip) | 0.4+ |
| Ruff | Linter + formatter (replaces Black, flake8) | 0.5+ |
| mypy | Static type checking | 1.11+ |
| pytest | Testing framework | 8.0+ |
| pydantic | Data validation | 2.5+ |
| polars | DataFrame operations (faster than pandas) | 0.20+ |
| httpx | Modern HTTP client | 0.27+ |

Learning Path

Phase 1: Foundations (Weeks 1-3)

Week 1: Core syntax, data types, control flow
Week 2: Functions, modules, file I/O
Week 3: OOP (classes, inheritance, composition)

Phase 2: Intermediate (Weeks 4-6)

Week 4: Generators, iterators, decorators
Week 5: Type hints, dataclasses, protocols
Week 6: Error handling, logging, testing basics

Phase 3: Advanced (Weeks 7-9)

Week 7: Async/await, concurrent programming
Week 8: Memory optimization, profiling
Week 9: Package structure, dependency management

Phase 4: Production Mastery (Weeks 10-12)

Week 10: CI/CD integration, linting, formatting
Week 11: Performance optimization patterns
Week 12: Production deployment patterns

Production Patterns

Configuration Management

from pydantic_settings import BaseSettings
from functools import lru_cache

class Settings(BaseSettings):
    """Type-safe configuration with environment variable support.

    ``database_url`` and ``api_key`` have no default and must be supplied;
    ``batch_size`` and ``debug`` fall back to the defaults below.
    """
    database_url: str
    api_key: str
    batch_size: int = 1000
    debug: bool = False

    # pydantic-settings is built on pydantic v2, where the nested
    # ``class Config`` is deprecated; ``model_config`` is the supported way to
    # name the dotenv file and its encoding.
    model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}

@lru_cache
def get_settings() -> Settings:
    """Return the process-wide ``Settings`` instance, built on first call.

    The result is cached, so later changes to the environment are not picked
    up unless ``get_settings.cache_clear()`` is called.
    """
    return Settings()

Connection Pooling

from contextlib import contextmanager
from typing import Generator
import psycopg2
from psycopg2 import pool

class DatabasePool:
    """Wrapper around a psycopg2 ``ThreadedConnectionPool`` for PostgreSQL."""

    def __init__(self, dsn: str, min_conn: int = 2, max_conn: int = 10):
        """Create the underlying pool for ``dsn`` with the given size bounds."""
        self._pool = pool.ThreadedConnectionPool(min_conn, max_conn, dsn)

    @contextmanager
    def get_connection(self) -> Generator:
        """Borrow a connection for the duration of a ``with`` block.

        The transaction is committed when the block finishes normally and
        rolled back (then the error re-raised) when it raises ``Exception``.
        The connection is handed back to the pool in every case.
        """
        borrowed = self._pool.getconn()
        try:
            try:
                yield borrowed
                # Commit stays inside the guarded region so a failing commit
                # is rolled back as well.
                borrowed.commit()
            except Exception:
                borrowed.rollback()
                raise
        finally:
            self._pool.putconn(borrowed)

    def close(self):
        """Close every connection held by the pool."""
        self._pool.closeall()

Troubleshooting Guide

Common Failure Modes

| Issue | Symptoms | Root Cause | Fix |
|-------|----------|------------|-----|
| Memory Error | MemoryError, process killed | Loading full dataset into memory | Use generators, chunked processing |
| Import Error | ModuleNotFoundError | Virtual env not activated, missing dep | uv pip install, check sys.path |
| Type Error | TypeError: unhashable type | Using mutable as dict key | Convert to tuple or use dataclass |
| Async Deadlock | Program hangs | Blocking call in async code | Use asyncio.to_thread() for blocking ops |
| GIL Bottleneck | CPU-bound parallelism slow | Python GIL limits threads | Use multiprocessing or ProcessPoolExecutor |

Debug Checklist

# 1. Check Python version
python --version  # Should be 3.12+

# 2. Verify virtual environment
which python  # Should point to venv

# 3. Check installed packages
# <package> is a placeholder: substitute a real name, since an unquoted
# < or > is parsed by the shell as a redirection.
uv pip list | grep <package>

# 4. Run with verbose logging
# 2>&1 merges stderr into stdout so tee captures both in debug.log.
python -m mymodule -v 2>&1 | tee debug.log

# 5. Profile memory usage
# memory_profiler is a third-party package and must be installed first.
python -m memory_profiler script.py

# 6. Profile CPU
# -s cumtime sorts the report by cumulative time per function.
python -m cProfile -s cumtime script.py

Log Interpretation

# Structured logging for easier debugging
import structlog

logger = structlog.get_logger()

def process_batch(batch_id: str, records: list):
    """Log the start and outcome of one batch as structured events.

    Emits ``batch_started`` first, then ``batch_completed`` on success. On
    failure it emits ``batch_failed`` with the error text and traceback and
    re-raises the exception.
    """
    record_count = len(records)
    logger.info("batch_started", batch_id=batch_id, record_count=record_count)
    try:
        # processing...
        logger.info("batch_completed", batch_id=batch_id, success=True)
    except Exception as exc:
        logger.error("batch_failed", batch_id=batch_id, error=str(exc), exc_info=True)
        raise

Unit Test Template

import pytest
from unittest.mock import Mock, patch
from your_module import process_records, DataRecord

class TestProcessRecords:
    """Unit tests following AAA pattern (Arrange-Act-Assert)."""

    def test_valid_records_processed(self):
        """A well-formed record is converted, with the string value parsed to float."""
        # Arrange
        input_data = [{"id": 1, "value": "10.5", "category": "A"}]

        # Act
        result = list(process_records(iter(input_data)))

        # Assert
        assert len(result) == 1
        assert result[0].id == 1
        assert result[0].value == 10.5

    def test_invalid_records_skipped(self):
        """A record without 'value' is skipped rather than raising."""
        # Arrange
        input_data = [{"id": 1}]  # Missing 'value'

        # Act
        result = list(process_records(iter(input_data)))

        # Assert
        assert len(result) == 0

    def test_negative_value_raises_error(self):
        """Constructing DataRecord directly with a negative value raises ValueError."""
        # Arrange & Act & Assert
        with pytest.raises(ValueError, match="non-negative"):
            DataRecord(id=1, value=-5.0, category="A")

    # Template placeholders: 'your_module.external_api_call' and
    # function_using_api are not defined anywhere in this snippet and must be
    # replaced with real names before this test can run.
    @patch('your_module.external_api_call')
    def test_with_mocked_dependency(self, mock_api):
        """Pattern for replacing an external dependency with a mock."""
        # Arrange
        mock_api.return_value = {"status": "ok"}

        # Act
        result = function_using_api()

        # Assert
        mock_api.assert_called_once()
        assert result["status"] == "ok"

Best Practices

Code Style (2025 Standards)

# ✅ DO: Use type hints everywhere
# Signature-only stub: the `...` body stands in for a real implementation.
def calculate_metrics(data: list[float]) -> dict[str, float]: ...

# ✅ DO: Prefer composition over inheritance
@dataclass
class Pipeline:
    """Pipeline assembled from three collaborators instead of a base class."""
    # DataReader, Transformer and DataWriter are illustrative type names that
    # are not defined in this snippet.
    reader: DataReader
    transformer: Transformer
    writer: DataWriter

# ✅ DO: Use context managers for resources
# open_connection() and process() are placeholders, not defined in this snippet.
with open_connection() as conn:
    process(conn)

# ❌ DON'T: Use bare except
try: ...
# A bare `except:` also traps SystemExit and KeyboardInterrupt, and `pass`
# discards the error entirely; catch specific exception types instead.
except: pass  # Never do this

# ❌ DON'T: Mutate function arguments
# Anti-pattern shown on purpose: the caller's list is modified in place.
def process(items: list) -> list:
    items.append("new")  # Avoid this: mutates the argument the caller passed in
    # Copying here comes too late, because the caller's list has already
    # changed. To leave the argument untouched, build the result without
    # touching `items`, e.g. `return [*items, "new"]`.
    return items.copy()

Performance Tips

# ✅ Use generators for large data
def process_large_file(path):
    """Yield ``transform(line)`` for each line of the file, one line at a time."""
    with open(path) as handle:
        # Iterating the file object reads lazily, line by line.
        yield from (transform(raw_line) for raw_line in handle)

# ✅ Use set/dict for O(1) lookups
# load_valid_ids() and item_id are placeholders, not defined in this snippet.
# Build the set once, up front; each later membership test is then a hash lookup.
valid_ids = set(load_valid_ids())  # Not list
if item_id in valid_ids: ...

# ✅ Use local variables in hot loops
def hot_loop(items):
    """Call ``expensive_lookup`` on every item through a local alias."""
    # Bind the global once so the loop body uses a fast local name.
    call = expensive_lookup
    for element in items:
        call(element)

Resources

Official Documentation

Production References

Community

Next Skills

After mastering Python programming:

  • sql-databases - Query and manage relational data
  • etl-tools - Build data pipelines with Airflow
  • big-data - Scale with Spark and distributed systems
  • machine-learning - Apply ML with scikit-learn

Skill Certification Checklist:

  • [ ] Can write type-safe Python with mypy validation
  • [ ] Can implement generators for large data processing
  • [ ] Can use async/await for concurrent I/O
  • [ ] Can write comprehensive unit tests with pytest
  • [ ] Can profile and optimize Python performance