Agent Skills: Mistral AI Events, Agents & Async Patterns


ID: jeremylongshore/claude-code-plugins-plus-skills/mistral-webhooks-events

Install this agent skill to your local environment:

pnpm dlx add-skill https://github.com/jeremylongshore/claude-code-plugins-plus-skills/tree/HEAD/plugins/saas-packs/mistral-pack/skills/mistral-webhooks-events

Skill Files

Browse the full folder contents for mistral-webhooks-events.


plugins/saas-packs/mistral-pack/skills/mistral-webhooks-events/SKILL.md

Skill Metadata

Name
mistral-webhooks-events
Description
Mistral AI Events, Agents & Async Patterns

Overview

Async and event-driven patterns for Mistral AI: the Agents API for stateful multi-turn workflows, Batch API for cost-effective bulk inference (50% cheaper), SSE streaming endpoints, background job queues, and Python async processing. Mistral does not have native webhooks — this skill covers the patterns that replace them.

Prerequisites

  • @mistralai/mistralai SDK installed
  • MISTRAL_API_KEY configured
  • For agents: La Plateforme access to create agents
  • For batch: JSONL file preparation

Instructions

Step 1: Mistral Agents API

Create stateful agents with instructions, tools, and model configuration:

import { Mistral } from '@mistralai/mistralai';

const client = new Mistral({ apiKey: process.env.MISTRAL_API_KEY });

// Create an agent on La Plateforme
const agent = await client.agents.create({
  name: 'Code Reviewer',
  model: 'mistral-large-latest',
  instructions: `You are an expert code reviewer. Analyze code for:
    - Security vulnerabilities
    - Performance issues
    - Best practice violations
    Provide actionable feedback with severity ratings.`,
  description: 'Reviews code for security, performance, and best practices',
  tools: [
    {
      type: 'function',
      function: {
        name: 'search_codebase',
        description: 'Search the codebase for patterns',
        parameters: {
          type: 'object',
          properties: { query: { type: 'string' } },
          required: ['query'],
        },
      },
    },
  ],
});

// Chat with the agent (stateful conversation)
const response = await client.agents.complete({
  agentId: agent.id,
  messages: [
    { role: 'user', content: 'Review this function:\n```\nfunction auth(pwd) { return pwd === "admin123"; }\n```' },
  ],
});

console.log(response.choices?.[0]?.message?.content);
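
Each `agents.complete` call receives the full message list, so multi-turn context must be maintained client-side. A minimal sketch of a history accumulator (the `ChatMessage` shape is a simplification of the SDK's message types, not the SDK type itself):

```typescript
type ChatMessage = { role: 'user' | 'assistant'; content: string };

// Accumulates the message history so each agents.complete call
// can be given the full conversation so far.
class Conversation {
  private history: ChatMessage[] = [];

  // Record the user turn and return the messages to send.
  userTurn(content: string): ChatMessage[] {
    this.history.push({ role: 'user', content });
    return [...this.history];
  }

  // Record the assistant's reply once the response arrives.
  assistantTurn(content: string): void {
    this.history.push({ role: 'assistant', content });
  }

  get messages(): ChatMessage[] {
    return [...this.history];
  }
}
```

Persist the accumulated messages to your own database between requests so the agent's context survives process restarts.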

Step 2: Batch API for Bulk Inference

The Batch API offers a 50% cost reduction for non-time-sensitive workloads:

// 1. Prepare JSONL input file
const batchRequests = [
  {
    custom_id: 'req-1',
    body: {
      model: 'mistral-small-latest',
      messages: [{ role: 'user', content: 'Summarize: ...' }],
      max_tokens: 200,
    },
  },
  {
    custom_id: 'req-2',
    body: {
      model: 'mistral-small-latest',
      messages: [{ role: 'user', content: 'Classify: ...' }],
      max_tokens: 50,
    },
  },
];

// Write to JSONL
import { readFileSync, writeFileSync } from 'fs';
writeFileSync('batch-input.jsonl',
  batchRequests.map(r => JSON.stringify(r)).join('\n')
);

// 2. Upload file and create batch job
const file = await client.files.upload({
  file: { fileName: 'batch-input.jsonl', content: readFileSync('batch-input.jsonl') },
  purpose: 'batch',
});

const batch = await client.batch.jobs.create({
  inputFiles: [file.id],
  endpoint: '/v1/chat/completions',
  model: 'mistral-small-latest',
});

console.log(`Batch job: ${batch.id}, status: ${batch.status}`);

// 3. Poll for completion
async function waitForBatch(jobId: string): Promise<any> {
  while (true) {
    const status = await client.batch.jobs.get({ jobId });
    console.log(`Status: ${status.status}`);

    if (status.status === 'SUCCESS') return status;
    if (status.status === 'FAILED') throw new Error(`Batch failed: ${JSON.stringify(status.errors)}`);

    await new Promise(r => setTimeout(r, 30_000)); // Check every 30s
  }
}
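
Once the job reaches SUCCESS, the output file is JSONL with one result line per input request, keyed by `custom_id`. A hedged sketch of the parsing step (the exact line shape shown in the comment is an assumption; verify it against the batch output for your SDK version):

```typescript
// Parse batch output JSONL into a map keyed by custom_id.
// Assumed line shape:
//   {"custom_id": "req-1", "response": {...}}
function parseBatchOutput(jsonl: string): Map<string, unknown> {
  const results = new Map<string, unknown>();
  for (const line of jsonl.split('\n')) {
    if (!line.trim()) continue; // skip blank lines
    const record = JSON.parse(line);
    results.set(record.custom_id, record.response);
  }
  return results;
}
```

Keying by `custom_id` matters because batch results are not guaranteed to come back in input order.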

Step 3: Event-Driven Streaming Architecture

Wrap Mistral's SSE streaming in a typed event bus so multiple consumers can react to chunks, completion, and errors:

import { EventEmitter } from 'events';
import { Mistral } from '@mistralai/mistralai';

interface MistralEvents {
  'chat:start': { requestId: string; model: string };
  'chat:chunk': { requestId: string; content: string; index: number };
  'chat:complete': { requestId: string; content: string; usage: any };
  'chat:error': { requestId: string; error: Error };
}

class MistralEventBus extends EventEmitter {
  private client: Mistral;

  constructor() {
    super();
    this.client = new Mistral({ apiKey: process.env.MISTRAL_API_KEY! });
  }

  async streamChat(requestId: string, messages: any[], model = 'mistral-small-latest') {
    this.emit('chat:start', { requestId, model });

    try {
      const stream = await this.client.chat.stream({ model, messages });
      let full = '';
      let index = 0;

      for await (const event of stream) {
        const content = event.data?.choices?.[0]?.delta?.content;
        if (content) {
          full += content;
          this.emit('chat:chunk', { requestId, content, index: index++ });
        }
      }

      this.emit('chat:complete', { requestId, content: full, usage: { estimatedTokens: Math.ceil(full.length / 4) } });
      return full;
    } catch (error) {
      this.emit('chat:error', { requestId, error: error as Error });
      throw error;
    }
  }
}

// Wire up listeners
const bus = new MistralEventBus();
bus.on('chat:start', ({ requestId, model }) => console.log(`[${requestId}] Starting ${model}`));
bus.on('chat:chunk', ({ content }) => process.stdout.write(content));
bus.on('chat:complete', ({ requestId, usage }) => console.log(`\n[${requestId}] Done`));
bus.on('chat:error', ({ requestId, error }) => console.error(`[${requestId}] Error: ${error.message}`));

Step 4: Background Job Queue with BullMQ

Offload chat completions to a Redis-backed queue with retries and an optional completion callback:

import { Queue, Worker } from 'bullmq';
import { Mistral } from '@mistralai/mistralai';

const connection = { host: 'localhost', port: 6379 };
const chatQueue = new Queue('mistral-chat', { connection });

// Worker processes jobs
const worker = new Worker('mistral-chat', async (job) => {
  const client = new Mistral({ apiKey: process.env.MISTRAL_API_KEY! });

  const response = await client.chat.complete({
    model: job.data.model ?? 'mistral-small-latest',
    messages: job.data.messages,
  });

  const result = {
    content: response.choices?.[0]?.message?.content,
    usage: response.usage,
  };

  // Optional: call webhook on completion
  if (job.data.callbackUrl) {
    await fetch(job.data.callbackUrl, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ jobId: job.id, ...result }),
    });
  }

  return result;
}, {
  connection,
  concurrency: 5,
  limiter: { max: 10, duration: 1000 }, // 10 jobs/sec max
});

// Enqueue from API
async function enqueueChat(messages: any[], callbackUrl?: string) {
  const job = await chatQueue.add('chat', {
    messages,
    model: 'mistral-small-latest',
    callbackUrl,
  }, {
    attempts: 3,
    backoff: { type: 'exponential', delay: 2000 },
  });

  return { jobId: job.id, status: 'queued' };
}
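
The `callbackUrl` delivery above gives the receiver no way to verify that the request came from your worker. A common pattern is to sign the payload with a shared-secret HMAC; a minimal sketch (the `x-signature` header name is our choice for illustration, not a Mistral convention):

```typescript
import { createHmac } from 'crypto';

// Compute a hex HMAC-SHA256 signature over the raw JSON body.
function signPayload(body: string, secret: string): string {
  return createHmac('sha256', secret).update(body).digest('hex');
}

// The worker attaches the signature; the receiver recomputes it
// over the raw body and compares before trusting the payload.
async function postSigned(url: string, payload: object, secret: string) {
  const body = JSON.stringify(payload);
  return fetch(url, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'x-signature': signPayload(body, secret),
    },
    body,
  });
}
```

On the receiving side, recompute the HMAC over the exact raw body bytes before parsing the JSON, and compare with a constant-time comparison.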

Step 5: Python Async Batch Processing

Fan out prompts concurrently, using a semaphore to cap in-flight requests:

import asyncio
import os
from mistralai import Mistral

async def process_batch(prompts: list[str], concurrency: int = 5):
    """Process prompts concurrently with rate limiting."""
    client = Mistral(api_key=os.environ["MISTRAL_API_KEY"])
    semaphore = asyncio.Semaphore(concurrency)

    async def process_one(prompt: str, idx: int):
        async with semaphore:
            response = await client.chat.complete_async(
                model="mistral-small-latest",
                messages=[{"role": "user", "content": prompt}],
            )
            return {"index": idx, "content": response.choices[0].message.content}

    tasks = [process_one(p, i) for i, p in enumerate(prompts)]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results

# Usage
results = asyncio.run(process_batch([
    "Summarize quantum computing",
    "Explain neural networks",
    "What is reinforcement learning",
]))

Error Handling

| Issue | Cause | Solution |
|-------|-------|----------|
| Batch job stuck | Processing queue full | Check status, resubmit if FAILED |
| Agent context lost | Session expired | Store conversation in your DB |
| Worker crash | Unhandled exception | BullMQ auto-retries with backoff |
| SSE disconnected | Client/network timeout | Implement reconnection logic |
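
The "SSE disconnected" row above calls for reconnection logic. A minimal retry wrapper with exponential backoff, suitable for wrapping a streaming call that may drop mid-flight (attempt count and delays are illustrative defaults):

```typescript
// Retry an async operation with exponential backoff:
// baseDelayMs, 2x baseDelayMs, 4x baseDelayMs, ... between attempts.
async function withRetry<T>(
  fn: () => Promise<T>,
  maxAttempts = 3,
  baseDelayMs = 1000,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      if (attempt < maxAttempts - 1) {
        await new Promise(r => setTimeout(r, baseDelayMs * 2 ** attempt));
      }
    }
  }
  throw lastError;
}
```

Note that for streams this restarts the whole request; if you need to resume mid-response, track the chunks already received and de-duplicate on reconnect.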


Output

  • Agents API integration for stateful workflows
  • Batch API bulk processing at 50% lower cost
  • Event-driven streaming architecture
  • Background job queue with retry/callback
  • Python async concurrent processing