Agent Skills: Anthropic Events & Async Processing


Install this agent skill locally:

pnpm dlx add-skill https://github.com/jeremylongshore/claude-code-plugins-plus-skills/tree/HEAD/plugins/saas-packs/anthropic-pack/skills/anth-webhooks-events

Skill Files


plugins/saas-packs/anthropic-pack/skills/anth-webhooks-events/SKILL.md

Skill Metadata

Name
anth-webhooks-events
Description
'Implement event-driven patterns with Claude API: streaming SSE events,

Anthropic Events & Async Processing

Overview

The Claude API does not use traditional webhooks. Instead it provides two event-driven patterns: Server-Sent Events (SSE) for real-time streaming and the Message Batches API for async bulk processing. This skill covers both.

SSE Streaming Events

```python
import anthropic

client = anthropic.Anthropic()

# Process each SSE event type
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Explain microservices."}]
) as stream:
    for event in stream:
        match event.type:
            case "message_start":
                print(f"Started: {event.message.id}")
            case "content_block_start":
                if event.content_block.type == "tool_use":
                    print(f"Tool call: {event.content_block.name}")
            case "content_block_delta":
                if event.delta.type == "text_delta":
                    print(event.delta.text, end="", flush=True)
                elif event.delta.type == "input_json_delta":
                    print(event.delta.partial_json, end="")
            case "message_delta":
                print(f"\nStop: {event.delta.stop_reason}")
                print(f"Output tokens: {event.usage.output_tokens}")
            case "message_stop":
                print("[Complete]")
```

SSE Event Reference

| Event | When | Key Data |
|-------|------|----------|
| `message_start` | Stream begins | `message.id`, `message.model`, `message.usage.input_tokens` |
| `content_block_start` | New block begins | `content_block.type` (`text` or `tool_use`), `index` |
| `content_block_delta` | Incremental content | `delta.text` or `delta.partial_json` |
| `content_block_stop` | Block finishes | `index` |
| `message_delta` | Message-level update | `delta.stop_reason`, `usage.output_tokens` |
| `message_stop` | Stream complete | (empty) |
| `ping` | Keepalive | (empty) |
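The event flow above can be exercised without a live connection. A minimal sketch, assuming events are represented as plain dicts (the real SDK yields typed event objects), that folds a stream of SSE-style events into the final text:

```python
def assemble_text(events):
    """Accumulate text_delta payloads from a sequence of SSE-style event dicts."""
    parts = []
    for event in events:
        if event["type"] == "content_block_delta" and event["delta"]["type"] == "text_delta":
            parts.append(event["delta"]["text"])
    return "".join(parts)

# Simulated event sequence mirroring the table above
events = [
    {"type": "message_start"},
    {"type": "content_block_start"},
    {"type": "content_block_delta", "delta": {"type": "text_delta", "text": "Hello, "}},
    {"type": "content_block_delta", "delta": {"type": "text_delta", "text": "world."}},
    {"type": "content_block_stop"},
    {"type": "message_stop"},
]
print(assemble_text(events))  # Hello, world.
```

Driving handlers with recorded event sequences like this is a convenient way to unit-test streaming logic before wiring it to the API.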

Async Batch Processing

```python
import time

# Submit a batch (up to 100K requests, 50% cheaper than real-time calls).
# `client` and `documents` (an iterable of strings) come from the setup above.
batch = client.messages.batches.create(
    requests=[
        {
            "custom_id": f"doc-{i}",
            "params": {
                "model": "claude-sonnet-4-20250514",
                "max_tokens": 1024,
                "messages": [{"role": "user", "content": f"Summarize: {doc}"}]
            }
        }
        for i, doc in enumerate(documents)
    ]
)

# Poll for completion
while True:
    status = client.messages.batches.retrieve(batch.id)
    if status.processing_status == "ended":
        break
    counts = status.request_counts
    print(f"Processing: {counts.processing} | Done: {counts.succeeded} | Errors: {counts.errored}")
    time.sleep(30)

# Stream results
for result in client.messages.batches.results(batch.id):
    if result.result.type == "succeeded":
        print(f"[{result.custom_id}]: {result.result.message.content[0].text[:100]}")
    else:
        print(f"[{result.custom_id}] ERROR: {result.result.error}")
```
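Since a single batch caps out at 100K requests, larger corpora need to be split across multiple submissions. A minimal chunking sketch (the helper name and default size are illustrative; each chunk would be passed to `client.messages.batches.create`):

```python
def chunk_requests(requests, size=100_000):
    """Yield successive slices of `requests`, each within the per-batch cap."""
    for start in range(0, len(requests), size):
        yield requests[start:start + size]

# 250K pending requests split into three submittable batches
batches = list(chunk_requests(list(range(250_000))))
print([len(b) for b in batches])  # [100000, 100000, 50000]
```

Keeping `custom_id` values unique across chunks lets you merge the per-batch results back into one corpus afterward.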

Event-Driven Architecture Pattern

```python
# Use queues to decouple Claude requests from user-facing endpoints
import anthropic
import requests
from redis import Redis
from rq import Queue

redis = Redis()
queue = Queue(connection=redis)

def process_with_claude(prompt: str, callback_url: str):
    """Background job for async Claude processing."""
    client = anthropic.Anthropic()
    msg = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    )
    # Notify your system via an internal callback
    requests.post(callback_url, json={
        "text": msg.content[0].text,
        "usage": {"input": msg.usage.input_tokens, "output": msg.usage.output_tokens}
    })

# Enqueue from your API handler
job = queue.enqueue(process_with_claude, prompt="...", callback_url="https://internal/callback")
```

Error Handling

| Issue | Cause | Fix |
|-------|-------|-----|
| Stream disconnects | Network timeout | Reconnect and re-request (responses are not resumable) |
| Batch expired | Not processed within 24h | Resubmit the batch |
| `errored` results | Individual request was invalid | Check `result.error.message` per request |
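Because interrupted streams cannot be resumed, the usual remedy is to retry the whole request with exponential backoff. A minimal sketch (the attempt count, delays, and `ConnectionError` choice are arbitrary illustrations, not API requirements; the real SDK raises its own exception types):

```python
import time

def with_retries(fn, attempts=3, base_delay=1.0):
    """Call fn(), retrying on ConnectionError with exponential backoff."""
    for attempt in range(attempts):
        try:
            return fn()
        except ConnectionError:
            if attempt == attempts - 1:
                raise
            time.sleep(base_delay * 2 ** attempt)

# Example: a flaky callable that fails twice before succeeding
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("dropped")
    return "ok"

print(with_retries(flaky, base_delay=0))  # ok
```

In practice `fn` would wrap the full `client.messages.stream(...)` call, so each retry re-requests the response from the start.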

Next Steps

For performance optimization, see anth-performance-tuning.