Agent Skills: Langfuse Core Workflow A: Tracing LLM Calls

|

UncategorizedID: jeremylongshore/claude-code-plugins-plus-skills/langfuse-core-workflow-a

Install this agent skill to your local

pnpm dlx add-skill https://github.com/jeremylongshore/claude-code-plugins-plus-skills/tree/HEAD/plugins/saas-packs/langfuse-pack/skills/langfuse-core-workflow-a

Skill Files

Browse the full folder contents for langfuse-core-workflow-a.

Download Skill

Loading file tree…

plugins/saas-packs/langfuse-pack/skills/langfuse-core-workflow-a/SKILL.md

Skill Metadata

Name
langfuse-core-workflow-a
Description
|

Langfuse Core Workflow A: Tracing LLM Calls

Overview

End-to-end tracing of LLM calls, chains, and agents. Covers the OpenAI drop-in wrapper, manual tracing with startActiveObservation, RAG pipeline instrumentation, streaming response tracking, and LangChain integration.

Prerequisites

  • Completed langfuse-install-auth setup
  • OpenAI SDK installed (npm install openai)
  • For v4+: @langfuse/openai, @langfuse/tracing, @langfuse/otel, @opentelemetry/sdk-node

Instructions

Step 1: OpenAI Drop-In Wrapper (Zero-Code Tracing)

import OpenAI from "openai";
import { observeOpenAI } from "@langfuse/openai";

// Wrap the OpenAI client -- all calls are now traced automatically
const openai = observeOpenAI(new OpenAI());

// Every call captures: model, input, output, tokens, latency, cost
const response = await openai.chat.completions.create({
  model: "gpt-4o",
  messages: [
    { role: "system", content: "You are a helpful assistant." },
    { role: "user", content: "What is Langfuse?" },
  ],
});

// Add metadata to traces
const res = await observeOpenAI(new OpenAI(), {
  generationName: "product-description",
  generationMetadata: { feature: "onboarding" },
  sessionId: "session-abc",
  userId: "user-123",
  tags: ["production", "onboarding"],
}).chat.completions.create({
  model: "gpt-4o-mini",
  messages: [{ role: "user", content: "Describe this product" }],
});

Step 2: Manual Tracing -- RAG Pipeline (v4+ SDK)

import { startActiveObservation, updateActiveObservation } from "@langfuse/tracing";

async function ragPipeline(query: string) {
  return await startActiveObservation("rag-pipeline", async () => {
    updateActiveObservation({ input: { query }, metadata: { pipeline: "rag-v2" } });

    // Span: Query embedding
    const embedding = await startActiveObservation("embed-query", async () => {
      updateActiveObservation({ input: { text: query } });
      const vector = await embedText(query);
      updateActiveObservation({
        output: { dimensions: vector.length },
        metadata: { model: "text-embedding-3-small" },
      });
      return vector;
    });

    // Span: Vector search
    const documents = await startActiveObservation("vector-search", async () => {
      updateActiveObservation({ input: { dimensions: embedding.length } });
      const docs = await searchVectorDB(embedding);
      updateActiveObservation({
        output: { documentCount: docs.length, topScore: docs[0]?.score },
      });
      return docs;
    });

    // Generation: LLM call with context
    const answer = await startActiveObservation(
      { name: "generate-answer", asType: "generation" },
      async () => {
        updateActiveObservation({
          model: "gpt-4o",
          input: { query, context: documents.map((d) => d.content) },
        });

        const result = await generateAnswer(query, documents);

        updateActiveObservation({
          output: result.content,
          usage: {
            promptTokens: result.usage.prompt_tokens,
            completionTokens: result.usage.completion_tokens,
          },
        });
        return result.content;
      }
    );

    updateActiveObservation({ output: { answer } });
    return answer;
  });
}

Step 3: Manual Tracing -- RAG Pipeline (v3 Legacy)

import { Langfuse } from "langfuse";

const langfuse = new Langfuse();

async function ragPipeline(query: string) {
  const trace = langfuse.trace({
    name: "rag-pipeline",
    input: { query },
    metadata: { pipeline: "rag-v1" },
  });

  const embedSpan = trace.span({ name: "embed-query", input: { text: query } });
  const embedding = await embedText(query);
  embedSpan.end({ output: { dimensions: embedding.length } });

  const searchSpan = trace.span({ name: "vector-search" });
  const documents = await searchVectorDB(embedding);
  searchSpan.end({ output: { count: documents.length, topScore: documents[0]?.score } });

  const generation = trace.generation({
    name: "generate-answer",
    model: "gpt-4o",
    modelParameters: { temperature: 0.7, maxTokens: 500 },
    input: { query, context: documents.map((d) => d.content) },
  });

  const answer = await generateAnswer(query, documents);

  generation.end({
    output: answer.content,
    usage: {
      promptTokens: answer.usage.prompt_tokens,
      completionTokens: answer.usage.completion_tokens,
      totalTokens: answer.usage.total_tokens,
    },
  });

  trace.update({ output: { answer: answer.content } });
  await langfuse.flushAsync();
  return answer.content;
}

Step 4: Streaming Response Tracking

import OpenAI from "openai";
import { observeOpenAI } from "@langfuse/openai";

// The wrapper handles streaming automatically
const openai = observeOpenAI(new OpenAI());

const stream = await openai.chat.completions.create({
  model: "gpt-4o",
  messages: [{ role: "user", content: "Tell me a story" }],
  stream: true,
  stream_options: { include_usage: true }, // Required for token tracking
});

let fullContent = "";
for await (const chunk of stream) {
  const content = chunk.choices[0]?.delta?.content || "";
  fullContent += content;
  process.stdout.write(content);
}
// Token usage and latency are captured automatically by the wrapper

Step 5: Anthropic Claude Tracing (Manual)

import Anthropic from "@anthropic-ai/sdk";
import { startActiveObservation, updateActiveObservation } from "@langfuse/tracing";

const anthropic = new Anthropic();

async function callClaude(prompt: string) {
  return await startActiveObservation(
    { name: "claude-call", asType: "generation" },
    async () => {
      updateActiveObservation({
        model: "claude-sonnet-4-20250514",
        input: [{ role: "user", content: prompt }],
      });

      const response = await anthropic.messages.create({
        model: "claude-sonnet-4-20250514",
        max_tokens: 1024,
        messages: [{ role: "user", content: prompt }],
      });

      updateActiveObservation({
        output: response.content[0].text,
        usage: {
          promptTokens: response.usage.input_tokens,
          completionTokens: response.usage.output_tokens,
        },
      });

      return response.content[0].text;
    }
  );
}

Step 6: LangChain Integration (Python)

from langfuse.callback import CallbackHandler
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

langfuse_handler = CallbackHandler()

llm = ChatOpenAI(model="gpt-4o")

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant."),
    ("human", "{input}"),
])

chain = prompt | llm

# All LangChain operations are automatically traced
result = chain.invoke(
    {"input": "What is Langfuse?"},
    config={"callbacks": [langfuse_handler]},
)

Error Handling

| Issue | Cause | Solution | |-------|-------|----------| | Missing generations | OpenAI wrapper not applied | Use observeOpenAI() from @langfuse/openai | | Orphaned spans | Missing end or callback finish | Use startActiveObservation (auto-ends) or .end() in finally | | No token usage on stream | Stream usage not requested | Add stream_options: { include_usage: true } | | Flat trace (no nesting) | Missing OTel context | Ensure NodeSDK is started with LangfuseSpanProcessor |

Resources

Next Steps

For evaluation and scoring workflows, see langfuse-core-workflow-b.