Agent Skills: Clari Deploy Integration

|

UncategorizedID: jeremylongshore/claude-code-plugins-plus-skills/clari-deploy-integration

Install this agent skill to your local

pnpm dlx add-skill https://github.com/jeremylongshore/claude-code-plugins-plus-skills/tree/HEAD/plugins/saas-packs/clari-pack/skills/clari-deploy-integration

Skill Files

Browse the full folder contents for clari-deploy-integration.

Download Skill

Loading file tree…

plugins/saas-packs/clari-pack/skills/clari-deploy-integration/SKILL.md

Skill Metadata

Name
clari-deploy-integration
Description
|

Clari Deploy Integration

Overview

Deploy Clari export pipelines to production environments: Airflow DAGs, AWS Lambda, or Google Cloud Functions for scheduled, serverless execution.

Instructions

Airflow DAG

# dags/clari_export_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from datetime import datetime, timedelta

def export_clari_forecast(**context):
    from clari_client import ClariClient, ClariConfig

    client = ClariClient(ClariConfig(
        api_key=Variable.get("clari_api_key"),
    ))

    period = context["params"].get("period", "2026_Q1")
    data = client.export_and_download("company_forecast", period)

    entries = data.get("entries", [])
    context["ti"].xcom_push(key="entry_count", value=len(entries))
    # Load to warehouse here

dag = DAG(
    "clari_daily_export",
    schedule_interval="0 6 * * *",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    default_args={"retries": 2, "retry_delay": timedelta(minutes=5)},
)

export_task = PythonOperator(
    task_id="export_forecast",
    python_callable=export_clari_forecast,
    dag=dag,
)

AWS Lambda

# lambda_handler.py
import json
import boto3
from clari_client import ClariClient, ClariConfig

def handler(event, context):
    ssm = boto3.client("ssm")
    api_key = ssm.get_parameter(
        Name="/clari/api-key", WithDecryption=True
    )["Parameter"]["Value"]

    client = ClariClient(ClariConfig(api_key=api_key))
    data = client.export_and_download(
        event.get("forecast_name", "company_forecast"),
        event.get("period", "2026_Q1"),
    )

    return {
        "statusCode": 200,
        "body": json.dumps({"entries": len(data.get("entries", []))}),
    }

Google Cloud Function

# main.py
import functions_framework
from google.cloud import secretmanager
from clari_client import ClariClient, ClariConfig

@functions_framework.http
def clari_export(request):
    sm = secretmanager.SecretManagerServiceClient()
    secret = sm.access_secret_version(name="projects/my-proj/secrets/clari-api-key/versions/latest")
    api_key = secret.payload.data.decode()

    client = ClariClient(ClariConfig(api_key=api_key))
    data = client.export_and_download("company_forecast", "2026_Q1")

    return {"entries": len(data.get("entries", []))}

Error Handling

| Issue | Cause | Solution | |-------|-------|----------| | Lambda timeout | Export takes > 15min | Use Step Functions for long jobs | | Secret not found | Wrong parameter path | Verify SSM/Secret Manager path | | Airflow task fails | Rate limited | Add retries with backoff |

Resources

Next Steps

For webhook setup, see clari-webhooks-events.