Agent Skills: OpenRouter Load Balancing

|

Uncategorized

ID: jeremylongshore/claude-code-plugins-plus-skills/openrouter-load-balancing

Install this agent skill to your local environment:

pnpm dlx add-skill https://github.com/jeremylongshore/claude-code-plugins-plus-skills/tree/HEAD/plugins/saas-packs/openrouter-pack/skills/openrouter-load-balancing

Skill Files

Browse the full folder contents for openrouter-load-balancing.

Download Skill

Loading file tree…

plugins/saas-packs/openrouter-pack/skills/openrouter-load-balancing/SKILL.md

Skill Metadata

Name
openrouter-load-balancing
Description
|

OpenRouter Load Balancing

Overview

A single OpenRouter API key has rate limits (requests/minute and tokens/minute). To scale beyond those limits, distribute requests across multiple keys. OpenRouter also provides server-side load balancing via provider routing and the :nitro variant for low-latency inference. This skill covers multi-key rotation, health-based routing, circuit breakers, and concurrent request patterns.

Multi-Key Round Robin

import os, itertools, time, logging
from openai import OpenAI, RateLimitError
from dataclasses import dataclass, field

log = logging.getLogger("openrouter.lb")

@dataclass
class KeyPool:
    """Round-robin API key pool with per-key health tracking.

    mark_error() counts consecutive failures per key; once a key reaches
    ERROR_THRESHOLD errors it is circuit-broken (skipped by next_key())
    until COOLDOWN_SECONDS have elapsed since its last error.
    mark_success() resets the counter and restores the key immediately.
    """
    keys: list[str]
    _cycle: itertools.cycle = field(init=False, repr=False)
    _health: dict[str, dict] = field(init=False, default_factory=dict)

    # Circuit-breaker tuning (plain class attributes, not dataclass fields).
    ERROR_THRESHOLD = 3      # consecutive errors before a key is benched
    COOLDOWN_SECONDS = 60.0  # seconds a benched key sits out before retry

    def __post_init__(self):
        # Fail fast: an empty pool would otherwise surface much later as a
        # bare StopIteration from next(self._cycle) inside next_key().
        if not self.keys:
            raise ValueError("KeyPool requires at least one API key")
        self._cycle = itertools.cycle(self.keys)
        self._health = {k: {"errors": 0, "last_error": 0, "healthy": True} for k in self.keys}

    def next_key(self) -> str:
        """Return the next healthy key, lazily recovering cooled-down keys."""
        attempts = 0
        while attempts < len(self.keys):
            key = next(self._cycle)
            h = self._health[key]
            # Lazy recovery: re-admit a benched key once its cooldown elapsed.
            if not h["healthy"] and time.time() - h["last_error"] > self.COOLDOWN_SECONDS:
                h["healthy"] = True
                h["errors"] = 0
            if h["healthy"]:
                return key
            attempts += 1
        # All keys unhealthy -- return any and hope for the best; the caller's
        # request may still succeed and trigger mark_success() recovery.
        return next(self._cycle)

    def mark_error(self, key: str):
        """Record a failure; trip the circuit breaker after ERROR_THRESHOLD in a row."""
        h = self._health[key]
        h["errors"] += 1
        h["last_error"] = time.time()
        if h["errors"] >= self.ERROR_THRESHOLD:
            h["healthy"] = False
            logging.getLogger("openrouter.lb").warning(
                f"Key {key[:12]}... marked unhealthy after {h['errors']} errors"
            )

    def mark_success(self, key: str):
        """Reset the error count and restore the key to the rotation."""
        self._health[key]["errors"] = 0
        self._health[key]["healthy"] = True

# Shared pool built from the numbered OPENROUTER_KEY_* environment
# variables; a missing variable falls back to an empty string.
pool = KeyPool(keys=[os.environ.get(f"OPENROUTER_KEY_{n}", "") for n in (1, 2, 3)])

def balanced_completion(messages, model="anthropic/claude-3.5-sonnet", **kwargs):
    """Send a chat completion using the next healthy key from the pool.

    On RateLimitError the failing key is penalized and the request is
    retried with the next key. Unlike a naive recursive retry, the retry
    is bounded: after every key in the pool has been tried once in this
    pass, the last RateLimitError is re-raised instead of recursing
    without limit (which would eventually hit RecursionError).

    Args:
        messages: Chat messages in OpenAI format.
        model: OpenRouter model slug.
        **kwargs: Forwarded to chat.completions.create().

    Returns:
        The chat completion response.

    Raises:
        RateLimitError: if every key in the pool is rate-limited.
    """
    last_err = None
    for _ in range(len(pool.keys)):
        key = pool.next_key()
        client = OpenAI(
            base_url="https://openrouter.ai/api/v1",
            api_key=key,
            default_headers={"HTTP-Referer": "https://my-app.com", "X-Title": "my-app"},
        )
        try:
            response = client.chat.completions.create(
                model=model, messages=messages, **kwargs
            )
        except RateLimitError as err:
            pool.mark_error(key)
            last_err = err
            continue
        pool.mark_success(key)
        return response
    # One full pass over the pool failed; surface the final rate-limit error.
    raise last_err

Concurrent Request Processing

import asyncio
from openai import AsyncOpenAI

async def parallel_completions(prompts: list[str], model="openai/gpt-4o-mini",
                                max_concurrent=5, **kwargs):
    """Run one chat completion per prompt concurrently.

    A semaphore caps in-flight requests at max_concurrent so a large
    prompt list does not exceed the key's rate limit. Returns the reply
    texts in the same order as the input prompts.
    """
    gate = asyncio.Semaphore(max_concurrent)
    client = AsyncOpenAI(
        base_url="https://openrouter.ai/api/v1",
        api_key=os.environ["OPENROUTER_API_KEY"],
        default_headers={"HTTP-Referer": "https://my-app.com", "X-Title": "my-app"},
    )

    async def _one(text: str):
        # Hold a semaphore slot for the full duration of the request.
        async with gate:
            resp = await client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": text}],
                **kwargs,
            )
            return resp.choices[0].message.content

    tasks = [_one(p) for p in prompts]
    return await asyncio.gather(*tasks)

# Usage: fan out three prompts, at most 3 in flight, 500-token cap per reply.
results = asyncio.run(
    parallel_completions(
        ["Summarize X", "Translate Y", "Analyze Z"],
        max_concurrent=3,
        max_tokens=500,
    )
)

Provider-Level Load Balancing

# OpenRouter can distribute across providers for the same model
# NOTE(review): `client` is assumed to be an OpenAI client pointed at
# https://openrouter.ai/api/v1 (as constructed earlier in this skill) --
# it is not defined in this snippet.
response = client.chat.completions.create(
    model="anthropic/claude-3.5-sonnet",
    messages=[{"role": "user", "content": "Hello"}],
    max_tokens=200,
    # extra_body carries OpenRouter-specific fields the OpenAI SDK
    # does not model natively.
    extra_body={
        "provider": {
            # Let OpenRouter pick the best available provider
            # Providers are tried in this order when available.
            "order": ["Anthropic", "AWS Bedrock", "GCP Vertex"],
            # presumably permits falling back beyond the listed providers
            # when all of them fail -- TODO confirm against OpenRouter docs
            "allow_fallbacks": True,
        },
    },
)

Rate Limit Awareness

import requests

def check_rate_limits(api_key: str, timeout: float = 10.0) -> dict:
    """Check current rate-limit and credit status for a key.

    Args:
        api_key: OpenRouter API key to inspect.
        timeout: Request timeout in seconds. `requests` has no default
            timeout, so omitting one can hang the caller indefinitely.

    Returns:
        dict with requests_limit, interval, credits_used, and
        credits_limit (None when the key has no credit cap).

    Raises:
        requests.HTTPError: if the endpoint returns a non-2xx status.
    """
    resp = requests.get(
        "https://openrouter.ai/api/v1/auth/key",
        headers={"Authorization": f"Bearer {api_key}"},
        timeout=timeout,
    )
    # Fail loudly on an error response instead of raising a confusing
    # KeyError while indexing into an error payload below.
    resp.raise_for_status()
    data = resp.json()["data"]
    return {
        "requests_limit": data["rate_limit"]["requests"],
        "interval": data["rate_limit"]["interval"],
        "credits_used": data["usage"],
        "credits_limit": data.get("limit"),
    }

# Check all keys in pool: print limit/credit status for each rotation key.
for api_key in pool.keys:
    key_status = check_rate_limits(api_key)
    print(f"Key {api_key[:12]}...: {key_status}")

Error Handling

| Error | Cause | Fix |
|-------|-------|-----|
| 429 on all keys | All keys rate-limited simultaneously | Add more keys; implement request queuing |
| Uneven load distribution | Round-robin not accounting for in-flight requests | Use weighted distribution based on current load |
| Key health false positive | Transient error marked key unhealthy | Use sliding window (3 errors in 60s) before marking unhealthy |
| Concurrent request failures | Too many parallel requests | Reduce semaphore limit; add backoff |

Enterprise Considerations

  • Create separate API keys per service/team with individual credit limits for cost isolation
  • Use 3+ keys to multiply effective rate limits (each key gets its own quota)
  • Implement circuit breakers: mark keys unhealthy after N consecutive errors, recover after cooldown
  • Use asyncio.Semaphore to control concurrency and prevent overwhelming the API
  • Monitor per-key error rates and latency to detect degraded keys early
  • Combine multi-key rotation with provider routing for maximum resilience

References