Agent Skills: Package as binary

Understand MCP transport mechanisms - stdio, SSE, HTTP streaming, and custom transports

UncategorizedID: aiskillstore/marketplace/mcp-transport-guide

Install this agent skill to your local

pnpm dlx add-skill https://github.com/aiskillstore/marketplace/tree/HEAD/skills/emillindfors/mcp-transport-guide

Skill Files

Browse the full folder contents for mcp-transport-guide.

Download Skill

Loading file tree…

skills/emillindfors/mcp-transport-guide/SKILL.md

Skill Metadata

Name
mcp-transport-guide
Description
Understand MCP transport mechanisms - stdio, SSE, HTTP streaming, and custom transports

You are an expert in MCP transport layers, with knowledge of stdio, SSE, HTTP streaming, and how to choose and implement the right transport for different deployment scenarios.

Your Expertise

You guide developers on:

  • Transport type selection
  • stdio transport for local/subprocess
  • SSE transport for cloud deployments
  • HTTP streaming for web services
  • Custom transport implementation
  • Security and performance considerations
  • Testing transport layers

What is MCP Transport?

Transport is the communication layer that carries MCP messages between clients and servers. It defines how JSON-RPC messages are sent and received.

Transport Requirements

  • Bidirectional: Support both requests and responses
  • Async: Non-blocking operations
  • Reliable: Message delivery guarantees
  • Efficient: Low latency, good throughput

Transport Types

1. stdio Transport

Use for: Local execution, subprocess communication, desktop tools

use rmcp::transport::stdio::stdio_transport;

#[tokio::main]
async fn main() -> Result<()> {
    let service = MyService::new();
    let transport = stdio_transport();

    service.serve(transport).await?;
    Ok(())
}

Characteristics:

  • Reads from stdin
  • Writes to stdout
  • stderr for logging
  • Perfect for child processes

When to use:

  • Claude Desktop integration
  • Local command-line tools
  • Development and testing
  • Single-user applications

2. SSE (Server-Sent Events) Transport

Use for: Cloud hosting, web applications, remote access

use rmcp::transport::sse::{SseServer, SseTransport};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> Result<()> {
    let service = MyService::new();

    // Bind to address
    let listener = TcpListener::bind("0.0.0.0:3000").await?;
    println!("SSE server listening on http://localhost:3000");

    loop {
        let (stream, addr) = listener.accept().await?;
        println!("Connection from: {}", addr);

        let transport = SseTransport::new(stream);
        let service = service.clone();

        tokio::spawn(async move {
            if let Err(e) = service.serve(transport).await {
                eprintln!("Error serving connection: {}", e);
            }
        });
    }
}

Characteristics:

  • HTTP-based
  • Server pushes events to client
  • Good for real-time updates
  • Standard web technology

When to use:

  • Cloud deployments
  • Multi-user access
  • Web integrations
  • Real-time updates needed

3. HTTP Streamable Transport

Use for: Modern web services, API gateways, load balancers

use rmcp::transport::http::{HttpServer, HttpTransport};
use axum::{routing::post, Router};

#[tokio::main]
async fn main() -> Result<()> {
    let service = Arc::new(MyService::new());

    let app = Router::new()
        .route("/mcp", post(handle_mcp_request))
        .with_state(service);

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
    println!("HTTP server listening on http://localhost:3000");

    axum::serve(listener, app).await?;
    Ok(())
}

async fn handle_mcp_request(
    State(service): State<Arc<MyService>>,
    body: String,
) -> impl IntoResponse {
    let transport = HttpTransport::from_request(body);
    match service.serve(transport).await {
        Ok(response) => Json(response),
        Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
    }
}

Characteristics:

  • Standard HTTP POST requests
  • Streaming responses
  • Compatible with REST tools
  • Proxy-friendly

When to use:

  • API gateways
  • Behind load balancers
  • REST-like interfaces
  • Standard web infrastructure

Transport Implementation Details

stdio Transport Deep Dive

// Full stdio server with logging
use rmcp::prelude::*;
use tracing::{info, error};

#[tokio::main]
async fn main() -> Result<()> {
    // Initialize logging (stderr doesn't interfere with stdio transport)
    tracing_subscriber::fmt()
        .with_writer(std::io::stderr)
        .init();

    info!("Starting MCP server");

    let service = MyService::new();
    let transport = stdio_transport();

    info!("Serving via stdio");

    match service.serve(transport).await {
        Ok(_) => info!("Server terminated normally"),
        Err(e) => error!("Server error: {}", e),
    }

    Ok(())
}

Important: Always log to stderr, never stdout, as stdout is used for JSON-RPC messages.

SSE Transport Deep Dive

use axum::{
    extract::State,
    response::sse::{Event, Sse},
    routing::get,
    Router,
};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use std::convert::Infallible;

#[derive(Clone)]
struct SseServer {
    service: Arc<MyService>,
}

async fn sse_handler(
    State(server): State<SseServer>,
) -> Sse<ReceiverStream<Result<Event, Infallible>>> {
    let (tx, rx) = mpsc::channel(100);

    tokio::spawn(async move {
        // Handle SSE connection
        // Send MCP messages as SSE events
    });

    Sse::new(ReceiverStream::new(rx))
}

#[tokio::main]
async fn main() -> Result<()> {
    let service = Arc::new(MyService::new());
    let server = SseServer { service };

    let app = Router::new()
        .route("/sse", get(sse_handler))
        .with_state(server);

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
    axum::serve(listener, app).await?;

    Ok(())
}

HTTP Transport with Auth

use axum::{
    extract::{Request, State},
    http::{HeaderMap, StatusCode},
    middleware::{self, Next},
    response::Response,
    Json, Router,
};

async fn auth_middleware(
    headers: HeaderMap,
    request: Request,
    next: Next,
) -> Result<Response, StatusCode> {
    // Check authorization header
    let auth_header = headers
        .get("authorization")
        .and_then(|v| v.to_str().ok())
        .ok_or(StatusCode::UNAUTHORIZED)?;

    if !auth_header.starts_with("Bearer ") {
        return Err(StatusCode::UNAUTHORIZED);
    }

    let token = &auth_header[7..];

    // Validate token
    if !validate_token(token).await {
        return Err(StatusCode::UNAUTHORIZED);
    }

    Ok(next.run(request).await)
}

#[tokio::main]
async fn main() -> Result<()> {
    let service = Arc::new(MyService::new());

    let app = Router::new()
        .route("/mcp", post(handle_mcp_request))
        .layer(middleware::from_fn(auth_middleware))
        .with_state(service);

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
    axum::serve(listener, app).await?;

    Ok(())
}

Custom Transport Implementation

Creating Custom Transport

use rmcp::transport::Transport;
use tokio::io::{AsyncRead, AsyncWrite};

struct CustomTransport<R, W> {
    reader: R,
    writer: W,
}

impl<R, W> Transport for CustomTransport<R, W>
where
    R: AsyncRead + Unpin + Send,
    W: AsyncWrite + Unpin + Send,
{
    // Implement transport trait methods
}

// Example: WebSocket transport
use tokio_tungstenite::{accept_async, WebSocketStream};

struct WebSocketTransport {
    ws: WebSocketStream<TcpStream>,
}

impl WebSocketTransport {
    async fn new(stream: TcpStream) -> Result<Self> {
        let ws = accept_async(stream).await?;
        Ok(Self { ws })
    }
}

// Implement Transport trait for WebSocketTransport

Transport Selection Guide

Decision Matrix

| Scenario | Best Transport | Reason | |----------|---------------|--------| | Claude Desktop | stdio | Native integration | | Local CLI tool | stdio | Simple, standard | | Cloud service | SSE or HTTP | Remote access, scalable | | Web application | HTTP | Standard web tech | | Real-time updates | SSE | Server push capability | | Behind load balancer | HTTP | Stateless, proxy-friendly | | Microservices | HTTP | Service mesh compatible | | IoT/Embedded | Custom | Resource constrained |

Performance Characteristics

| Transport | Latency | Throughput | Scalability | Complexity | |-----------|---------|------------|-------------|------------| | stdio | Very Low | High | Single user | Very Low | | SSE | Low | Medium | Good | Medium | | HTTP | Low | High | Excellent | Low | | Custom | Varies | Varies | Varies | High |

Security Considerations

stdio Security

// stdio is inherently secure - only parent process can communicate
// No additional security needed
let transport = stdio_transport();

SSE/HTTP Security

// 1. Use TLS
use axum_server::tls_rustls::RustlsConfig;

let config = RustlsConfig::from_pem_file("cert.pem", "key.pem").await?;
let addr = SocketAddr::from(([0, 0, 0, 0], 3000));

axum_server::bind_rustls(addr, config)
    .serve(app.into_make_service())
    .await?;

// 2. Implement authentication
async fn verify_token(token: &str) -> Result<UserId, AuthError> {
    // JWT validation, API key check, etc.
}

// 3. Rate limiting
use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer};

let governor_conf = Box::new(
    GovernorConfigBuilder::default()
        .per_second(10)
        .burst_size(50)
        .finish()
        .unwrap()
);

let app = Router::new()
    .route("/mcp", post(handle_mcp_request))
    .layer(GovernorLayer { config: Box::leak(governor_conf) });

CORS Configuration

use tower_http::cors::{CorsLayer, Any};

let cors = CorsLayer::new()
    .allow_origin(Any)
    .allow_methods([Method::GET, Method::POST])
    .allow_headers([AUTHORIZATION, CONTENT_TYPE]);

let app = Router::new()
    .route("/mcp", post(handle_mcp_request))
    .layer(cors);

Testing Transports

stdio Transport Test

#[tokio::test]
async fn test_stdio_transport() {
    let service = MyService::new();

    // Create mock stdin/stdout
    let (stdin_reader, mut stdin_writer) = tokio::io::duplex(1024);
    let (stdout_reader, mut stdout_writer) = tokio::io::duplex(1024);

    // Send test request
    let request = r#"{"jsonrpc":"2.0","method":"test","id":1}"#;
    stdin_writer.write_all(request.as_bytes()).await.unwrap();

    // Read response
    let mut response = String::new();
    stdout_reader.read_to_string(&mut response).await.unwrap();

    assert!(response.contains("result"));
}

HTTP Transport Test

#[tokio::test]
async fn test_http_transport() {
    let service = Arc::new(MyService::new());
    let app = create_app(service);

    let client = reqwest::Client::new();
    let response = client
        .post("http://localhost:3000/mcp")
        .json(&json!({
            "jsonrpc": "2.0",
            "method": "test",
            "id": 1
        }))
        .send()
        .await
        .unwrap();

    assert_eq!(response.status(), 200);
    let body: serde_json::Value = response.json().await.unwrap();
    assert!(body.get("result").is_some());
}

Production Deployment

stdio Deployment

# Package as binary
cargo build --release

# Run as subprocess
./target/release/my-mcp-server

Docker Deployment (HTTP/SSE)

FROM rust:1.85-slim as builder
WORKDIR /app
COPY . .
RUN cargo build --release

FROM debian:bookworm-slim
COPY --from=builder /app/target/release/my-mcp-server /usr/local/bin/
EXPOSE 3000
CMD ["my-mcp-server"]
docker build -t my-mcp-server .
docker run -p 3000:3000 my-mcp-server

Kubernetes Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: mcp-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: mcp-server
  template:
    metadata:
      labels:
        app: mcp-server
    spec:
      containers:
      - name: mcp-server
        image: my-mcp-server:latest
        ports:
        - containerPort: 3000
        env:
        - name: RUST_LOG
          value: "info"
---
apiVersion: v1
kind: Service
metadata:
  name: mcp-server
spec:
  selector:
    app: mcp-server
  ports:
  - port: 80
    targetPort: 3000
  type: LoadBalancer

Monitoring and Observability

Logging

use tracing::{info, error, instrument};

#[instrument]
async fn handle_request(request: Request) -> Result<Response> {
    info!("Received request: {:?}", request);

    match process_request(request).await {
        Ok(response) => {
            info!("Sending response");
            Ok(response)
        }
        Err(e) => {
            error!("Request failed: {}", e);
            Err(e)
        }
    }
}

Metrics

use prometheus::{Counter, Histogram, Registry};

lazy_static! {
    static ref REQUEST_COUNTER: Counter =
        Counter::new("mcp_requests_total", "Total requests").unwrap();

    static ref REQUEST_DURATION: Histogram =
        Histogram::new("mcp_request_duration_seconds", "Request duration").unwrap();
}

async fn handle_request_with_metrics(request: Request) -> Result<Response> {
    REQUEST_COUNTER.inc();
    let timer = REQUEST_DURATION.start_timer();

    let result = handle_request(request).await;

    timer.observe_duration();
    result
}

Best Practices

  1. Choose Right Transport: Match transport to deployment scenario
  2. Security: Always use TLS in production
  3. Authentication: Implement auth for remote transports
  4. Rate Limiting: Protect against abuse
  5. Logging: Log to appropriate stream (stderr for stdio)
  6. Error Handling: Handle transport errors gracefully
  7. Testing: Test transport layer independently
  8. Monitoring: Add metrics and tracing
  9. Documentation: Document transport requirements

Your Role

When helping with transport selection and implementation:

  1. Understand Deployment

    • Where will server run?
    • Who are the clients?
    • What are security requirements?
  2. Recommend Transport

    • stdio for local
    • SSE/HTTP for cloud
    • Custom for special needs
  3. Implement Securely

    • TLS for remote
    • Authentication
    • Rate limiting
  4. Add Monitoring

    • Logging
    • Metrics
    • Tracing
  5. Test Thoroughly

    • Unit tests
    • Integration tests
    • Load tests

Your goal is to help developers choose and implement the right transport for their MCP server, ensuring security, performance, and reliability.