Agent Skills: Customer.io Load & Scale

|

UncategorizedID: jeremylongshore/claude-code-plugins-plus-skills/customerio-load-scale

Install this agent skill to your local

pnpm dlx add-skill https://github.com/jeremylongshore/claude-code-plugins-plus-skills/tree/HEAD/plugins/saas-packs/customerio-pack/skills/customerio-load-scale

Skill Files

Browse the full folder contents for customerio-load-scale.

Download Skill

Loading file tree…

plugins/saas-packs/customerio-pack/skills/customerio-load-scale/SKILL.md

Skill Metadata

Name
customerio-load-scale
Description
|

Customer.io Load & Scale

Overview

Load testing and scaling strategies for high-volume Customer.io integrations: k6 load test scripts, scaling architecture selection based on volume tier, Kubernetes HPA autoscaling, message queue buffering, and rate-limit-aware batch processing.

Scaling Architecture by Volume

| Daily Events | Architecture | Key Components | |-------------|--------------|----------------| | < 100K | Direct API | Singleton client, retry, connection pooling | | 100K - 1M | Batched API | Event queue, batch processor, rate limiter | | 1M - 10M | Queue-backed | Redis/Kafka queue, worker pool, backpressure | | > 10M | Distributed | Multiple workspaces, sharded queues, regional routing |

Customer.io rate limit is ~100 req/sec per workspace. Plan your architecture around this.

Instructions

Step 1: k6 Load Test Script

// load-tests/customerio.js
// Run: k6 run --vus 10 --duration 60s load-tests/customerio.js
import http from "k6/http";
import { check, sleep } from "k6";
import { Counter, Trend } from "k6/metrics";

const SITE_ID = __ENV.CUSTOMERIO_SITE_ID;
const API_KEY = __ENV.CUSTOMERIO_TRACK_API_KEY;
const BASE_URL = "https://track.customer.io/api/v1";
const AUTH = `${SITE_ID}:${API_KEY}`;

const identifyLatency = new Trend("cio_identify_latency");
const trackLatency = new Trend("cio_track_latency");
const errors = new Counter("cio_errors");

export const options = {
  scenarios: {
    identify_load: {
      executor: "ramping-arrival-rate",
      startRate: 10,
      timeUnit: "1s",
      preAllocatedVUs: 20,
      maxVUs: 50,
      stages: [
        { duration: "30s", target: 50 },   // Ramp to 50/sec
        { duration: "60s", target: 80 },   // Hold at 80/sec (near limit)
        { duration: "30s", target: 10 },   // Cool down
      ],
    },
  },
  thresholds: {
    cio_identify_latency: ["p(95)<500", "p(99)<2000"],
    cio_track_latency: ["p(95)<500", "p(99)<2000"],
    cio_errors: ["count<50"],
  },
};

export default function () {
  const userId = `k6-load-${__VU}-${__ITER}`;
  const headers = {
    "Content-Type": "application/json",
    Authorization: `Basic ${encoding.b64encode(AUTH)}`,
  };

  // Identify
  const identifyRes = http.put(
    `${BASE_URL}/customers/${userId}`,
    JSON.stringify({
      email: `${userId}@loadtest.example.com`,
      _load_test: true,
      created_at: Math.floor(Date.now() / 1000),
    }),
    { headers }
  );

  identifyLatency.add(identifyRes.timings.duration);
  check(identifyRes, { "identify 200": (r) => r.status === 200 }) || errors.add(1);

  // Track event
  const trackRes = http.post(
    `${BASE_URL}/customers/${userId}/events`,
    JSON.stringify({
      name: "load_test_event",
      data: { iteration: __ITER, vu: __VU },
    }),
    { headers }
  );

  trackLatency.add(trackRes.timings.duration);
  check(trackRes, { "track 200": (r) => r.status === 200 }) || errors.add(1);

  sleep(0.1); // Small delay between iterations
}

// Cleanup function — suppress test users after test
export function teardown() {
  console.log("Load test complete. Clean up k6-load-* users in CIO dashboard.");
}

Run:

k6 run --env CUSTOMERIO_SITE_ID="$CUSTOMERIO_SITE_ID" \
       --env CUSTOMERIO_TRACK_API_KEY="$CUSTOMERIO_TRACK_API_KEY" \
       load-tests/customerio.js

Step 2: Queue-Based Architecture

// services/cio-queue-worker.ts
import { Queue, Worker, QueueEvents } from "bullmq";
import { TrackClient, RegionUS } from "customerio-node";
import Bottleneck from "bottleneck";

const REDIS_URL = process.env.REDIS_URL ?? "redis://localhost:6379";

// Rate limiter: 80 requests per second (leave headroom under 100/sec limit)
const limiter = new Bottleneck({
  maxConcurrent: 15,
  reservoir: 80,
  reservoirRefreshAmount: 80,
  reservoirRefreshInterval: 1000,
});

const eventQueue = new Queue("cio:events", {
  connection: { url: REDIS_URL },
  defaultJobOptions: {
    attempts: 5,
    backoff: { type: "exponential", delay: 2000 },
    removeOnComplete: { count: 10000 },
    removeOnFail: { count: 50000 },
  },
});

// Producer — your application enqueues events here
export async function enqueueEvent(
  type: "identify" | "track",
  userId: string,
  data: Record<string, any>
): Promise<void> {
  await eventQueue.add(type, { userId, data, enqueuedAt: Date.now() });
}

// Consumer — workers process events with rate limiting
export function startEventWorkers(concurrency = 10): void {
  const cio = new TrackClient(
    process.env.CUSTOMERIO_SITE_ID!,
    process.env.CUSTOMERIO_TRACK_API_KEY!,
    { region: RegionUS }
  );

  const worker = new Worker(
    "cio:events",
    async (job) => {
      await limiter.schedule(async () => {
        if (job.name === "identify") {
          await cio.identify(job.data.userId, job.data.data);
        } else {
          await cio.track(job.data.userId, job.data.data);
        }
      });
    },
    {
      connection: { url: REDIS_URL },
      concurrency,
    }
  );

  worker.on("failed", (job, err) => {
    console.error(`CIO event failed: ${job?.id} — ${err.message}`);
  });

  // Monitor queue health
  const events = new QueueEvents("cio:events", {
    connection: { url: REDIS_URL },
  });

  setInterval(async () => {
    const counts = await eventQueue.getJobCounts();
    console.log(
      `CIO queue: waiting=${counts.waiting} active=${counts.active} ` +
      `failed=${counts.failed} completed=${counts.completed}`
    );
  }, 30000);
}

Step 3: Kubernetes HPA Autoscaling

# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: cio-worker-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: cio-event-worker
  minReplicas: 2
  maxReplicas: 20
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Pods
      pods:
        metric:
          name: cio_queue_depth
        target:
          type: AverageValue
          averageValue: "500"
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
        - type: Pods
          value: 4
          periodSeconds: 60
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
        - type: Pods
          value: 2
          periodSeconds: 120

Step 4: Batch Sender for Bulk Operations

// lib/cio-batch-sender.ts
import { TrackClient, RegionUS } from "customerio-node";
import Bottleneck from "bottleneck";

export async function batchSend(
  operations: Array<{
    type: "identify" | "track";
    userId: string;
    data: Record<string, any>;
  }>,
  ratePerSec = 80
): Promise<{ succeeded: number; failed: number }> {
  const cio = new TrackClient(
    process.env.CUSTOMERIO_SITE_ID!,
    process.env.CUSTOMERIO_TRACK_API_KEY!,
    { region: RegionUS }
  );

  const limiter = new Bottleneck({
    maxConcurrent: 15,
    reservoir: ratePerSec,
    reservoirRefreshAmount: ratePerSec,
    reservoirRefreshInterval: 1000,
  });

  let succeeded = 0;
  let failed = 0;

  const promises = operations.map((op, i) =>
    limiter.schedule(async () => {
      try {
        if (op.type === "identify") {
          await cio.identify(op.userId, op.data);
        } else {
          await cio.track(op.userId, op.data);
        }
        succeeded++;
      } catch {
        failed++;
      }
      if ((succeeded + failed) % 1000 === 0) {
        console.log(`Progress: ${succeeded + failed}/${operations.length}`);
      }
    })
  );

  await Promise.all(promises);
  return { succeeded, failed };
}

Install: npm install bottleneck bullmq

Load Test Checklist

  • [ ] Test against staging workspace (NEVER production)
  • [ ] Start at 10% of target rate, ramp up gradually
  • [ ] Monitor 429 error rate during test
  • [ ] Check Customer.io dashboard for processing lag
  • [ ] Verify cleanup of test users after load test
  • [ ] Document baseline latency and throughput numbers
  • [ ] Set up alerts before running at production scale

Error Handling

| Issue | Solution | |-------|----------| | 429 during load test | Reduce rate, check limiter config | | Queue backlog growing | Scale workers, increase concurrency | | Memory pressure | Limit batch and queue sizes, enable GC | | k6 VU exhaustion | Increase preAllocatedVUs and maxVUs |

Resources

Next Steps

After load testing, proceed to customerio-known-pitfalls for anti-patterns to avoid.