Agent Skills: Clari Performance Tuning

Install this agent skill locally:

pnpm dlx add-skill https://github.com/jeremylongshore/claude-code-plugins-plus-skills/tree/HEAD/plugins/saas-packs/clari-pack/skills/clari-performance-tuning

plugins/saas-packs/clari-pack/skills/clari-performance-tuning/SKILL.md

Skill Metadata

Name
clari-performance-tuning

Clari Performance Tuning

Overview

Optimize Clari export pipelines: reduce export times, cache forecast data, and parallelize multi-period exports.

Instructions

Parallel Multi-Period Export

from concurrent.futures import ThreadPoolExecutor, as_completed

def parallel_export(
    client,
    forecast_name: str,
    periods: list[str],
    max_workers: int = 3,
) -> dict[str, list[dict]]:
    """Export several periods concurrently and return entries keyed by period."""
    results: dict[str, list[dict]] = {}

    def export_period(period: str) -> tuple[str, list[dict]]:
        # Each worker runs one blocking export-and-download call.
        data = client.export_and_download(forecast_name, period)
        return period, data.get("entries", [])

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(export_period, p): p for p in periods
        }
        # Handle exports in completion order, not submission order.
        for future in as_completed(futures):
            # result() re-raises any exception raised inside the worker.
            period, entries = future.result()
            results[period] = entries
            print(f"  {period}: {len(entries)} entries")

    return results
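Because `future.result()` re-raises the first worker exception, one failed period causes `parallel_export` to return nothing, even for periods that succeeded. A minimal sketch of a variant that records failures per period instead is shown here. It assumes the same `client.export_and_download(forecast_name, period)` call used above; the function name `parallel_export_with_errors` is hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def parallel_export_with_errors(
    client,
    forecast_name: str,
    periods: list[str],
    max_workers: int = 3,
) -> tuple[dict[str, list[dict]], dict[str, Exception]]:
    """Export each period in its own thread; collect failures instead of raising."""
    results: dict[str, list[dict]] = {}
    errors: dict[str, Exception] = {}

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to the period it was submitted for.
        futures = {
            executor.submit(client.export_and_download, forecast_name, p): p
            for p in periods
        }
        for future in as_completed(futures):
            period = futures[future]
            try:
                data = future.result()
            except Exception as exc:
                # Record the failure and keep processing the other periods.
                errors[period] = exc
            else:
                results[period] = data.get("entries", [])

    return results, errors
```

The caller can then retry only the periods listed in `errors` rather than repeating the whole batch.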

Cache Export Results

import json
import hashlib
from pathlib import Path
from datetime import datetime, timedelta, timezone

class ExportCache:
    """File-backed cache for export results, keyed by forecast and period."""

    def __init__(self, cache_dir: str = ".cache/clari", ttl_hours: int = 4):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.ttl = timedelta(hours=ttl_hours)

    def _key(self, forecast: str, period: str) -> str:
        # MD5 only derives a stable filename here; it is not used for security.
        return hashlib.md5(f"{forecast}:{period}".encode()).hexdigest()

    def get(self, forecast: str, period: str) -> list[dict] | None:
        path = self.cache_dir / f"{self._key(forecast, period)}.json"
        if not path.exists():
            return None
        meta = json.loads(path.read_text())
        cached_at = datetime.fromisoformat(meta["cached_at"])
        if cached_at.tzinfo is None:
            # Entries written with naive UTC timestamps are treated as UTC.
            cached_at = cached_at.replace(tzinfo=timezone.utc)
        # Treat entries older than the TTL as a miss.
        if datetime.now(timezone.utc) - cached_at > self.ttl:
            return None
        return meta["entries"]

    def set(self, forecast: str, period: str, entries: list[dict]) -> None:
        path = self.cache_dir / f"{self._key(forecast, period)}.json"
        # datetime.utcnow() is deprecated since Python 3.12; store an aware UTC timestamp.
        path.write_text(json.dumps({
            "cached_at": datetime.now(timezone.utc).isoformat(),
            "entries": entries,
        }))
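The cache only pays off when it sits in front of the export call. A minimal sketch of that read-through pattern follows, assuming a cache object with the `get`/`set` methods of `ExportCache` above and the `client.export_and_download` call from the parallel export; the helper name `cached_export` is hypothetical.

```python
from typing import Any


def cached_export(client: Any, cache: Any, forecast: str, period: str) -> list[dict]:
    """Return entries from the cache if fresh, otherwise export and store them."""
    entries = cache.get(forecast, period)
    if entries is not None:
        # An empty list is still a valid cache hit, so compare against None.
        return entries

    # Cache miss or expired entry: fall back to the API and remember the result.
    data = client.export_and_download(forecast, period)
    entries = data.get("entries", [])
    cache.set(forecast, period, entries)
    return entries
```

Checking `is not None` rather than truthiness matters here: a period with zero entries would otherwise trigger a fresh export on every call.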

Incremental Load to Warehouse

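-- Use MERGE for incremental updates instead of a full reload.
-- Target columns are named explicitly so the INSERT does not depend on table
-- column order; this assumes target and staging columns share the same names.
MERGE INTO clari_forecasts AS target
USING staging_clari AS source
ON target.owner_email = source.owner_email
   AND target.time_period = source.time_period
   AND target.forecast_name = source.forecast_name
WHEN MATCHED THEN UPDATE SET
    owner_name = source.owner_name,
    forecast_amount = source.forecast_amount,
    quota_amount = source.quota_amount,
    crm_total = source.crm_total,
    crm_closed = source.crm_closed,
    adjustment_amount = source.adjustment_amount,
    exported_at = source.exported_at
WHEN NOT MATCHED THEN INSERT (
    owner_name, owner_email, forecast_amount,
    quota_amount, crm_total, crm_closed,
    adjustment_amount, time_period,
    exported_at, forecast_name
) VALUES (
    source.owner_name, source.owner_email, source.forecast_amount,
    source.quota_amount, source.crm_total, source.crm_closed,
    source.adjustment_amount, source.time_period,
    source.exported_at, source.forecast_name
);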
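Many SQL engines raise an error when more than one source row matches the same target row in a MERGE, so it can help to deduplicate rows before loading them into staging. A minimal sketch follows, keyed on the same three columns as the `ON` clause above. It assumes each row is a dict carrying those columns and that `exported_at` is an ISO 8601 string in a consistent format and timezone; the function name `dedupe_for_merge` is hypothetical.

```python
def dedupe_for_merge(rows: list[dict]) -> list[dict]:
    """Keep one row per (owner_email, time_period, forecast_name): the latest export."""
    latest: dict[tuple[str, str, str], dict] = {}
    for row in rows:
        key = (row["owner_email"], row["time_period"], row["forecast_name"])
        kept = latest.get(key)
        # Assumption: ISO 8601 timestamps in one format and timezone,
        # which compare correctly as plain strings.
        if kept is None or row["exported_at"] > kept["exported_at"]:
            latest[key] = row
    return list(latest.values())
```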

Performance Benchmarks

| Optimization | Before | After |
|--------------|--------|-------|
| Sequential 4-period export | 2 min | 40s (parallel) |
| Cache hit | 5-10s API call | <1ms |
| Full table reload | 30s | 5s (MERGE) |
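To check figures like these against your own environment, a small timing helper is enough. This is a minimal sketch using only the standard library; the name `timed` and the `timings` dict are hypothetical and not part of the skill.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label: str, timings: dict[str, float]):
    """Record the wall-clock duration of the enclosed block under `label`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Runs even if the block raises, so failed runs are still measured.
        timings[label] = time.perf_counter() - start
```

Wrap each variant in its own block, for example `with timed("parallel", timings): ...`, then compare the values in `timings`, which are in seconds.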

Resources

Next Steps

For cost optimization, see clari-cost-tuning.