Agent Skills: Dagster Local Skill

Interact with Dagster data orchestration platform running locally or on Kubernetes. Use when Claude needs to monitor Dagster runs, get run logs, list assets/jobs, materialize assets, launch jobs, or debug pipeline failures. Supports both local Dagster dev server and Kubernetes deployments where each job run is a separate pod.

UncategorizedID: alexismanuel/dotfiles/dagster-local

Install this agent skill to your local

pnpm dlx add-skill https://github.com/alexismanuel/dotfiles/tree/HEAD/.config/opencode/skills/dagster-local

Skill Files

Browse the full folder contents for dagster-local.

Download Skill

Loading file tree…

.config/opencode/skills/dagster-local/SKILL.md

Skill Metadata

Name
dagster-local
Description
Interact with Dagster data orchestration platform running locally or on Kubernetes. Use when Claude needs to monitor Dagster runs, get run logs, list assets/jobs, materialize assets, launch jobs, or debug pipeline failures. Supports both local Dagster dev server and Kubernetes deployments where each job run is a separate pod.

Dagster Local Skill

Programmatic interaction with Dagster via GraphQL API and Kubernetes for pod-level logs.

Quick Start

from scripts.dagster_client import DagsterClient

client = DagsterClient()  # defaults to http://localhost:8000/graphql

# List all assets
assets = client.list_assets()

# Get recent runs
runs = client.get_recent_runs(limit=10)

# Get logs for a specific run
logs = client.get_run_logs(run_id="abc123")

Configuration

client = DagsterClient(graphql_url="http://localhost:8000/graphql")

Available Operations

Query Operations

| Function | Purpose | |----------|---------| | list_repositories() | List all code locations/repositories | | list_jobs(repo_location, repo_name) | List jobs in a repository | | list_assets(repo_location, repo_name) | List assets in a repository | | get_recent_runs(limit) | Get recent run history | | get_run_info(run_id) | Get detailed run info and status | | get_run_logs(run_id) | Get event logs for a run | | get_asset_info(asset_key) | Get asset details and dependencies |

Mutation Operations

| Function | Purpose | |----------|---------| | launch_job(repo_location, repo_name, job_name, config) | Launch a job run | | materialize_asset(asset_key, repo_location, repo_name) | Materialize an asset | | terminate_run(run_id) | Terminate an in-progress run |

Kubernetes Integration

When Dagster runs on K8s (each run = separate pod):

from scripts.k8s_logs import get_pod_logs_for_run, get_dagster_pods

# Get pod logs for a specific run
logs = get_pod_logs_for_run(run_id="abc123", namespace="dagster")

# List all Dagster-related pods
pods = get_dagster_pods(namespace="dagster")

Debugging Workflow

  1. Check recent runs: get_recent_runs() to find failed runs
  2. Get run details: get_run_info(run_id) for status and error summary
  3. Get Dagster logs: get_run_logs(run_id) for step-level events
  4. Get pod logs (K8s): get_pod_logs_for_run(run_id) for stdout/stderr

Common Patterns

Wait for Run Completion

import time

def wait_for_run(client, run_id, timeout=300):
    start = time.time()
    while time.time() - start < timeout:
        info = client.get_run_info(run_id)
        status = info.get("status")
        if status in ["SUCCESS", "FAILURE", "CANCELED"]:
            return info
        time.sleep(5)
    raise TimeoutError(f"Run {run_id} did not complete")

Get Failure Reason

def get_failure_reason(client, run_id):
    logs = client.get_run_logs(run_id)
    failures = [e for e in logs.get("events", []) if "error" in e]
    return failures[-1] if failures else None

GraphQL Reference

For advanced queries, see references/graphql_queries.md.