Clari Performance Tuning
Overview
Optimize Clari export pipelines: reduce export times, cache forecast data, and parallelize multi-period exports.
Instructions
Parallel Multi-Period Export
from concurrent.futures import ThreadPoolExecutor, as_completed
def parallel_export(
    client,
    forecast_name: str,
    periods: list[str],
    max_workers: int = 3,
) -> dict[str, list[dict]]:
    """Export several periods of one forecast concurrently.

    Each period is fetched with ``client.export_and_download`` on a worker
    thread, and the ``"entries"`` list of every payload is collected.

    Args:
        client: Object exposing ``export_and_download(forecast_name, period)``
            that returns a mapping (only its ``get`` method is used here).
        forecast_name: Name of the forecast to export.
        periods: Period identifiers to export.
        max_workers: Upper bound on concurrent worker threads.

    Returns:
        Mapping of period to its list of entries; a payload without an
        ``"entries"`` key yields an empty list.

    Raises:
        Any exception raised by a worker propagates from ``future.result()``.
    """

    def fetch_entries(period_label: str) -> list[dict]:
        payload = client.export_and_download(forecast_name, period_label)
        return payload.get("entries", [])

    collected: dict[str, list[dict]] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Remember which period each future belongs to so results can be
        # attributed as they finish, regardless of completion order.
        period_of = {}
        for period_label in periods:
            period_of[pool.submit(fetch_entries, period_label)] = period_label

        for finished in as_completed(period_of):
            entries = finished.result()
            period = period_of[finished]
            collected[period] = entries
            print(f" {period}: {len(entries)} entries")
    return collected
Cache Export Results
import hashlib
import json
from datetime import datetime, timedelta, timezone
from pathlib import Path
class ExportCache:
def __init__(self, cache_dir: str = ".cache/clari", ttl_hours: int = 4):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.ttl = timedelta(hours=ttl_hours)
def _key(self, forecast: str, period: str) -> str:
return hashlib.md5(f"{forecast}:{period}".encode()).hexdigest()
def get(self, forecast: str, period: str) -> list[dict] | None:
path = self.cache_dir / f"{self._key(forecast, period)}.json"
if not path.exists():
return None
meta = json.loads(path.read_text())
cached_at = datetime.fromisoformat(meta["cached_at"])
if datetime.utcnow() - cached_at > self.ttl:
return None
return meta["entries"]
def set(self, forecast: str, period: str, entries: list[dict]):
path = self.cache_dir / f"{self._key(forecast, period)}.json"
path.write_text(json.dumps({
"cached_at": datetime.utcnow().isoformat(),
"entries": entries,
}))
Incremental Load to Warehouse
-- Use MERGE for incremental updates instead of full reload.
-- Rows are matched on (owner_email, time_period, forecast_name).
-- NOTE(review): the explicit INSERT column list assumes the target column
-- names match the staging column names — confirm against the table DDL.
MERGE INTO clari_forecasts AS target
USING staging_clari AS source
    ON target.owner_email = source.owner_email
   AND target.time_period = source.time_period
   AND target.forecast_name = source.forecast_name
WHEN MATCHED THEN UPDATE SET
    -- Refresh every non-key column so matched rows do not keep stale values
    -- (adjustment_amount and owner_name were previously only set on insert).
    owner_name = source.owner_name,
    forecast_amount = source.forecast_amount,
    quota_amount = source.quota_amount,
    crm_total = source.crm_total,
    crm_closed = source.crm_closed,
    adjustment_amount = source.adjustment_amount,
    exported_at = source.exported_at
WHEN NOT MATCHED THEN INSERT (
    -- Name the columns so the insert does not depend on physical column order.
    owner_name, owner_email, forecast_amount,
    quota_amount, crm_total, crm_closed,
    adjustment_amount, time_period,
    exported_at, forecast_name
) VALUES (
    source.owner_name, source.owner_email, source.forecast_amount,
    source.quota_amount, source.crm_total, source.crm_closed,
    source.adjustment_amount, source.time_period,
    source.exported_at, source.forecast_name
);
Performance Benchmarks
| Optimization | Before | After |
|--------------|--------|-------|
| Sequential 4-period export | 2 min | 40s (parallel) |
| Cache hit | 5-10s API call | <1ms |
| Full table reload | 30s | 5s (MERGE) |
Resources
Next Steps
For cost optimization, see clari-cost-tuning.