Agent Skills: ChatKit Python Backend Skill

Build custom chat API backends for OpenAI ChatKit frontend using FastAPI. Covers chat endpoint implementation, SSE streaming, conversation persistence, and integration with OpenAI Agents SDK + Gemini via LiteLLM.

UncategorizedID: naimalarain13/hackathon-ii_the-evolution-of-todo/chatkit-python

Install this agent skill to your local

pnpm dlx add-skill https://github.com/NaimalArain13/Hackathon-II_The-Evolution-of-Todo/tree/HEAD/.claude/skills/chatkit-python

Skill Files

Browse the full folder contents for chatkit-python.

Download Skill

Loading file tree…

.claude/skills/chatkit-python/SKILL.md

Skill Metadata

Name
chatkit-python
Description
Build custom chat API backends for OpenAI ChatKit frontend using FastAPI. Covers chat endpoint implementation, SSE streaming, conversation persistence, and integration with OpenAI Agents SDK + Gemini via LiteLLM.

ChatKit Python Backend Skill

Build custom chat API backends that work with OpenAI ChatKit frontend.

Architecture

┌─────────────────────────────────────────────────────────────────────────┐
│                     Frontend (ChatKit-JS)                               │
│                  useChatKit({ api: { url, domainKey } })               │
└─────────────────────────────────────────────────────────────────────────┘
                                  │
                                  │ HTTP POST /api/chat
                                  │ (SSE streaming response)
                                  ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                     FastAPI Backend                                     │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                   /api/chat Endpoint                             │   │
│  │  • Parse ChatKit request                                         │   │
│  │  • Run Agent with MCP tools                                      │   │
│  │  • Stream response via SSE                                       │   │
│  │  • Persist conversation to DB                                    │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│                              │                                          │
│                              ▼                                          │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────────┐    │
│  │  OpenAI Agents  │  │   MCP Server    │  │   Neon Database     │    │
│  │  SDK + Gemini   │  │   (Todo Tools)  │  │   (Conversations)   │    │
│  └─────────────────┘  └─────────────────┘  └─────────────────────┘    │
└─────────────────────────────────────────────────────────────────────────┘

Quick Start

Installation

pip install fastapi uvicorn sse-starlette "openai-agents[litellm]"

# Or with uv
uv add fastapi uvicorn sse-starlette "openai-agents[litellm]"

Environment Variables

# Database
DATABASE_URL=postgresql://user:password@host/database

# Gemini API
GOOGLE_API_KEY=your-gemini-api-key

# CORS (frontend URL)
CORS_ORIGINS=http://localhost:3000,https://your-app.vercel.app

Reference

| Pattern | Guide | |---------|-------| | Chat Endpoint | reference/chat-endpoint.md | | SSE Streaming | reference/sse-streaming.md | | Conversation Persistence | reference/conversation-persistence.md |

Examples

| Example | Description | |---------|-------------| | examples/chat-server.md | Complete chat server implementation |

Templates

| Template | Purpose | |----------|---------| | templates/chat_router.py | FastAPI chat endpoint router | | templates/models.py | Database models for conversations |

Basic Chat Endpoint

from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from sse_starlette.sse import EventSourceResponse
from pydantic import BaseModel
from typing import Optional, List, AsyncGenerator
import json

app = FastAPI()

# CORS for ChatKit frontend
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

class ChatMessage(BaseModel):
    role: str  # "user" or "assistant"
    content: str

class ChatRequest(BaseModel):
    messages: List[ChatMessage]
    thread_id: Optional[str] = None
    user_id: Optional[str] = None

async def generate_response(messages: List[ChatMessage]) -> AsyncGenerator[str, None]:
    """Generate streaming response from agent."""
    # Your agent logic here
    response = "Hello! How can I help you?"

    # Stream response word by word
    for word in response.split():
        yield f"data: {json.dumps({'content': word + ' '})}\n\n"

    yield f"data: {json.dumps({'done': True})}\n\n"

@app.post("/api/chat")
async def chat_endpoint(request: ChatRequest):
    """Chat endpoint with SSE streaming."""
    return EventSourceResponse(
        generate_response(request.messages),
        media_type="text/event-stream",
    )

Chat Endpoint with Agent

import os
from fastapi import FastAPI, Request, HTTPException
from sse_starlette.sse import EventSourceResponse
from pydantic import BaseModel
from typing import Optional, List, AsyncGenerator
import json

from agents import Agent, Runner, set_tracing_disabled
from agents.mcp import MCPServerStreamableHttp
from agents.extensions.models.litellm_model import LitellmModel

set_tracing_disabled(disabled=True)

app = FastAPI()

class ChatMessage(BaseModel):
    role: str
    content: str

class ChatRequest(BaseModel):
    messages: List[ChatMessage]
    thread_id: Optional[str] = None
    user_id: str

# Global MCP server connection (managed via lifespan)
mcp_server = None

@app.on_event("startup")
async def startup():
    global mcp_server
    mcp_server = MCPServerStreamableHttp(
        name="Todo MCP",
        params={"url": "http://localhost:8000/api/mcp", "timeout": 30},
        cache_tools_list=True,
    )
    await mcp_server.__aenter__()

@app.on_event("shutdown")
async def shutdown():
    global mcp_server
    if mcp_server:
        await mcp_server.__aexit__(None, None, None)

def create_agent(user_id: str):
    """Create agent with user context."""
    return Agent(
        name="Todo Assistant",
        instructions=f"""You are a task management assistant for user {user_id}.
Use the MCP tools to help manage tasks:
- add_task: Create new tasks
- list_tasks: View tasks
- complete_task: Mark done
- delete_task: Remove tasks
Always pass user_id="{user_id}" to tools.""",
        model=LitellmModel(
            model="gemini/gemini-2.0-flash",
            api_key=os.getenv("GOOGLE_API_KEY"),
        ),
        mcp_servers=[mcp_server] if mcp_server else [],
    )

async def stream_agent_response(
    agent: Agent,
    user_message: str,
) -> AsyncGenerator[str, None]:
    """Stream agent response as SSE events."""
    try:
        result = Runner.run_streamed(agent, user_message)

        async for event in result.stream_events():
            if hasattr(event, 'item') and event.item:
                yield f"data: {json.dumps({'content': str(event.item)})}\n\n"

        # Final response
        final = result.final_output
        yield f"data: {json.dumps({'content': final, 'done': True})}\n\n"

    except Exception as e:
        yield f"data: {json.dumps({'error': str(e)})}\n\n"

@app.post("/api/chat")
async def chat(request: ChatRequest):
    """Chat endpoint with streaming response."""
    if not request.messages:
        raise HTTPException(400, "No messages provided")

    # Get last user message
    user_message = request.messages[-1].content

    # Create agent for this user
    agent = create_agent(request.user_id)

    return EventSourceResponse(
        stream_agent_response(agent, user_message),
        media_type="text/event-stream",
    )

SSE Event Format

ChatKit expects Server-Sent Events in this format:

# Content chunk
yield f"data: {json.dumps({'content': 'partial response'})}\n\n"

# Done signal
yield f"data: {json.dumps({'done': True})}\n\n"

# Error
yield f"data: {json.dumps({'error': 'error message'})}\n\n"

# Tool call (optional)
yield f"data: {json.dumps({'tool_call': {'name': 'add_task', 'result': {...}}})}\n\n"

Conversation Persistence

from sqlmodel import SQLModel, Field, Session
from datetime import datetime
from typing import Optional

class Conversation(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    thread_id: str = Field(index=True, unique=True)
    user_id: str = Field(index=True)
    created_at: datetime = Field(default_factory=datetime.utcnow)
    updated_at: datetime = Field(default_factory=datetime.utcnow)

class Message(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    thread_id: str = Field(index=True)
    role: str  # "user" or "assistant"
    content: str
    created_at: datetime = Field(default_factory=datetime.utcnow)

async def save_message(thread_id: str, role: str, content: str):
    """Save message to database."""
    with Session(engine) as session:
        message = Message(thread_id=thread_id, role=role, content=content)
        session.add(message)
        session.commit()

async def load_conversation(thread_id: str) -> List[dict]:
    """Load conversation history."""
    with Session(engine) as session:
        messages = session.exec(
            select(Message)
            .where(Message.thread_id == thread_id)
            .order_by(Message.created_at)
        ).all()
        return [{"role": m.role, "content": m.content} for m in messages]

Authentication Integration

from fastapi import Depends, HTTPException, Header

async def get_current_user(authorization: str = Header(...)) -> dict:
    """Verify JWT and extract user."""
    if not authorization.startswith("Bearer "):
        raise HTTPException(401, "Invalid authorization header")

    token = authorization[7:]
    user = await verify_jwt_token(token)  # Your JWT verification

    if not user:
        raise HTTPException(401, "Invalid token")

    return user

@app.post("/api/chat")
async def chat(
    request: ChatRequest,
    user: dict = Depends(get_current_user),
):
    """Authenticated chat endpoint."""
    # Use user["id"] as user_id
    agent = create_agent(user["id"])
    # ...

CORS Configuration

from fastapi.middleware.cors import CORSMiddleware

# Get from environment
CORS_ORIGINS = os.getenv("CORS_ORIGINS", "http://localhost:3000").split(",")

app.add_middleware(
    CORSMiddleware,
    allow_origins=CORS_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

Error Handling

@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    return JSONResponse(
        status_code=500,
        content={"error": str(exc)},
    )

@app.post("/api/chat")
async def chat(request: ChatRequest):
    try:
        # ... chat logic
        pass
    except ConnectionError:
        raise HTTPException(503, "MCP server unavailable")
    except TimeoutError:
        raise HTTPException(504, "Request timed out")
    except Exception as e:
        raise HTTPException(500, f"Internal error: {e}")

Testing

import pytest
from httpx import AsyncClient

@pytest.mark.asyncio
async def test_chat_endpoint():
    async with AsyncClient(app=app, base_url="http://test") as client:
        response = await client.post(
            "/api/chat",
            json={
                "messages": [{"role": "user", "content": "Hello"}],
                "user_id": "test-user",
            },
        )
        assert response.status_code == 200
        assert "text/event-stream" in response.headers["content-type"]

Best Practices

  1. Stream responses - Better UX with SSE streaming
  2. Persist conversations - Enable conversation continuity
  3. Authenticate requests - Verify user before processing
  4. Handle errors gracefully - Return structured error responses
  5. Configure CORS - Allow frontend domain
  6. Use connection pooling - For database and MCP connections
  7. Log conversations - For debugging and analytics