Clari Deploy Integration
Overview
Deploy Clari export pipelines to production environments using Airflow DAGs, AWS Lambda, or Google Cloud Functions, for scheduled or serverless execution.
Instructions
Airflow DAG
# dags/clari_export_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from datetime import datetime, timedelta
def export_clari_forecast(**context):
    """Export the "company_forecast" Clari forecast and record its entry count.

    The API key is read from the Airflow Variable ``clari_api_key``. The
    period comes from the ``period`` entry of the task's ``params`` and
    falls back to ``"2026_Q1"`` when it is absent. The number of items under
    the ``"entries"`` key of the export result is pushed to XCom under the
    key ``entry_count``.

    Args:
        **context: Airflow task context; ``context["params"]`` and
            ``context["ti"]`` are the only entries read here.
    """
    # Imported inside the callable rather than at module level.
    from clari_client import ClariClient, ClariConfig

    config = ClariConfig(api_key=Variable.get("clari_api_key"))
    client = ClariClient(config)

    params = context["params"]
    period = params.get("period", "2026_Q1")

    # NOTE(review): assumes the export result is dict-like with an optional
    # "entries" list -- confirm against clari_client.
    data = client.export_and_download("company_forecast", period)
    entry_count = len(data.get("entries", []))

    task_instance = context["ti"]
    task_instance.xcom_push(key="entry_count", value=entry_count)
    # Load to warehouse here
# DAG "clari_daily_export": scheduled with the cron expression "0 6 * * *"
# (minute 0, hour 6, every day). catchup=False, so runs between start_date
# and the present are not backfilled.
dag = DAG(
    "clari_daily_export",
    schedule_interval="0 6 * * *",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    default_args={
        "retries": 2,
        "retry_delay": timedelta(minutes=5),
        # Back off progressively between attempts instead of retrying at a
        # fixed cadence, so a rate-limited export gets more room to recover.
        "retry_exponential_backoff": True,
    },
)

# Single task that runs export_clari_forecast; it inherits the retry policy
# from the DAG's default_args.
export_task = PythonOperator(
    task_id="export_forecast",
    python_callable=export_clari_forecast,
    dag=dag,
)
AWS Lambda
# lambda_handler.py
import json
import boto3
from clari_client import ClariClient, ClariConfig
def handler(event, context):
    """Lambda entry point: run a Clari export and report how many entries it has.

    The API key is fetched (decrypted) from the SSM parameter
    ``/clari/api-key``. The forecast name and period are taken from the
    event, defaulting to ``"company_forecast"`` and ``"2026_Q1"``.

    Args:
        event: Invocation payload; read with ``.get`` for the optional
            ``forecast_name`` and ``period`` keys.
        context: Lambda context object; not used here.

    Returns:
        A dict with ``statusCode`` 200 and a JSON ``body`` of the form
        ``{"entries": <count>}``.
    """
    ssm_client = boto3.client("ssm")
    parameter = ssm_client.get_parameter(Name="/clari/api-key", WithDecryption=True)
    api_key = parameter["Parameter"]["Value"]

    client = ClariClient(ClariConfig(api_key=api_key))
    forecast_name = event.get("forecast_name", "company_forecast")
    period = event.get("period", "2026_Q1")
    data = client.export_and_download(forecast_name, period)

    # NOTE(review): assumes the export result is dict-like with an optional
    # "entries" list -- confirm against clari_client.
    entry_count = len(data.get("entries", []))
    body = json.dumps({"entries": entry_count})
    return {"statusCode": 200, "body": body}
Google Cloud Function
# main.py
import functions_framework
from google.cloud import secretmanager
from clari_client import ClariClient, ClariConfig
@functions_framework.http
def clari_export(request):
    """HTTP entry point: run a Clari export and return its entry count.

    The API key is read from the latest version of the Secret Manager secret
    ``projects/my-proj/secrets/clari-api-key``. The forecast name and period
    may be supplied as the query parameters ``forecast_name`` and ``period``;
    when omitted they default to ``"company_forecast"`` and ``"2026_Q1"``.

    Args:
        request: Incoming HTTP request. Assumed to be the Flask request
            object that the Functions Framework hands to HTTP functions;
            only its query-string ``args`` are read.

    Returns:
        A dict of the form ``{"entries": <count>}``.
    """
    sm = secretmanager.SecretManagerServiceClient()
    secret = sm.access_secret_version(
        name="projects/my-proj/secrets/clari-api-key/versions/latest"
    )
    api_key = secret.payload.data.decode()

    client = ClariClient(ClariConfig(api_key=api_key))

    # Optional overrides; a request with no query parameters exports the
    # same forecast and period as before.
    forecast_name = request.args.get("forecast_name", "company_forecast")
    period = request.args.get("period", "2026_Q1")

    # NOTE(review): assumes the export result is dict-like with an optional
    # "entries" list -- confirm against clari_client.
    data = client.export_and_download(forecast_name, period)
    return {"entries": len(data.get("entries", []))}
Error Handling
| Issue | Cause | Solution |
|-------|-------|----------|
| Lambda timeout | Export takes > 15 min | Use Step Functions for long jobs |
| Secret not found | Wrong parameter path | Verify SSM/Secret Manager path |
| Airflow task fails | Rate limited | Add retries with backoff |
Resources
Next Steps
For webhook setup, see clari-webhooks-events.