Quick Reference
| Platform | Cold Start | Packages | Best For |
|----------|------------|----------|----------|
| Workers (Pyodide) | ~50ms | Limited | API endpoints |
| Containers | ~10s | Any | Heavy compute, AI |
| Worker Pattern | Code |
|----------------|------|
| Basic handler | `class Default(WorkerEntrypoint):` |
| FastAPI | `await asgi.fetch(app, request, self.env)` |
| Env vars | `self.env.API_KEY` |
| Service binding | `await self.env.WORKER_B.method()` |
| Command | Purpose |
|---------|---------|
| `pywrangler init` | Create Python Worker |
| `pywrangler dev` | Local development |
| `pywrangler deploy` | Deploy to Cloudflare |
| Container vs Worker | Recommendation |
|---------------------|----------------|
| Simple API | Worker |
| pandas/numpy | Container |
| GPU/AI | Container |
| <50ms latency | Worker |
When to Use This Skill
Use for Cloudflare edge deployment:
- Deploying Python APIs to Cloudflare Workers
- Running FastAPI on Cloudflare edge
- Using Containers for heavy compute
- Setting up service-to-service RPC
- Optimizing cold starts
Related skills:
- For FastAPI: see python-fastapi
- For async patterns: see python-asyncio
- For Docker: see python-github-actions
Python on Cloudflare (Workers & Containers)
Overview
Cloudflare provides two ways to run Python:
- Python Workers - Serverless functions using Pyodide (WebAssembly)
- Cloudflare Containers - Full Docker containers (beta, June 2025)
Python Workers
How It Works
- Python runs via Pyodide (CPython compiled to WebAssembly)
- Executes inside V8 isolates on Cloudflare's edge network
- Memory snapshots enable fast cold starts
- Supports many pure Python packages
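Because the interpreter is Pyodide, shared code can detect at runtime whether it is running on Workers or on ordinary CPython; Pyodide reports the Emscripten platform string. A small sketch (the helper name is illustrative):

```python
import sys

# Pyodide (the runtime behind Python Workers) is CPython compiled to
# WebAssembly via Emscripten, and reports that as the platform string.
IS_PYODIDE = sys.platform == "emscripten"

def runtime_label() -> str:
    return "Pyodide / Workers" if IS_PYODIDE else "CPython"
```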
Quick Start
```bash
# Install workers-py, which provides the pywrangler CLI
pip install workers-py

# Create a new Python Worker
pywrangler init my-worker
cd my-worker
```
Project structure:

```
my-worker/
├── src/
│   └── entry.py
├── pyproject.toml
└── wrangler.toml
```
Basic Worker
```python
# src/entry.py
from workers import Response, WorkerEntrypoint

class Default(WorkerEntrypoint):
    async def fetch(self, request):
        return Response("Hello from Python Worker!")
```
Configuration
```toml
# wrangler.toml
name = "my-python-worker"
main = "src/entry.py"
compatibility_date = "2024-12-01"
compatibility_flags = ["python_workers"]

[build]
command = ""
```
```toml
# pyproject.toml
[project]
name = "my-python-worker"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = [
    "fastapi",
    "pydantic",
]

[dependency-groups]
dev = ["workers-py"]
```
FastAPI on Workers
```python
# src/entry.py
from workers import WorkerEntrypoint
from fastapi import FastAPI, Request
from pydantic import BaseModel
import asgi

app = FastAPI()

class Default(WorkerEntrypoint):
    async def fetch(self, request):
        return await asgi.fetch(app, request, self.env)

@app.get("/")
async def root():
    return {"message": "Hello from FastAPI on Cloudflare!"}

@app.get("/env")
async def get_env(req: Request):
    env = req.scope.get("env")
    if env and hasattr(env, "API_KEY"):
        return {"has_api_key": True}
    return {"has_api_key": False}

class Item(BaseModel):
    name: str
    price: float
    description: str | None = None

@app.post("/items/")
async def create_item(item: Item):
    return item

@app.get("/items/{item_id}")
async def read_item(item_id: int):
    return {"item_id": item_id}
```
Using Environment Variables
```python
from workers import WorkerEntrypoint, Response

class Default(WorkerEntrypoint):
    async def fetch(self, request):
        # Access environment variables via self.env
        api_key = self.env.API_KEY
        database_url = self.env.DATABASE_URL
        return Response(f"API Key exists: {bool(api_key)}")
```
```toml
# wrangler.toml
[vars]
API_KEY = "your-api-key"
DATABASE_URL = "your-database-url"
```
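Values in `[vars]` live in plain text in the config file; for credentials, `wrangler secret put` keeps the value out of source control while still exposing it on `env` the same way. A minimal sketch for tolerant access, assuming a missing binding raises AttributeError (which `getattr` with a default absorbs):

```python
from workers import WorkerEntrypoint, Response

class Default(WorkerEntrypoint):
    async def fetch(self, request):
        # getattr with a default avoids a crash when a binding exists
        # in one environment (e.g. production) but not another.
        api_key = getattr(self.env, "API_KEY", None)
        if api_key is None:
            return Response("API_KEY not configured", status=500)
        return Response("API key configured")
```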
Service Bindings (RPC)
```python
# Worker A - Caller
from workers import WorkerEntrypoint, Response

class Default(WorkerEntrypoint):
    async def fetch(self, request):
        # Call Worker B's add method
        result = await self.env.WORKER_B.add(1, 2)
        return Response(f"Result: {result}")
```

```python
# Worker B - Callee
from workers import WorkerEntrypoint, Response

class Default(WorkerEntrypoint):
    async def fetch(self, request):
        return Response("Hello from Worker B")

    def add(self, a: int, b: int) -> int:
        return a + b
```
```toml
# Worker A wrangler.toml
[[services]]
binding = "WORKER_B"
service = "worker-b"
```
Caching
```python
from workers import WorkerEntrypoint
from pyodide.ffi import to_js as _to_js
from js import Response, URL, Object, fetch

def to_js(x):
    return _to_js(x, dict_converter=Object.fromEntries)

class Default(WorkerEntrypoint):
    async def fetch(self, request):
        request_url = URL.new(request.url)
        params = request_url.searchParams
        tags = params["tags"].split(",") if "tags" in params else []
        url = params["uri"] or None
        if url is None:
            return Response.json(to_js({"error": "URL required"}), status=400)
        # Fetch with cache tags attached so the response can be purged by tag
        options = {"cf": {"cacheTags": tags}}
        result = await fetch(url, to_js(options))
        cache_status = result.headers["cf-cache-status"]
        return Response.json(to_js({
            "cache": cache_status,
            "status": result.status,
        }))
```
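For explicit control, the Workers Cache API should also be reachable through the js module like the other globals used above; treat the exact surface as an assumption and verify against the runtime docs. A sketch:

```python
from js import caches, fetch

async def cached_fetch(request):
    cache = caches.default
    hit = await cache.match(request)
    if hit:
        return hit
    response = await fetch(request)
    # Store a copy so the original body can still be returned to the caller
    await cache.put(request, response.clone())
    return response
```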
Supported Packages
```python
# HTTP clients (async only)
import aiohttp
import httpx

# Data processing
import numpy   # Limited support
import pandas  # Limited support

# Standard library works well
import json
import re
import urllib
import base64
import hashlib

# Check the Pyodide package list for full support:
# https://pyodide.org/en/stable/usage/packages-in-pyodide.html
```
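Since availability varies, a guarded import keeps a Worker deployable even when an optional dependency is missing from the Pyodide distribution:

```python
# Optional dependency guard: degrade gracefully instead of failing at import
try:
    import numpy as np
except ImportError:
    np = None

def mean(values: list[float]) -> float:
    if np is not None:
        return float(np.mean(values))
    return sum(values) / len(values)
```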
Cold Start Optimization
```python
from workers import Response, WorkerEntrypoint

# Imports at module level are cached in the memory snapshot
import json
import hashlib
from pydantic import BaseModel

def load_config() -> dict:
    # Placeholder: expensive setup runs once, at snapshot creation
    return {"version": "1.0"}

# This code runs during snapshot creation, not per request
CACHED_CONFIG = load_config()

class Default(WorkerEntrypoint):
    async def fetch(self, request):
        # Fast access to pre-loaded modules and data
        return Response(json.dumps(CACHED_CONFIG))
```
Cloudflare Containers (Beta)
Overview (June 2025)
- Full Docker container support
- Run any language/runtime (Python, Go, Java, etc.)
- FFmpeg, Pandas, AI toolchains supported
- Pay-per-use pricing
- Global edge deployment
When to Use Containers vs Workers
| Feature | Workers | Containers |
|---------|---------|------------|
| Cold start | ~50ms | ~10s (with prewarming) |
| Package support | Pyodide-compatible | Any |
| Memory | Limited | Configurable |
| File system | No | Yes |
| Native binaries | No | Yes |
| Best for | API endpoints | Batch jobs, AI, heavy compute |
Container Setup
```dockerfile
# Dockerfile
FROM python:3.12-slim
WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .

# Run application
CMD ["python", "main.py"]
```
```python
# main.py
import pandas as pd
import numpy as np
from fastapi import FastAPI
import uvicorn

app = FastAPI()

@app.get("/process")
async def process_data():
    # Heavy computation that wouldn't work in Workers
    df = pd.DataFrame(np.random.randn(10000, 4))
    return df.describe().to_dict()

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8080)
```
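Containers are attached to a Worker through wrangler configuration. The schema is still settling during the beta, so treat the field names below as a sketch and confirm them against current docs:

```toml
# wrangler.toml (sketch - beta schema, verify field names)
[[containers]]
class_name = "MyContainer"   # class that manages container instances
image = "./Dockerfile"       # built and pushed by wrangler
max_instances = 5
```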
Wrangler Commands
```bash
# Build container
wrangler containers build . --tag my-app:latest

# Push to registry
wrangler containers push my-app:latest

# Deploy
wrangler containers deploy my-app

# List containers
wrangler containers list

# Delete
wrangler containers delete my-app
```
Container Optimization
```dockerfile
# Multi-stage build for smaller images
FROM python:3.12-slim AS builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt --target=/app/deps

FROM python:3.12-slim
WORKDIR /app
COPY --from=builder /app/deps /app/deps
COPY . .
ENV PYTHONPATH=/app/deps
CMD ["python", "main.py"]
```
```python
# Optimize container cold starts with lazy imports
def get_pandas():
    import pandas as pd
    return pd

async def process_large_data():
    pd = get_pandas()  # Import only when the heavy path is actually hit
    # Process data...
```
GPU Support (Preview)
```python
# Container with GPU support
import torch

def load_model() -> torch.nn.Module:
    # Placeholder: load your trained model from disk or a registry
    raise NotImplementedError

def run_inference(data: torch.Tensor):
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = load_model().to(device)
    with torch.no_grad():
        result = model(data.to(device))
    return result.cpu().numpy()
```
Python Workflows (Durable Execution)
Overview
Cloudflare Workflows now supports Python for multi-step, long-running applications with automatic retries and state persistence.
```python
# Sketch of the Python Workflows API (beta): a WorkflowEntrypoint subclass
# whose steps are defined with step.do. Verify names against current docs.
from workers import WorkflowEntrypoint

async def transform(data: dict) -> dict:
    # Placeholder for your processing logic
    return data

async def save_to_db(data: dict) -> str:
    # Placeholder for your persistence logic
    return "result-id"

class DataPipeline(WorkflowEntrypoint):
    async def run(self, event, step):
        url = event.payload["url"]

        @step.do("fetch data")
        async def fetch_data() -> dict:
            """Fetch data from an external API."""
            import aiohttp
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    return await response.json()

        data = await fetch_data()

        @step.do("process data")
        async def process_data() -> dict:
            """Each step is retried independently if it fails."""
            return await transform(data)

        processed = await process_data()

        @step.do("save results")
        async def save_results() -> str:
            """Save to the database; state persists between steps."""
            return await save_to_db(processed)

        result_id = await save_results()
        return f"Saved as {result_id}"
```
Best Practices
1. Minimize Cold Starts
```python
# Do expensive imports at module level
import json
import hashlib
from pydantic import BaseModel
from workers import Response, WorkerEntrypoint

# Pre-compute static data
STATIC_CONFIG = {"version": "1.0", "features": ["a", "b"]}

class Default(WorkerEntrypoint):
    async def fetch(self, request):
        # Fast path - no initialization needed
        return Response(json.dumps(STATIC_CONFIG))
```
2. Use Async HTTP Clients
```python
# Workers only support async HTTP clients
import httpx
from workers import Response, WorkerEntrypoint

class Default(WorkerEntrypoint):
    async def fetch(self, request):
        async with httpx.AsyncClient() as client:
            response = await client.get("https://api.example.com/data")
            return Response(response.text)
```
3. Handle Errors Gracefully
```python
import json
from workers import WorkerEntrypoint, Response

class Default(WorkerEntrypoint):
    async def fetch(self, request):
        try:
            result = await self.process_request(request)
            return Response(json.dumps(result), headers={
                "Content-Type": "application/json",
            })
        except ValueError as e:
            return Response(json.dumps({"error": str(e)}), status=400)
        except Exception as e:
            # Log error (sent to Workers Logs)
            print(f"Error: {e}")
            return Response(json.dumps({"error": "Internal error"}), status=500)

    async def process_request(self, request) -> dict:
        # Placeholder for application logic
        return {"ok": True}
```
4. Structure for Testability
```python
# src/handlers.py - Pure Python logic
import uuid

def validate_item(data: dict) -> dict:
    if "name" not in data:  # placeholder validation
        raise ValueError("name is required")
    return data

def generate_id() -> str:
    return uuid.uuid4().hex

async def handle_create_item(data: dict) -> dict:
    # Testable without the Worker runtime
    validated = validate_item(data)
    return {"id": generate_id(), **validated}
```

```python
# src/entry.py - Worker entrypoint
import json
from workers import WorkerEntrypoint, Response
from handlers import handle_create_item

class Default(WorkerEntrypoint):
    async def fetch(self, request):
        if request.method == "POST":
            data = await request.json()
            result = await handle_create_item(data)
            return Response(json.dumps(result))
        return Response("Method not allowed", status=405)
```
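The payoff: the pure handler is unit-testable with plain pytest (plus pytest-asyncio for the async function), no Worker runtime or pywrangler involved. Test names and values here are illustrative:

```python
# tests/test_handlers.py
import pytest
from handlers import handle_create_item

@pytest.mark.asyncio
async def test_create_item_includes_generated_id():
    result = await handle_create_item({"name": "widget", "price": 9.99})
    assert "id" in result
    assert result["name"] == "widget"
```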
5. Local Development
```bash
# Start local development server
pywrangler dev

# Test locally
curl http://localhost:8787/

# Deploy to Cloudflare
pywrangler deploy
```