Airflow Operations
Use af commands to query, manage, and troubleshoot Airflow workflows.
Running the CLI
Run all af commands using uvx (no installation required):
uvx --from astro-airflow-mcp af <command>
Throughout this document, af is shorthand for uvx --from astro-airflow-mcp af.
Instance Configuration
Manage multiple Airflow instances with persistent configuration:
# Add a new instance
af instance add prod --url https://airflow.example.com --token "$API_TOKEN"
af instance add staging --url https://staging.example.com --username admin --password admin
# List and switch instances
af instance list # Shows all instances in a table
af instance use prod # Switch to prod instance
af instance current # Show current instance
af instance delete old-instance
# Auto-discover instances (use --dry-run to preview first)
af instance discover --dry-run # Preview all discoverable instances
af instance discover # Discover from all backends (astro, local)
af instance discover astro # Discover Astro deployments only
af instance discover astro --all-workspaces # Include all accessible workspaces
af instance discover local # Scan common local Airflow ports
af instance discover local --scan # Deep scan all ports 1024-65535
# IMPORTANT: Always run with --dry-run first and ask for user consent before
# running discover without it. The non-dry-run mode creates API tokens in
# Astro Cloud, which is a sensitive action that requires explicit approval.
# Override instance for a single command
af --instance staging dags list
Config file: ~/.af/config.yaml (override with --config or AF_CONFIG env var)
Tokens in config can reference environment variables using ${VAR} syntax:
instances:
- name: prod
url: https://airflow.example.com
auth:
token: ${AIRFLOW_API_TOKEN}
Or use environment variables directly (no config file needed):
export AIRFLOW_API_URL=http://localhost:8080
export AIRFLOW_AUTH_TOKEN=your-token-here
# Or username/password:
export AIRFLOW_USERNAME=admin
export AIRFLOW_PASSWORD=admin
Or CLI flags: af --airflow-url http://localhost:8080 --token "$TOKEN" <command>
Quick Reference
| Command | Description |
|---------|-------------|
| af health | System health check |
| af dags list | List all DAGs |
| af dags get <dag_id> | Get DAG details |
| af dags explore <dag_id> | Full DAG investigation |
| af dags source <dag_id> | Get DAG source code |
| af dags pause <dag_id> | Pause DAG scheduling |
| af dags unpause <dag_id> | Resume DAG scheduling |
| af dags errors | List import errors |
| af dags warnings | List DAG warnings |
| af dags stats | DAG run statistics |
| af runs list | List DAG runs |
| af runs get <dag_id> <run_id> | Get run details |
| af runs trigger <dag_id> | Trigger a DAG run |
| af runs trigger-wait <dag_id> | Trigger and wait for completion |
| af runs delete <dag_id> <run_id> | Permanently delete a DAG run |
| af runs clear <dag_id> <run_id> | Clear a run for re-execution |
| af runs diagnose <dag_id> <run_id> | Diagnose failed run |
| af tasks list <dag_id> | List tasks in DAG |
| af tasks get <dag_id> <task_id> | Get task definition |
| af tasks instance <dag_id> <run_id> <task_id> | Get task instance |
| af tasks logs <dag_id> <run_id> <task_id> | Get task logs |
| af config version | Airflow version |
| af config show | Full configuration |
| af config connections | List connections |
| af config variables | List variables |
| af config variable <key> | Get specific variable |
| af config pools | List pools |
| af config pool <name> | Get pool details |
| af config plugins | List plugins |
| af config providers | List providers |
| af config assets | List assets/datasets |
| af api <endpoint> | Direct REST API access |
| af api ls | List available API endpoints |
| af api ls --filter X | List endpoints matching pattern |
User Intent Patterns
DAG Operations
- "What DAGs exist?" / "List all DAGs" ->
af dags list - "Tell me about DAG X" / "What is DAG Y?" ->
af dags explore <dag_id> - "What's the schedule for DAG X?" ->
af dags get <dag_id> - "Show me the code for DAG X" ->
af dags source <dag_id> - "Stop DAG X" / "Pause this workflow" ->
af dags pause <dag_id> - "Resume DAG X" ->
af dags unpause <dag_id> - "Are there any DAG errors?" ->
af dags errors - "Create a new DAG" / "Write a pipeline" -> use the authoring-dags skill
Run Operations
- "What runs have executed?" ->
af runs list - "Run DAG X" / "Trigger the pipeline" ->
af runs trigger <dag_id> - "Run DAG X and wait" ->
af runs trigger-wait <dag_id> - "Why did this run fail?" ->
af runs diagnose <dag_id> <run_id> - "Delete this run" / "Remove stuck run" ->
af runs delete <dag_id> <run_id> - "Clear this run" / "Retry this run" / "Re-run this" ->
af runs clear <dag_id> <run_id> - "Test this DAG and fix if it fails" -> use the testing-dags skill
Task Operations
- "What tasks are in DAG X?" ->
af tasks list <dag_id> - "Get task logs" / "Why did task fail?" ->
af tasks logs <dag_id> <run_id> <task_id> - "Full root cause analysis" / "Diagnose and fix" -> use the debugging-dags skill
Data Operations
- "Is the data fresh?" / "When was this table last updated?" -> use the checking-freshness skill
- "Where does this data come from?" -> use the tracing-upstream-lineage skill
- "What depends on this table?" / "What breaks if I change this?" -> use the tracing-downstream-lineage skill
System Operations
- "What version of Airflow?" ->
af config version - "What connections exist?" ->
af config connections - "Are pools full?" ->
af config pools - "Is Airflow healthy?" ->
af health
API Exploration
- "What API endpoints are available?" ->
af api ls - "Find variable endpoints" ->
af api ls --filter variable - "Access XCom values" / "Get XCom" ->
af api xcom-entries -F dag_id=X -F task_id=Y - "Get event logs" / "Audit trail" ->
af api event-logs -F dag_id=X - "Create connection via API" ->
af api connections -X POST --body '{...}' - "Create variable via API" ->
af api variables -X POST -F key=name -f value=val
Common Workflows
Investigate a Failed Run
# 1. List recent runs to find failure
af runs list --dag-id my_dag
# 2. Diagnose the specific run
af runs diagnose my_dag manual__2024-01-15T10:00:00+00:00
# 3. Get logs for failed task (from diagnose output)
af tasks logs my_dag manual__2024-01-15T10:00:00+00:00 extract_data
# 4. After fixing, clear the run to retry all tasks
af runs clear my_dag manual__2024-01-15T10:00:00+00:00
Morning Health Check
# 1. Overall system health
af health
# 2. Check for broken DAGs
af dags errors
# 3. Check pool utilization
af config pools
Understand a DAG
# Get comprehensive overview (metadata + tasks + source)
af dags explore my_dag
Check Why DAG Isn't Running
# Check if paused
af dags get my_dag
# Check for import errors
af dags errors
# Check recent runs
af runs list --dag-id my_dag
Trigger and Monitor
# Option 1: Trigger and wait (blocking)
af runs trigger-wait my_dag --timeout 1800
# Option 2: Trigger and check later
af runs trigger my_dag
af runs get my_dag <run_id>
Output Format
All commands output JSON (except instance commands which use human-readable tables):
af dags list
# {
# "total_dags": 5,
# "returned_count": 5,
# "dags": [...]
# }
Use jq for filtering:
# Find failed runs
af runs list | jq '.dag_runs[] | select(.state == "failed")'
# Get DAG IDs only
af dags list | jq '.dags[].dag_id'
# Find paused DAGs
af dags list | jq '[.dags[] | select(.is_paused == true)]'
Task Logs Options
# Get logs for specific retry attempt
af tasks logs my_dag run_id task_id --try 2
# Get logs for mapped task index
af tasks logs my_dag run_id task_id --map-index 5
Direct API Access with af api
Use af api for endpoints not covered by high-level commands (XCom, event-logs, backfills, etc).
# Discover available endpoints
af api ls
af api ls --filter variable
# Basic usage
af api dags
af api dags -F limit=10 -F only_active=true
af api variables -X POST -F key=my_var -f value="my value"
af api variables/old_var -X DELETE
Field syntax: -F key=value auto-converts types, -f key=value keeps as string.
Full reference: See api-reference.md for all options, common endpoints (XCom, event-logs, backfills), and examples.
Related Skills
| Skill | Use when... | |-------|-------------| | authoring-dags | Creating or editing DAG files with best practices | | testing-dags | Iterative test -> debug -> fix -> retest cycles | | debugging-dags | Deep root cause analysis and failure diagnosis | | checking-freshness | Checking if data is up to date or stale | | tracing-upstream-lineage | Finding where data comes from | | tracing-downstream-lineage | Impact analysis -- what breaks if something changes | | migrating-airflow-2-to-3 | Upgrading DAGs from Airflow 2.x to 3.x | | managing-astro-local-env | Starting, stopping, or troubleshooting local Airflow | | setting-up-astro-project | Initializing a new Astro/Airflow project |