Agent Skills: Perplexity Events & Async Patterns

Install this agent skill locally:

pnpm dlx add-skill https://github.com/jeremylongshore/claude-code-plugins-plus-skills/tree/HEAD/plugins/saas-packs/perplexity-pack/skills/perplexity-webhooks-events

Perplexity Events & Async Patterns

Overview

Build event-driven architectures around the Perplexity Sonar API. Perplexity does not provide webhooks; every interaction is request/response. The event patterns here are therefore built on top of it, using SSE streaming, job queues for batch processing, and cron-triggered monitoring.

Event Patterns

| Pattern | Trigger | Use Case |
|---------|---------|----------|
| Streaming SSE | Client request | Real-time search with progressive rendering |
| Batch queue | Job submission | Research automation, report generation |
| Scheduled search | Cron job | News monitoring, trend alerts, competitive intel |
| Citation pipeline | Post-processing | Source verification, link validation |
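The citation pipeline row has no dedicated step in the instructions, so here is a minimal sketch of what post-processing citations could look like. The names `normalizeCitations`, `validateCitations`, `LinkStatus`, and `Probe` are hypothetical, and the probe is injected rather than hard-coded so the sketch makes no assumption about how links are checked.

```typescript
// Hypothetical helpers for illustration; none of these names come from the Perplexity API.
type LinkStatus = { url: string; ok: boolean; status?: number };
type Probe = (url: string) => Promise<{ ok: boolean; status: number }>;

// Keep only absolute http(s) URLs and collapse duplicates that differ only by fragment.
function normalizeCitations(citations: string[]): string[] {
  const unique = new Set<string>();
  for (const raw of citations) {
    try {
      const url = new URL(raw);
      if (url.protocol !== "http:" && url.protocol !== "https:") continue;
      url.hash = "";
      unique.add(url.toString());
    } catch {
      // Not a parseable absolute URL; skip it
    }
  }
  return Array.from(unique);
}

// Probe every normalized citation in parallel; a probe that throws counts as a dead link.
async function validateCitations(citations: string[], probe: Probe): Promise<LinkStatus[]> {
  return Promise.all(
    normalizeCitations(citations).map(async (url): Promise<LinkStatus> => {
      try {
        const { ok, status } = await probe(url);
        return { url, ok, status };
      } catch {
        return { url, ok: false };
      }
    })
  );
}
```

One possible probe is `(url) => fetch(url, { method: "HEAD" })`. Some servers reject HEAD requests, so a failed probe is a hint to re-check rather than proof that a source is gone.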

Prerequisites

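  • openai package installed (used as the client via a custom baseURL); express for the Step 1 endpoint
  • PERPLEXITY_API_KEY set
  • Queue system (BullMQ, SQS) for batch patterns; the BullMQ example in Step 2 needs a reachable Redis instance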
  • Cron scheduler for monitoring patterns

Instructions

Step 1: Streaming Search (Server-Sent Events)

import OpenAI from "openai";
import express from "express";

const perplexity = new OpenAI({
  apiKey: process.env.PERPLEXITY_API_KEY!,
  baseURL: "https://api.perplexity.ai",
});

const app = express();
app.use(express.json());

app.post("/api/search/stream", async (req, res) => {
  const { query, model = "sonar" } = req.body;

  res.writeHead(200, {
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache",
    Connection: "keep-alive",
  });

  try {
    const stream = await perplexity.chat.completions.create({
      model,
      messages: [{ role: "user", content: query }],
      stream: true,
      max_tokens: 2048,
    });

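    let fullText = "";
    let citations: string[] | undefined;
    for await (const chunk of stream) {
      const text = chunk.choices[0]?.delta?.content || "";
      if (text) {
        fullText += text;
        res.write(`data: ${JSON.stringify({ type: "text", content: text })}\n\n`);
      }

      // `citations` is a Perplexity extension that the OpenAI types do not declare.
      // It may be present on more than one chunk, so keep the latest value and emit it once.
      citations = (chunk as any).citations ?? citations;
    }

    if (citations?.length) {
      res.write(`data: ${JSON.stringify({ type: "citations", urls: citations })}\n\n`);
    }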

    res.write(`data: ${JSON.stringify({ type: "done", totalLength: fullText.length })}\n\n`);
  } catch (err: any) {
    res.write(`data: ${JSON.stringify({ type: "error", message: err.message })}\n\n`);
  }

  res.end();
});
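The handler above passes `query` and `model` from the request body straight to the API. A minimal sketch of validating both before any headers are written might look like the following. `parseSearchRequest`, `ALLOWED_MODELS`, and the allowlist contents are assumptions for illustration; only the two model names used in this document are listed.

```typescript
// Assumed allowlist: the two model names that appear in this document. Extend as needed.
const ALLOWED_MODELS = ["sonar", "sonar-pro"] as const;
type AllowedModel = (typeof ALLOWED_MODELS)[number];

type ParsedRequest =
  | { ok: true; query: string; model: AllowedModel }
  | { ok: false; error: string };

// Validate an untrusted request body and return either clean values or an error message.
function parseSearchRequest(body: unknown): ParsedRequest {
  if (typeof body !== "object" || body === null) {
    return { ok: false, error: "Request body must be a JSON object" };
  }

  const { query, model = "sonar" } = body as { query?: unknown; model?: unknown };

  if (typeof query !== "string" || query.trim() === "") {
    return { ok: false, error: "`query` must be a non-empty string" };
  }

  // Only forward model names we explicitly allow, since the client controls this field
  const allowed = ALLOWED_MODELS.find((m) => m === model);
  if (!allowed) {
    return { ok: false, error: `Unsupported model: ${String(model)}` };
  }

  return { ok: true, query: query.trim(), model: allowed };
}
```

In the Step 1 handler this would run before `res.writeHead`, so a failed check can still be answered with a regular `res.status(400).json({ error })` instead of an SSE error event.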

Step 2: Batch Research Pipeline

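import { randomUUID } from "node:crypto";
import { Queue, Worker } from "bullmq";

// Reuses the `perplexity` client created in Step 1.
const searchQueue = new Queue("perplexity-research", {
  connection: { host: "localhost", port: 6379 },
});

async function submitResearchBatch(
  queries: string[],
  callbackUrl: string,
  model: string = "sonar-pro"
) {
  // Imported explicitly so this does not depend on a global `crypto` being available
  const batchId = randomUUID();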

  for (const query of queries) {
    await searchQueue.add("search", { batchId, query, callbackUrl, model }, {
      attempts: 3,
      backoff: { type: "exponential", delay: 2000 },
    });
  }

  return { batchId, totalQueries: queries.length };
}

const worker = new Worker("perplexity-research", async (job) => {
  const { query, callbackUrl, batchId, model } = job.data;

  const response = await perplexity.chat.completions.create({
    model,
    messages: [{ role: "user", content: query }],
    max_tokens: 2048,
  });

  const result = {
    event: "perplexity.search.completed",
    batchId,
    query,
    answer: response.choices[0].message.content,
    citations: (response as any).citations || [],
    model: response.model,
    tokens: response.usage?.total_tokens,
  };

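  // Deliver result via callback. fetch() does not reject on HTTP error statuses,
  // so throw explicitly to let BullMQ retry the job with the configured backoff.
  // Note that a retry re-runs the whole job, including the search.
  const delivery = await fetch(callbackUrl, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(result),
  });
  if (!delivery.ok) {
    throw new Error(`Callback responded with HTTP ${delivery.status}`);
  }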
}, {
  connection: { host: "localhost", port: 6379 },
  concurrency: 3,  // Stay within rate limits
  limiter: { max: 40, duration: 60000 },  // 40 RPM safety margin
});
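The worker posts one `perplexity.search.completed` event per query, so whatever receives the callbacks has to work out when a batch is finished. A minimal in-memory sketch of that bookkeeping follows. `BatchCollector` and `SearchCompletedEvent` are hypothetical names, the event shape mirrors a subset of the `result` object above, and the sketch assumes queries are unique within a batch.

```typescript
// Subset of the payload the worker above posts to the callback URL.
interface SearchCompletedEvent {
  event: "perplexity.search.completed";
  batchId: string;
  query: string;
  answer: string | null;
  citations: string[];
}

// Hypothetical in-memory tracker; state is lost on restart, so persist it for real workloads.
class BatchCollector {
  private readonly expected = new Map<string, number>();
  private readonly results = new Map<string, Map<string, SearchCompletedEvent>>();

  // Call with the values returned by submitResearchBatch
  register(batchId: string, totalQueries: number): void {
    this.expected.set(batchId, totalQueries);
    this.results.set(batchId, new Map());
  }

  // Returns every result once all queries have reported, otherwise null.
  // Results are keyed by query, so a retried job that posts twice is counted once.
  record(event: SearchCompletedEvent): SearchCompletedEvent[] | null {
    const batch = this.results.get(event.batchId);
    const total = this.expected.get(event.batchId);
    if (!batch || total === undefined) return null; // unknown or already completed batch

    batch.set(event.query, event);
    if (batch.size < total) return null;

    this.expected.delete(event.batchId);
    this.results.delete(event.batchId);
    return Array.from(batch.values());
  }
}
```

Keying by query makes duplicate deliveries harmless, which matters because a failed callback causes the job, and therefore the POST, to be retried.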

Step 3: Scheduled News Monitor

// Run via cron: every 6 hours
async function monitorTopics(
  topics: string[],
  webhookUrl: string
) {
  for (const topic of topics) {
    const response = await perplexity.chat.completions.create({
      model: "sonar",
      messages: [{
        role: "system",
        content: "Summarize the latest developments. Be concise. Include only new information.",
      }, {
        role: "user",
        content: `Latest developments about "${topic}" in the past 24 hours`,
      }],
      search_recency_filter: "day",
      max_tokens: 500,
    } as any);

    const answer = response.choices[0].message.content || "";
    const citations = (response as any).citations || [];

    // Only notify if there are actual developments
    if (citations.length > 0 && answer.length > 100) {
      await fetch(webhookUrl, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({
          event: "perplexity.monitor.update",
          topic,
          summary: answer,
          citations,
          timestamp: new Date().toISOString(),
        }),
      });
    }

    // Rate limit protection
    await new Promise((r) => setTimeout(r, 2000));
  }
}
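Because the monitor runs every 6 hours but searches a 24-hour window, consecutive runs can surface the same sources again. One way to suppress repeat alerts is to remember which citations were already reported per topic. This is a sketch under that assumption; `seenByTopic` and `takeNewCitations` are hypothetical names, and the in-memory map would need to be replaced by persistent storage if the job runs as a fresh process each time.

```typescript
// Hypothetical in-memory store; swap for Redis or a database so it survives between cron runs.
const seenByTopic = new Map<string, Set<string>>();

// Returns only the citations not yet reported for this topic, and records them as seen.
function takeNewCitations(topic: string, citations: string[]): string[] {
  let seen = seenByTopic.get(topic);
  if (!seen) {
    seen = new Set<string>();
    seenByTopic.set(topic, seen);
  }

  const fresh: string[] = [];
  for (const url of citations) {
    if (!seen.has(url)) {
      seen.add(url);
      fresh.push(url);
    }
  }
  return fresh;
}
```

In `monitorTopics`, the notification condition could then test `takeNewCitations(topic, citations).length > 0` instead of `citations.length > 0`. For the schedule itself, a crontab entry of `0 */6 * * *` matches the every-6-hours comment.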

Step 4: Client-Side SSE Consumer

// Browser client consuming the streaming endpoint
function consumeSearchStream(
  query: string,
  onText: (text: string) => void,
  onCitations: (urls: string[]) => void,
  onDone: () => void
) {
  fetch("/api/search/stream", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ query }),
  }).then(async (response) => {
    const reader = response.body!.getReader();
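    const decoder = new TextDecoder();
    let buffer = "";

    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      // A network chunk can end in the middle of an event (or a multi-byte character),
      // so buffer the text and only parse events terminated by a blank line.
      buffer += decoder.decode(value, { stream: true });
      const events = buffer.split("\n\n");
      buffer = events.pop() ?? "";

      for (const raw of events) {
        if (!raw.startsWith("data: ")) continue;
        const event = JSON.parse(raw.slice(6));

        if (event.type === "text") onText(event.content);
        if (event.type === "citations") onCitations(event.urls);
        if (event.type === "done") onDone();
      }
    }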
  });
}

Error Handling

| Issue | Cause | Solution |
|-------|-------|----------|
| Stream stalls | Complex search taking too long | Set a per-chunk timeout (10s) |
| 429 in batch | Too many concurrent workers | Reduce concurrency, add a rate limiter |
| Empty monitor alerts | Topic too niche | Broaden the topic or widen the recency window (e.g. "week" instead of "day") |
| Callback fails | Webhook URL down | Retry with exponential backoff |
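The per-chunk timeout suggested for stalled streams is not shown in the steps above. A minimal sketch, assuming the stream is consumed as an async iterable as in Step 1, is a wrapper that re-yields each chunk and fails when the gap between chunks grows too long. `withChunkTimeout` and `ChunkTimeoutError` are hypothetical names, not part of the openai package.

```typescript
// Hypothetical error type so callers can tell a stall apart from an API failure.
class ChunkTimeoutError extends Error {
  constructor(ms: number) {
    super(`No chunk received within ${ms} ms`);
    this.name = "ChunkTimeoutError";
  }
}

// Re-yields items from `source`, throwing if the wait for the next item exceeds `ms`.
async function* withChunkTimeout<T>(source: AsyncIterable<T>, ms: number): AsyncGenerator<T> {
  const iterator = source[Symbol.asyncIterator]();
  try {
    while (true) {
      let timer: ReturnType<typeof setTimeout> | undefined;
      const timeout = new Promise<never>((_, reject) => {
        timer = setTimeout(() => reject(new ChunkTimeoutError(ms)), ms);
      });

      let result: IteratorResult<T>;
      try {
        // Whichever settles first wins: the next chunk or the timer
        result = await Promise.race([iterator.next(), timeout]);
      } finally {
        clearTimeout(timer);
      }

      if (result.done) return;
      yield result.value;
    }
  } finally {
    // Ask the source to clean up, but do not await it: it may be the thing that stalled
    const cleanup = iterator.return?.();
    if (cleanup) cleanup.catch(() => undefined);
  }
}
```

In Step 1 the loop would become `for await (const chunk of withChunkTimeout(stream, 10_000))`, and the existing `catch` block would then report the stall to the client as an error event.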

Output

  • Streaming SSE endpoint for real-time search
  • Batch research pipeline with queue-based processing
  • Scheduled news monitoring with alerting
  • Client-side stream consumer

Next Steps

For deployment setup, see perplexity-deploy-integration.