Agent Skills: Python data pipeline development

Python data processing pipelines with modular architecture. Use when building content processing workflows, implementing dispatcher patterns, integrating Google Sheets/Drive APIs, or creating batch processing systems. Covers patterns from rosen-scraper, image-analyzer, and social-scraper projects.

Install this agent skill locally:

pnpm dlx add-skill https://github.com/jamditis/claude-skills-journalism/tree/HEAD/dev-toolkit/skills/python-pipeline

Patterns for building production-quality data processing pipelines with Python.

These patterns target Python 3.11+ for asyncio.TaskGroup and exception groups. Python 3.12+ adds the lighter type X = ... alias syntax, and 3.13+ offers an experimental JIT and free-threaded build; the patterns here depend on none of those.

Choosing a DataFrame engine: pandas vs polars vs DuckDB

For a long time pandas was the default for any tabular work in Python. As of 2026 the default has shifted: polars is the right pick for multi-GB pipelines on a single machine, DuckDB is the right pick when SQL or larger-than-RAM scans are involved, and pandas stays useful for small data and the ML/notebook ecosystem (scikit-learn, statsmodels, plotnine all speak it natively).

| Tool | When | Why |
|---|---|---|
| pandas | < ~1 GB data, ML interop, single-threaded familiarity | Mature, ubiquitous, eager DataFrame model. Slowest in benchmarks but most ecosystem support. |
| polars | 1 GB - tens of GB on one box, performance-critical pipelines | Multithreaded by default, lazy query engine, Arrow-native. ~5x speedup over pandas on filter / aggregate at 100M rows. |
| DuckDB | SQL workflows, larger-than-RAM, parquet/CSV scanning, joins across many files | Vectorized + pipelined execution, cost-based optimizer, streaming scans. Works great as a thin wrapper over a directory of parquet files. |

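All three can exchange data through Apache Arrow, so handing frames between them is cheap: usually zero-copy between polars and DuckDB, while converting to NumPy-backed pandas may copy. Mixing them is the pragmatic answer most of the time: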

import polars as pl
import duckdb

# Polars: read a directory of CSVs, filter, group
df = (
    pl.scan_csv('data/articles_*.csv')
      .filter(pl.col('published_at') >= '2026-01-01')
      .group_by('source')
      .agg(pl.len().alias('count'), pl.col('word_count').mean())
      .collect()
)

# DuckDB: same shape with SQL, no intermediate copy
con = duckdb.connect()
df = con.execute("""
    SELECT source, COUNT(*) AS count, AVG(word_count) AS avg_wc
    FROM 'data/articles_*.csv'
    WHERE published_at >= '2026-01-01'
    GROUP BY source
""").pl()  # returns a Polars DataFrame; use .df() for pandas

# Hand off to pandas only at the boundary that needs it (e.g. scikit-learn)
import pandas as pd
pdf = df.to_pandas()

If your pipeline already uses pandas everywhere, don't pre-emptively rewrite. Migrate the bottleneck stages first — typically the CSV-load + filter step.

Architecture patterns

Modular processor architecture

src/
├── workflow.py              # Main orchestrator
├── dispatcher.py            # Content-type router
├── processors/
│   ├── __init__.py
│   ├── base.py             # Abstract base class
│   ├── article_processor.py
│   ├── video_processor.py
│   └── audio_processor.py
├── services/
│   ├── sheets_service.py   # Google Sheets integration
│   ├── drive_service.py    # Google Drive integration
│   └── ai_service.py       # Gemini API wrapper
├── utils/
│   ├── logger.py
│   └── rate_limiter.py
└── config.py               # Environment configuration

Dispatcher pattern

from typing import Protocol
from urllib.parse import urlparse

class Processor(Protocol):
    def can_process(self, url: str) -> bool: ...
    def process(self, url: str, metadata: dict) -> dict: ...

class Dispatcher:
    def __init__(self):
        self.processors: list[Processor] = [
            ArticleProcessor(),
            VideoProcessor(),
            AudioProcessor(),
            SocialProcessor(),
        ]

    def dispatch(self, url: str, metadata: dict) -> dict:
        for processor in self.processors:
            if processor.can_process(url):
                return processor.process(url, metadata)
        raise ValueError(f"No processor found for URL: {url}")

# Pattern-based routing
class ArticleProcessor:
    DOMAINS = ['nytimes.com', 'washingtonpost.com', 'medium.com']

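    def can_process(self, url: str) -> bool:
        # Match the listed domain or any of its subdomains. A plain substring
        # test would also accept look-alikes such as "notnytimes.com".
        host = (urlparse(url).hostname or '').lower()
        return any(host == d or host.endswith('.' + d) for d in self.DOMAINS)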
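
As a rough illustration of how first-match routing behaves end to end, the sketch below wires stand-in processors into a small dispatch function. DomainProcessor, FallbackProcessor, and the returned dict shape are hypothetical names used only for this example; real processors would fetch and parse content rather than echo metadata.

```python
from urllib.parse import urlparse


class DomainProcessor:
    """Hypothetical processor that claims URLs on a fixed set of domains."""

    def __init__(self, kind: str, domains: list[str]):
        self.kind = kind
        self.domains = domains

    def can_process(self, url: str) -> bool:
        host = (urlparse(url).hostname or "").lower()
        return any(host == d or host.endswith("." + d) for d in self.domains)

    def process(self, url: str, metadata: dict) -> dict:
        # A real processor would download and parse the page here.
        return {**metadata, "url": url, "type": self.kind}


class FallbackProcessor:
    """Hypothetical catch-all; register it last so specific processors win."""

    def can_process(self, url: str) -> bool:
        return True

    def process(self, url: str, metadata: dict) -> dict:
        return {**metadata, "url": url, "type": "unknown"}


def dispatch(processors: list, url: str, metadata: dict) -> dict:
    # First match wins, so list order is the routing priority.
    for processor in processors:
        if processor.can_process(url):
            return processor.process(url, metadata)
    raise ValueError(f"No processor found for URL: {url}")


processors = [
    DomainProcessor("article", ["nytimes.com", "medium.com"]),
    DomainProcessor("video", ["youtube.com", "vimeo.com"]),
    FallbackProcessor(),
]

article = dispatch(processors, "https://www.nytimes.com/2026/story", {"id": "1"})
video = dispatch(processors, "https://youtube.com/watch?v=abc", {"id": "2"})
other = dispatch(processors, "https://notnytimes.com/post", {"id": "3"})
```

Because the first matching processor wins, a catch-all belongs at the end of the list; without one, unmatched URLs raise ValueError as in the Dispatcher above.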

CSV-based pipeline workflow

import csv
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import Iterator

@dataclass
class Record:
    id: str
    url: str
    title: str | None = None
    content: str | None = None
    status: str = 'pending'

def read_input(path: Path) -> Iterator[Record]:
    with open(path, 'r', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        for row in reader:
            yield Record(**{k: v for k, v in row.items() if k in Record.__annotations__})

def write_output(records: list[Record], path: Path):
    with open(path, 'w', encoding='utf-8', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=list(Record.__annotations__.keys()))
        writer.writeheader()
        writer.writerows(asdict(r) for r in records)

def process_batch(input_path: Path, output_path: Path):
    dispatcher = Dispatcher()
    results = []

    for record in read_input(input_path):
        try:
            processed = dispatcher.dispatch(record.url, asdict(record))
            record.status = 'completed'
            record.title = processed.get('title')
            record.content = processed.get('content')
        except Exception as e:
            record.status = f'failed: {e}'
        results.append(record)

    write_output(results, output_path)
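
process_batch above collects every result in memory and writes once at the end, so a crash late in the run loses all completed work. One possible mitigation is to process the input in fixed-size chunks and flush after each one. The chunked helper below is a hypothetical sketch of that building block, not part of the original projects.

```python
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], size: int) -> Iterator[list[T]]:
    """Yield successive lists of at most `size` items from any iterable."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    # islice pulls up to `size` items; an empty list ends the loop.
    while chunk := list(islice(iterator, size)):
        yield chunk


batches = list(chunked(range(7), 3))
```

Since read_input is a generator, chunked(read_input(path), 50) would stream the file without loading it whole. On Python 3.12+, itertools.batched does the same job and yields tuples instead of lists.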

Google Sheets integration

import gspread
from google.oauth2.service_account import Credentials

SCOPES = [
    'https://www.googleapis.com/auth/spreadsheets',
    'https://www.googleapis.com/auth/drive'
]

class SheetsService:
    def __init__(self, credentials_path: str):
        creds = Credentials.from_service_account_file(credentials_path, scopes=SCOPES)
        self.client = gspread.authorize(creds)

    def get_worksheet(self, spreadsheet_id: str, sheet_name: str):
        spreadsheet = self.client.open_by_key(spreadsheet_id)
        return spreadsheet.worksheet(sheet_name)

    def read_all(self, worksheet) -> list[dict]:
        return worksheet.get_all_records()

    def append_row(self, worksheet, row: list):
        worksheet.append_row(row, value_input_option='USER_ENTERED')

    def batch_update(self, worksheet, updates: list[dict]):
        """Update multiple cells efficiently."""
        # Format: [{'range': 'A1', 'values': [[value]]}]
        worksheet.batch_update(updates, value_input_option='USER_ENTERED')

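    def find_row_by_id(self, worksheet, id_value: str, id_column: int = 1) -> int | None:
        """Find row number by ID value, or None if no cell matches."""
        # Current gspread releases return None from find() on a miss; the old
        # gspread.CellNotFound exception no longer exists there.
        cell = worksheet.find(id_value, in_column=id_column)
        return cell.row if cell is not None else None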

Rate limiting

import time

import requests
from ratelimit import limits, sleep_and_retry

# Simple rate limiter
@sleep_and_retry
@limits(calls=10, period=60)  # 10 calls per minute
def rate_limited_api_call(url: str):
    return requests.get(url, timeout=30)

# Custom rate limiter: enforces a minimum interval between calls (no backoff)
class RateLimiter:
    def __init__(self, calls_per_minute: int = 10):
        self.delay = 60 / calls_per_minute
        self.last_call = float('-inf')  # so the first call never waits

    def wait(self):
        # monotonic() is unaffected by system clock adjustments
        elapsed = time.monotonic() - self.last_call
        if elapsed < self.delay:
            time.sleep(self.delay - elapsed)
        self.last_call = time.monotonic()

# Usage
limiter = RateLimiter(calls_per_minute=10)

def fetch_with_rate_limit(url: str):
    limiter.wait()
    return requests.get(url, timeout=30)
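
The limiters above space calls out but do not react to failures. For transient errors (timeouts, HTTP 429), exponential backoff is the usual complement. The following is a minimal stdlib sketch; retry_with_backoff and its parameters are hypothetical, and the injectable sleep argument exists only so the behaviour can be exercised without real waiting.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    func: Callable[[], T],
    retries: int = 4,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func(), retrying on any exception with exponential backoff."""
    if retries < 0:
        raise ValueError("retries must be >= 0")
    for attempt in range(retries + 1):
        try:
            return func()
        except Exception:
            if attempt == retries:
                raise  # out of retries: surface the last error
            # 1x, 2x, 4x, ... the base delay, capped, plus up to 10% jitter
            delay = min(base_delay * 2 ** attempt, max_delay)
            sleep(delay + random.uniform(0, delay * 0.1))
    raise AssertionError("not reached; keeps type checkers satisfied")


# Demo: a stand-in call that fails twice, then succeeds.
attempts = []
delays = []


def flaky() -> str:
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("temporary failure")
    return "ok"


result = retry_with_backoff(flaky, base_delay=0.5, sleep=delays.append)
```

Catching every Exception keeps the sketch short; in a real pipeline it is usually better to retry only errors known to be transient and let programming errors fail immediately.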

Concurrent fetching with asyncio.TaskGroup (3.11+)

For I/O-bound stages (HTTP fetches, API calls), asyncio.TaskGroup plus httpx.AsyncClient runs many requests in parallel without the boilerplate of asyncio.gather. TaskGroup's structured-concurrency model means an unhandled exception in one task cancels the rest and surfaces as an ExceptionGroup, which is easier to reason about than gather(return_exceptions=True). The example below catches errors inside fetch_one and returns them as values, so one failed URL does not cancel the whole batch.

import asyncio
import httpx

async def fetch_one(client: httpx.AsyncClient, url: str) -> tuple[str, str | Exception]:
    try:
        response = await client.get(url, timeout=30)
        response.raise_for_status()
        return (url, response.text)
    except Exception as e:
        return (url, e)

async def fetch_many(urls: list[str], concurrency: int = 10) -> dict[str, str | Exception]:
    results: dict[str, str | Exception] = {}
    sem = asyncio.Semaphore(concurrency)

    async def _bounded(client: httpx.AsyncClient, url: str):
        async with sem:
            url, body = await fetch_one(client, url)
            results[url] = body

    # http2=True needs the optional extra: pip install "httpx[http2]"
    async with httpx.AsyncClient(http2=True, timeout=30) as client:
        async with asyncio.TaskGroup() as tg:
            for url in urls:
                tg.create_task(_bounded(client, url))

    return results

# Usage
urls = ['https://example.com/a', 'https://example.com/b', ...]
data = asyncio.run(fetch_many(urls, concurrency=20))

Pair with aiolimiter if you need a true requests-per-second cap (semaphore alone bounds concurrency, not rate). For exponential-backoff retries, wrap fetch_one with tenacity.AsyncRetrying.
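
To make the concurrency-versus-rate distinction concrete, here is a stdlib-only sketch of an interval-based limiter. IntervalLimiter is a hypothetical helper written for this example and is not aiolimiter's API; aiolimiter remains the more complete option for real workloads.

```python
import asyncio
import time


class IntervalLimiter:
    """Hypothetical helper: spaces acquisitions 1/rate_per_sec seconds apart."""

    def __init__(self, rate_per_sec: float):
        self.interval = 1.0 / rate_per_sec
        self._next_slot = 0.0

    async def acquire(self) -> float:
        # No await between reading and updating _next_slot, so this
        # reservation step is atomic with respect to other coroutines.
        slot = max(time.monotonic(), self._next_slot)
        self._next_slot = slot + self.interval
        delay = slot - time.monotonic()
        if delay > 0:
            await asyncio.sleep(delay)
        return slot


async def demo(n: int, rate_per_sec: float) -> list[float]:
    limiter = IntervalLimiter(rate_per_sec)
    # All n coroutines start at once; the limiter staggers their slots.
    return list(await asyncio.gather(*(limiter.acquire() for _ in range(n))))


slots = sorted(asyncio.run(demo(5, rate_per_sec=100)))
gaps = [later - earlier for earlier, later in zip(slots, slots[1:])]
```

In fetch_many, awaiting limiter.acquire() inside _bounded before the request would cap the request rate, while the semaphore continues to cap how many requests are in flight at once.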

Progress tracking with resume capability

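import json
from pathlib import Path

class ProgressTracker:
    def __init__(self, progress_file: Path):
        self.progress_file = progress_file
        self.state = self._load()
        # Set mirror of processed_ids for O(1) membership checks
        self._processed = set(self.state['processed_ids'])

    def _load(self) -> dict:
        if self.progress_file.exists():
            return json.loads(self.progress_file.read_text(encoding='utf-8'))
        return {'processed_ids': [], 'last_row': 0, 'errors': []}

    def save(self):
        # Write to a temp file, then swap it in, so a crash mid-write
        # cannot leave a truncated progress file behind.
        tmp = self.progress_file.with_suffix('.tmp')
        tmp.write_text(json.dumps(self.state, indent=2), encoding='utf-8')
        tmp.replace(self.progress_file)

    def mark_processed(self, record_id: str):
        if record_id not in self._processed:
            self._processed.add(record_id)
            self.state['processed_ids'].append(record_id)
        self.save()

    def is_processed(self, record_id: str) -> bool:
        return record_id in self._processed

    def log_error(self, record_id: str, error: str):
        self.state['errors'].append({'id': record_id, 'error': error})
        self.save()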

# Usage in workflow
tracker = ProgressTracker(Path('progress.json'))

for record in records:
    if tracker.is_processed(record.id):
        continue  # Skip already processed

    try:
        process(record)
        tracker.mark_processed(record.id)
    except Exception as e:
        tracker.log_error(record.id, str(e))

Gemini AI integration

The google-generativeai package was deprecated August 31, 2025 and the unified google-genai SDK replaced it. New code should target google-genai:

pip install google-genai

import os
import json
from google import genai
from google.genai import types

# Client carries config (API key, project, location). Reuse across calls.
client = genai.Client(api_key=os.environ['GEMINI_API_KEY'])

# Pick a current model. Names drift; check ai.google.dev/gemini-api/docs/models
# for the active list. gemini-2.5-flash is a reasonable cost-efficient default.
DEFAULT_MODEL = 'gemini-2.5-flash'

class AIService:
    def __init__(self, model: str = DEFAULT_MODEL):
        self.model = model

    def categorize(self, text: str, taxonomy: dict) -> dict:
        prompt = f"""Analyze this content and categorize it.

Content:
{text[:10000]}

Taxonomy:
{json.dumps(taxonomy, indent=2)}

Respond with JSON containing:
- category: one of the taxonomy categories
- tags: list of relevant tags
- summary: 2-3 sentence summary
"""
        response = client.models.generate_content(
            model=self.model,
            contents=prompt,
            config=types.GenerateContentConfig(response_mime_type='application/json'),
        )
        return json.loads(response.text)

    def extract_entities(self, text: str) -> list[dict]:
        prompt = f"""Extract named entities from this text.

Text:
{text[:10000]}

For each entity, provide:
- name: entity name
- type: Person, Organization, Location, Event, Work, or Concept
- prominence: 1-10 score based on importance in text

Respond with JSON array of entities.
"""
        response = client.models.generate_content(
            model=self.model,
            contents=prompt,
            config=types.GenerateContentConfig(response_mime_type='application/json'),
        )
        return json.loads(response.text)

# Batch processing with token-usage tracking (cost varies by model and time;
# look up live pricing rather than hardcoding a per-1k figure).
class BatchAIProcessor:
    def __init__(self, ai_service: AIService):
        self.ai = ai_service
        self.input_tokens = 0
        self.output_tokens = 0

    def process_batch(
        self, items: list[str], prompt_template: str
    ) -> list[dict]:
        """Render each item into prompt_template via .format(item=...).
        prompt_template must instruct the model to return JSON, since this
        method enforces response_mime_type='application/json'.
        """
        results = []
        for item in items:
            response = client.models.generate_content(
                model=self.ai.model,
                contents=prompt_template.format(item=item),
                config=types.GenerateContentConfig(
                    response_mime_type='application/json'
                ),
            )
            usage = response.usage_metadata
            self.input_tokens += usage.prompt_token_count or 0
            self.output_tokens += usage.candidates_token_count or 0
            results.append(json.loads(response.text))
        return results

response.usage_metadata carries the actual token counts, which is more accurate than length heuristics. Without response_mime_type='application/json', Gemini returns prose (often wrapped in markdown fences) and json.loads fails — every JSON-returning call needs both the config flag and a JSON-shaped prompt. For multimodal calls, pass content as a list (text + parts), not a single string.
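
As a defensive complement to response_mime_type, some pipelines also tolerate a stray markdown fence around the payload before calling json.loads. The sketch below shows one way to do that; parse_json_response is a hypothetical helper, not part of the google-genai SDK, and it only handles a single fence wrapping the whole response.

```python
import json
import re

FENCE = "`" * 3  # markdown code-fence marker, built to avoid a literal fence
# Optional fence (with or without a "json" tag) around the whole payload.
_FENCED = re.compile(rf"^{FENCE}(?:json)?\s*\n(.*?)\n?{FENCE}\s*$", re.DOTALL)


def parse_json_response(text: str):
    """Parse model output as JSON, tolerating one wrapping code fence."""
    stripped = text.strip()
    match = _FENCED.match(stripped)
    if match:
        stripped = match.group(1)
    # Still raises json.JSONDecodeError if the payload is not valid JSON.
    return json.loads(stripped)


wrapped = FENCE + 'json\n{"category": "news", "tags": ["a"]}\n' + FENCE
parsed_wrapped = parse_json_response(wrapped)
parsed_plain = parse_json_response('[1, 2, 3]')
```

This does not replace the config flag: prose with embedded JSON still fails to parse, which is usually the right outcome because it signals a prompt or configuration problem.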

Image classification with Gemini Vision

import json
import os
from pathlib import Path

from google import genai
from google.genai import types
from PIL import Image

client = genai.Client(api_key=os.environ['GEMINI_API_KEY'])

def classify_image(image_path: Path, categories: list[str]) -> dict:
    image = Image.open(image_path)

    prompt = f"""Analyze this image and classify it.

Available categories: {', '.join(categories)}

Respond with JSON:
{{
  "category": "category name",
  "description": "brief description",
  "suggested_filename": "descriptive-filename-with-dashes",
  "tags": ["tag1", "tag2", "tag3"]
}}
"""
    response = client.models.generate_content(
        model='gemini-2.5-flash',
        contents=[prompt, image],
        config=types.GenerateContentConfig(response_mime_type='application/json'),
    )
    return json.loads(response.text)

# pathlib.Path.glob does NOT support brace expansion (`*.{jpg,png,webp}`);
# iterate the extensions explicitly.
IMAGE_EXTS = ('.jpg', '.jpeg', '.png', '.webp')

def organize_images(source_dir: Path, output_dir: Path):
    categories = ['Nature', 'People', 'Architecture', 'Art', 'Technology', 'Other']

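    # Materialize the listing first: files are moved out of source_dir below,
    # and changing a directory while lazily iterating it is unreliable.
    image_paths = [
        p for p in source_dir.iterdir()
        if p.is_file() and p.suffix.lower() in IMAGE_EXTS
    ]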

    for image_path in image_paths:
        try:
            result = classify_image(image_path, categories)
            category_dir = output_dir / result['category']
            category_dir.mkdir(parents=True, exist_ok=True)

            new_name = f"{result['suggested_filename']}{image_path.suffix.lower()}"
            image_path.rename(category_dir / new_name)
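        except Exception as e:
            # Report the failure instead of moving the file silently
            print(f"Failed to classify {image_path.name}: {e}")
            failures = output_dir / 'failures'
            failures.mkdir(parents=True, exist_ok=True)
            image_path.rename(failures / image_path.name)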
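
organize_images builds paths directly from model output (result['category'] and result['suggested_filename']), so a response containing slashes or ".." could write outside output_dir. A sketch of one way to reduce such values to a single safe path component follows; safe_component and its defaults are hypothetical.

```python
import re
import unicodedata


def safe_component(value: str, fallback: str = "unnamed", max_len: int = 80) -> str:
    """Reduce arbitrary text to a single safe path component."""
    # Fold accented characters to ASCII where a decomposition exists.
    ascii_text = (
        unicodedata.normalize("NFKD", value).encode("ascii", "ignore").decode()
    )
    # Collapse every run of other characters (including "/" and ".") to "-".
    slug = re.sub(r"[^A-Za-z0-9_-]+", "-", ascii_text).strip("-_")
    return slug[:max_len] or fallback


traversal = safe_component("../../etc/passwd")
accented = safe_component("Sunset over Zürich!")
empty = safe_component("///")
```

Applied to the code above, this would mean wrapping both values, for example output_dir / safe_component(result['category']), before creating directories or renaming files.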

Environment configuration

from pathlib import Path
from dotenv import load_dotenv
import os

load_dotenv()

class Config:
    # API Keys
    GEMINI_API_KEY = os.environ['GEMINI_API_KEY']
    GOOGLE_SHEET_ID = os.environ['GOOGLE_SHEET_ID']

    # Paths
    PROJECT_ROOT = Path(__file__).parent.parent
    DATA_DIR = PROJECT_ROOT / 'data'
    OUTPUT_DIR = PROJECT_ROOT / 'output'
    CREDENTIALS_PATH = PROJECT_ROOT / 'google_credentials.json'

    # Rate limits
    API_CALLS_PER_MINUTE = 10
    BATCH_SIZE = 50

    @classmethod
    def ensure_dirs(cls):
        cls.DATA_DIR.mkdir(exist_ok=True)
        cls.OUTPUT_DIR.mkdir(exist_ok=True)
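
Config reads os.environ[...] while the class body is evaluated, so the first missing variable raises a bare KeyError at import time. If a fuller message is preferred, one option is to check all required names up front. The require_env helper and the DEMO_* variable names below are hypothetical and exist only for illustration.

```python
import os


def require_env(*names: str) -> dict[str, str]:
    """Return the named variables, or raise one error listing all missing."""
    # Empty strings count as missing, which catches blank .env entries.
    missing = [name for name in names if not os.environ.get(name)]
    if missing:
        raise RuntimeError(
            "Missing required environment variables: " + ", ".join(missing)
        )
    return {name: os.environ[name] for name in names}


# Demo with throwaway variable names so real settings are untouched.
os.environ["DEMO_PRESENT"] = "value"
os.environ.pop("DEMO_MISSING_A", None)
os.environ.pop("DEMO_MISSING_B", None)

found = require_env("DEMO_PRESENT")

try:
    require_env("DEMO_PRESENT", "DEMO_MISSING_A", "DEMO_MISSING_B")
except RuntimeError as exc:
    error_message = str(exc)
```

Calling something like require_env('GEMINI_API_KEY', 'GOOGLE_SHEET_ID') right after load_dotenv() would report every gap in one pass.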

Logging setup

import logging
from pathlib import Path
from datetime import datetime

def setup_logging(log_dir: Path, name: str = 'pipeline') -> logging.Logger:
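    log_dir.mkdir(parents=True, exist_ok=True)

    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)

    # A second call with the same name would attach duplicate handlers
    if logger.handlers:
        return logger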

    # Console handler (INFO+)
    console = logging.StreamHandler()
    console.setLevel(logging.INFO)
    console.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))

    # File handler (DEBUG+)
    log_file = log_dir / f"{name}_{datetime.now():%Y%m%d_%H%M%S}.log"
    file_handler = logging.FileHandler(log_file)
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    ))

    logger.addHandler(console)
    logger.addHandler(file_handler)

    return logger

Common pitfalls

Google Sheets cell limits:

MAX_CELL_LENGTH = 50000

def truncate_for_sheets(text: str) -> str:
    if len(text) > MAX_CELL_LENGTH:
        return text[:MAX_CELL_LENGTH - 20] + '... [truncated]'
    return text

CSV encoding issues:

# Always specify encoding; utf-8-sig strips a leading BOM if one is present
with open(path, 'r', encoding='utf-8-sig', newline='') as f:
    reader = csv.reader(f)
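
To show what the BOM handling buys, the sketch below writes a small file with a leading byte-order mark (as Excel's "CSV UTF-8" export does) and reads its header with both codecs. The file name and contents are made up for the demonstration.

```python
import csv
import tempfile
from pathlib import Path

# Simulate an Excel-style export: UTF-8 with a leading byte-order mark.
with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "export.csv"
    path.write_bytes(b"\xef\xbb\xbfid,url\n1,https://example.com/a\n")

    # Plain utf-8 keeps the BOM as part of the first column name.
    with open(path, "r", encoding="utf-8", newline="") as f:
        plain_header = next(csv.reader(f))

    # utf-8-sig drops the BOM, and also reads BOM-less files unchanged.
    with open(path, "r", encoding="utf-8-sig", newline="") as f:
        bom_safe_header = next(csv.reader(f))
```

With plain utf-8 the first column name becomes '\ufeffid', so a csv.DictReader lookup of row['id'] raises KeyError even though the file looks correct in an editor.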

API quota management:

# Cache API responses
from functools import lru_cache

@lru_cache(maxsize=1000)
def cached_api_call(url: str) -> dict:
    return api_client.fetch(url)
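
A quick way to check that the cache is actually absorbing repeat calls is cache_info(). The sketch below uses a stand-in function in place of a real API client; cached_fetch and the calls list are hypothetical.

```python
from functools import lru_cache

calls = []  # records every real (uncached) fetch


@lru_cache(maxsize=1000)
def cached_fetch(url: str) -> tuple[str, int]:
    # Stand-in for a real API call. Returning an immutable tuple means
    # callers cannot mutate the cached value by accident.
    calls.append(url)
    return (url, len(url))


cached_fetch("https://example.com/a")
cached_fetch("https://example.com/a")  # served from the cache
cached_fetch("https://example.com/b")
info = cached_fetch.cache_info()
```

Keep in mind that lru_cache lives in process memory, never expires entries on its own, and requires hashable arguments, so it helps within one run but does not preserve results between runs.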