Agent Skills: OpenRouter Streaming Setup

|

UncategorizedID: jeremylongshore/claude-code-plugins-plus-skills/openrouter-streaming-setup

Install this agent skill to your local

pnpm dlx add-skill https://github.com/jeremylongshore/claude-code-plugins-plus-skills/tree/HEAD/plugins/saas-packs/openrouter-pack/skills/openrouter-streaming-setup

Skill Files

Browse the full folder contents for openrouter-streaming-setup.

Download Skill

Loading file tree…

plugins/saas-packs/openrouter-pack/skills/openrouter-streaming-setup/SKILL.md

Skill Metadata

Name
openrouter-streaming-setup
Description
|

OpenRouter Streaming Setup

Overview

OpenRouter supports Server-Sent Events (SSE) streaming via stream: true, compatible with the OpenAI SDK. Streaming returns tokens as they're generated, reducing time-to-first-token (TTFT) from seconds to milliseconds. Usage stats are available via stream_options: {include_usage: true} in the final chunk. This skill covers Python and TypeScript streaming, SSE forwarding to browsers, and error recovery.

Python: Basic Streaming

import os
from openai import OpenAI

client = OpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key=os.environ["OPENROUTER_API_KEY"],
    default_headers={"HTTP-Referer": "https://my-app.com", "X-Title": "my-app"},
)

# Stream with usage stats
stream = client.chat.completions.create(
    model="anthropic/claude-3.5-sonnet",
    messages=[{"role": "user", "content": "Explain how HTTP streaming works"}],
    max_tokens=500,
    stream=True,
    stream_options={"include_usage": True},  # Get token counts in final chunk
)

full_content = []
for chunk in stream:
    if chunk.choices and chunk.choices[0].delta.content:
        token = chunk.choices[0].delta.content
        print(token, end="", flush=True)
        full_content.append(token)

    # Final chunk contains usage stats
    if chunk.usage:
        print(f"\n---\nTokens: {chunk.usage.prompt_tokens} in + {chunk.usage.completion_tokens} out")

result = "".join(full_content)

Python: Streaming with Metrics

import time

def stream_with_metrics(messages, model="anthropic/claude-3.5-sonnet", **kwargs):
    """Stream response and capture performance metrics."""
    start = time.monotonic()
    first_token_time = None
    chunks = []
    usage = None

    stream = client.chat.completions.create(
        model=model, messages=messages, stream=True,
        stream_options={"include_usage": True},
        **kwargs,
    )

    for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            token = chunk.choices[0].delta.content
            if first_token_time is None:
                first_token_time = (time.monotonic() - start) * 1000
            chunks.append(token)
            yield token  # Yield each token as it arrives

        if chunk.usage:
            usage = {
                "prompt_tokens": chunk.usage.prompt_tokens,
                "completion_tokens": chunk.usage.completion_tokens,
            }

    total_time = (time.monotonic() - start) * 1000
    # Metrics available after generator exhausted
    stream_with_metrics.last_metrics = {
        "ttft_ms": round(first_token_time or 0),
        "total_ms": round(total_time),
        "usage": usage,
        "model": model,
    }

# Usage
for token in stream_with_metrics(
    [{"role": "user", "content": "Hello"}],
    model="openai/gpt-4o-mini",
    max_tokens=200,
):
    print(token, end="", flush=True)
print(f"\nMetrics: {stream_with_metrics.last_metrics}")

TypeScript: Streaming

import OpenAI from "openai";

const client = new OpenAI({
  baseURL: "https://openrouter.ai/api/v1",
  apiKey: process.env.OPENROUTER_API_KEY,
  defaultHeaders: { "HTTP-Referer": "https://my-app.com", "X-Title": "my-app" },
});

async function streamCompletion(prompt: string, model = "openai/gpt-4o-mini") {
  const stream = await client.chat.completions.create({
    model,
    messages: [{ role: "user", content: prompt }],
    max_tokens: 500,
    stream: true,
  });

  const chunks: string[] = [];
  for await (const chunk of stream) {
    const token = chunk.choices[0]?.delta?.content;
    if (token) {
      process.stdout.write(token);
      chunks.push(token);
    }
  }
  return chunks.join("");
}

SSE Forwarding to Browser (FastAPI)

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post("/v1/stream")
async def stream_endpoint(prompt: str, model: str = "openai/gpt-4o-mini"):
    """Forward OpenRouter SSE stream to browser."""
    async def generate():
        stream = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=1024,
            stream=True,
        )
        for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.content:
                token = chunk.choices[0].delta.content
                yield f"data: {json.dumps({'token': token})}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

Browser Client (JavaScript)

// Consume SSE stream from your backend
async function streamChat(prompt) {
  const response = await fetch("/v1/stream", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ prompt }),
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    const text = decoder.decode(value);
    for (const line of text.split("\n")) {
      if (line.startsWith("data: ") && line !== "data: [DONE]") {
        const data = JSON.parse(line.slice(6));
        document.getElementById("output").textContent += data.token;
      }
    }
  }
}

Async Streaming (Python)

from openai import AsyncOpenAI

aclient = AsyncOpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key=os.environ["OPENROUTER_API_KEY"],
    default_headers={"HTTP-Referer": "https://my-app.com", "X-Title": "my-app"},
)

async def async_stream(messages, model="openai/gpt-4o-mini", **kwargs):
    """Async streaming for use in async web frameworks."""
    stream = await aclient.chat.completions.create(
        model=model, messages=messages, stream=True, **kwargs,
    )
    async for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content

Error Handling

| Error | Cause | Fix | |-------|-------|-----| | Stream cuts off mid-response | Network timeout or provider error | Save partial content; implement retry from last position | | Missing usage in stream | Didn't set stream_options | Add stream_options: {"include_usage": True} | | Empty delta chunks | Keep-alive pings | Filter chunk.choices[0].delta.content is None | | finish_reason: "length" | Hit max_tokens limit | Increase max_tokens or continue with follow-up request |

Enterprise Considerations

  • Always use stream_options: {"include_usage": True} to get token counts for cost tracking
  • Set connection timeouts appropriate for streaming (longer than non-streaming, e.g., 120s)
  • Implement heartbeat detection: if no chunks for >30s, consider the stream dead and retry
  • Buffer partial tokens on the server before forwarding to the client for smoother rendering
  • Log TTFT per model to benchmark streaming performance over time
  • Use streaming for all user-facing requests; use non-streaming for batch/background processing

References