Agent Skills: Python

MUST use when writing Python scripts.

ID: windmill-labs/windmill/write-script-python3

Repository

windmill-labs/windmill (License: NOASSERTION)

Install this agent skill locally:

pnpm dlx add-skill https://github.com/windmill-labs/windmill/tree/HEAD/system_prompts/auto-generated/skills/write-script-python3

Skill Files

Browse the full folder contents for write-script-python3.


system_prompts/auto-generated/skills/write-script-python3/SKILL.md

Skill Metadata

Name
write-script-python3
Description
MUST use when writing Python scripts.

CLI Commands

Place scripts in a folder. After writing, tell the user they can run:

  • wmill generate-metadata - Generate .script.yaml and .lock files
  • wmill sync push - Deploy to Windmill

Do NOT run these commands yourself. Instead, inform the user that they should run them.

Use wmill resource-type list --schema to discover available resource types.

Python

Structure

The script must contain at least one function called main:

def main(param1: str, param2: int):
    # Your code here
    return {"result": param1, "count": param2}

Do not call the main function. Libraries are installed automatically.
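A slightly fuller sketch of the same structure, with optional parameters and defaults (the parameter names here are illustrative):

```python
from typing import Optional

def main(name: str, retries: int = 3, note: Optional[str] = None):
    # The parameters of main define the script's inputs; parameters
    # with defaults are optional when the script is run.
    greeting = f"Hello, {name}!"
    if note:
        greeting += f" ({note})"
    return {"greeting": greeting, "retries": retries}
```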

Resource Types

On Windmill, credentials and configuration are stored in resources and passed as parameters to main.

Before the main function, define the type of each resource you need as a TypedDict:

from typing import TypedDict

class postgresql(TypedDict):
    host: str
    port: int
    user: str
    password: str
    dbname: str

def main(db: postgresql):
    # db contains the database connection details
    pass

Important rules:

  • The resource type name must be IN LOWERCASE
  • Only include resource types if they are actually needed
  • If an import conflicts with a resource type name, rename the imported object, not the type name
  • Make sure to import TypedDict from typing if you're using it
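Putting these rules together, a script that consumes a postgresql resource might look like the sketch below; the build_dsn helper is illustrative, not part of wmill:

```python
from typing import TypedDict

class postgresql(TypedDict):
    host: str
    port: int
    user: str
    password: str
    dbname: str

def build_dsn(db: postgresql) -> str:
    # Illustrative helper: assemble a libpq-style connection string
    # from the resource fields.
    return (
        f"postgresql://{db['user']}:{db['password']}"
        f"@{db['host']}:{db['port']}/{db['dbname']}"
    )

def main(db: postgresql):
    dsn = build_dsn(db)
    # A real script would open a connection here (e.g. with psycopg2).
    return {"dsn_host": db["host"]}
```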

Imports

Libraries are installed automatically. Do not show installation instructions.

import requests
import pandas as pd
from datetime import datetime

If an import name conflicts with a resource type:

# Wrong - don't rename the type
import stripe as stripe_lib
class stripe_type(TypedDict): ...

# Correct - rename the import
import stripe as stripe_sdk
class stripe(TypedDict):
    api_key: str

Windmill Client

Import the windmill client for platform interactions:

import wmill

See the SDK documentation for available methods.

Preprocessor Scripts

For preprocessor scripts, the function must be named preprocessor and receive an event parameter:

from typing import TypedDict, Literal, Any

class Event(TypedDict):
    kind: Literal["webhook", "http", "websocket", "kafka", "email", "nats", "postgres", "sqs", "mqtt", "gcp"]
    body: Any
    headers: dict[str, str]
    query: dict[str, str]

def preprocessor(event: Event):
    # Transform the event into flow input parameters
    return {
        "param1": event["body"]["field1"],
        "param2": event["query"]["id"]
    }
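A preprocessor can also branch on the trigger kind to normalize different event shapes into the same flow inputs. A sketch, where the user_id and action fields are assumptions for illustration:

```python
from typing import TypedDict, Literal, Any

class Event(TypedDict):
    kind: Literal["webhook", "http", "websocket", "kafka", "email", "nats", "postgres", "sqs", "mqtt", "gcp"]
    body: Any
    headers: dict[str, str]
    query: dict[str, str]

def preprocessor(event: Event):
    # Webhooks carry the payload in the body; HTTP routes may carry
    # it in query parameters. Both map to the same flow inputs.
    if event["kind"] == "webhook":
        return {"user_id": event["body"]["user_id"], "action": event["body"]["action"]}
    if event["kind"] == "http":
        return {"user_id": event["query"]["user_id"], "action": event["query"].get("action", "view")}
    raise ValueError(f"unsupported trigger kind: {event['kind']}")
```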

S3 Object Operations

Windmill provides built-in support for S3-compatible storage operations.

import wmill
from wmill import S3Object
from io import BufferedReader

# Load file content from S3
content: bytes = wmill.load_s3_file(s3object)

# Load file as stream reader
reader: BufferedReader = wmill.load_s3_file_reader(s3object)

# Write file to S3
result: S3Object = wmill.write_s3_file(
    s3object,           # Target path (or None to auto-generate)
    file_content,       # bytes or BufferedReader
    s3_resource_path,   # Optional: specific S3 resource
    content_type,       # Optional: MIME type
    content_disposition # Optional: Content-Disposition header
)

Python SDK (wmill)

Import: import wmill

def worker_has_internal_server() -> bool

def get_mocked_api() -> Optional[dict]

Get the HTTP client instance.

Returns:

Configured httpx.Client for API requests

def get_client() -> httpx.Client

Make an HTTP GET request to the Windmill API.

Args:

endpoint: API endpoint path

raise_for_status: Whether to raise an exception on HTTP errors

**kwargs: Additional arguments passed to httpx.get

Returns:

HTTP response object

def get(endpoint, raise_for_status = True, **kwargs) -> httpx.Response

Make an HTTP POST request to the Windmill API.

Args:

endpoint: API endpoint path

raise_for_status: Whether to raise an exception on HTTP errors

**kwargs: Additional arguments passed to httpx.post

Returns:

HTTP response object

def post(endpoint, raise_for_status = True, **kwargs) -> httpx.Response

Create a new authentication token.

Args:

duration: Token validity duration (default: 1 day)

Returns:

New authentication token string

def create_token(duration = dt.timedelta(days=1)) -> str

Create a script job and return its job id.

Deprecated: use run_script_by_path_async or run_script_by_hash_async instead.

def run_script_async(path: str = None, hash_: str = None, args: dict = None, scheduled_in_secs: int = None) -> str

Create a script job by path and return its job id.

def run_script_by_path_async(path: str, args: dict = None, scheduled_in_secs: int = None) -> str

Create a script job by hash and return its job id.

def run_script_by_hash_async(hash_: str, args: dict = None, scheduled_in_secs: int = None) -> str

Create a flow job and return its job id.

def run_flow_async(path: str, args: dict = None, scheduled_in_secs: int = None, do_not_track_in_parent: bool = True) -> str

Run script synchronously and return its result.

Deprecated: use run_script_by_path or run_script_by_hash instead.

def run_script(path: str = None, hash_: str = None, args: dict = None, timeout: dt.timedelta | int | float | None = None, verbose: bool = False, cleanup: bool = True, assert_result_is_not_none: bool = False) -> Any

Run script by path synchronously and return its result.

def run_script_by_path(path: str, args: dict = None, timeout: dt.timedelta | int | float | None = None, verbose: bool = False, cleanup: bool = True, assert_result_is_not_none: bool = False) -> Any

Run script by hash synchronously and return its result.

def run_script_by_hash(hash_: str, args: dict = None, timeout: dt.timedelta | int | float | None = None, verbose: bool = False, cleanup: bool = True, assert_result_is_not_none: bool = False) -> Any

Run a script on the current worker without creating a job.

On agent workers (no internal server), falls back to running a normal

preview job and waiting for the result.

def run_inline_script_preview(content: str, language: str, args: dict = None) -> Any

Wait for a job to complete and return its result.

Args:

job_id: ID of the job to wait for

timeout: Maximum time to wait (seconds or timedelta)

verbose: Enable verbose logging

cleanup: Register cleanup handler to cancel job on exit

assert_result_is_not_none: Raise exception if result is None

Returns:

Job result when completed

Raises:

TimeoutError: If timeout is reached

Exception: If job fails

def wait_job(job_id, timeout: dt.timedelta | int | float | None = None, verbose: bool = False, cleanup: bool = True, assert_result_is_not_none: bool = False)

Cancel a specific job by ID.

Args:

job_id: UUID of the job to cancel

reason: Optional reason for cancellation

Returns:

Response message from the cancel endpoint

def cancel_job(job_id: str, reason: str = None) -> str

Cancel currently running executions of the same script.

def cancel_running() -> dict

Get job details by ID.

Args:

job_id: UUID of the job

Returns:

Job details dictionary

def get_job(job_id: str) -> dict

Get the root job ID for a flow hierarchy.

Args:

job_id: Job ID (defaults to current WM_JOB_ID)

Returns:

Root job ID

def get_root_job_id(job_id: str | None = None) -> dict

Get an OIDC JWT token for authentication to external services.

Args:

audience: Token audience (e.g., "vault", "aws")

expires_in: Optional expiration time in seconds

Returns:

JWT token string

def get_id_token(audience: str, expires_in: int | None = None) -> str

Get the status of a job.

Args:

job_id: UUID of the job

Returns:

Job status: "RUNNING", "WAITING", or "COMPLETED"

def get_job_status(job_id: str) -> JobStatus

Get the result of a completed job.

Args:

job_id: UUID of the completed job

assert_result_is_not_none: Raise exception if result is None

Returns:

Job result

def get_result(job_id: str, assert_result_is_not_none: bool = True) -> Any

Get a variable value by path.

Args:

path: Variable path in Windmill

Returns:

Variable value as string

def get_variable(path: str) -> str

Set a variable value by path, creating it if it doesn't exist.

Args:

path: Variable path in Windmill

value: Variable value to set

is_secret: Whether the variable should be secret (default: False)

def set_variable(path: str, value: str, is_secret: bool = False) -> None

Get a resource value by path.

Args:

path: Resource path in Windmill

none_if_undefined: Return None instead of raising if not found

interpolated: if variables and resources are fully unrolled

Returns:

Resource value dictionary or None

def get_resource(path: str, none_if_undefined: bool = False, interpolated: bool = True) -> dict | None

Set a resource value by path, creating it if it doesn't exist.

Args:

value: Resource value to set

path: Resource path in Windmill

resource_type: Resource type for creation

def set_resource(value: Any, path: str, resource_type: str)

List resources from Windmill workspace.

Args:

resource_type: Optional resource type to filter by (e.g., "postgresql", "mysql", "s3")

page: Optional page number for pagination

per_page: Optional number of results per page

Returns:

List of resource dictionaries

def list_resources(resource_type: str = None, page: int = None, per_page: int = None) -> list[dict]

Set the workflow state.

Args:

value: State value to set

path: Optional state resource path override.

def set_state(value: Any, path: str | None = None) -> None

Get the workflow state.

Args:

path: Optional state resource path override.

Returns:

State value or None if not set

def get_state(path: str | None = None) -> Any

Set job progress percentage (0-99).

Args:

value: Progress percentage

job_id: Job ID (defaults to current WM_JOB_ID)

def set_progress(value: int, job_id: Optional[str] = None)

Get job progress percentage.

Args:

job_id: Job ID (defaults to current WM_JOB_ID)

Returns:

Progress value (0-100) or None if not set

def get_progress(job_id: Optional[str] = None) -> Any

Set the user state of a flow at a given key

def set_flow_user_state(key: str, value: Any) -> None

Get the user state of a flow at a given key

def get_flow_user_state(key: str) -> Any

Get the Windmill server version.

Returns:

Version string

def version()

Convenience helper that takes an S3 resource as input and returns the settings necessary to initiate an S3 connection from DuckDB.

def get_duckdb_connection_settings(s3_resource_path: str = '') -> DuckDbConnectionSettings | None

Convenience helper that takes an S3 resource as input and returns the settings necessary to initiate an S3 connection from Polars.

def get_polars_connection_settings(s3_resource_path: str = '') -> PolarsConnectionSettings

Convenience helper that takes an S3 resource as input and returns the settings necessary to initiate an S3 connection using boto3.

def get_boto3_connection_settings(s3_resource_path: str = '') -> Boto3ConnectionSettings

Load a file from the workspace S3 bucket and return its content as bytes.

from wmill import S3Object

s3_obj = S3Object(s3="/path/to/my_file.txt")
my_obj_content = wmill.load_s3_file(s3_obj)
file_content = my_obj_content.decode("utf-8")

def load_s3_file(s3object: S3Object | str, s3_resource_path: str | None) -> bytes

Load a file from the workspace S3 bucket and return it as a byte stream.

from wmill import S3Object

s3_obj = S3Object(s3="/path/to/my_file.txt")
with wmill.load_s3_file_reader(s3_obj, s3_resource_path=None) as file_reader:
    print(file_reader.read())

def load_s3_file_reader(s3object: S3Object | str, s3_resource_path: str | None) -> BufferedReader

Write a file to the workspace S3 bucket.

from wmill import S3Object

s3_obj = S3Object(s3="/path/to/my_file.txt")

# For an in-memory bytes array:
file_content = b'Hello Windmill!'
wmill.write_s3_file(s3_obj, file_content)

# For a file:
with open("my_file.txt", "rb") as my_file:
    wmill.write_s3_file(s3_obj, my_file)

def write_s3_file(s3object: S3Object | str | None, file_content: BufferedReader | bytes, s3_resource_path: str | None, content_type: str | None = None, content_disposition: str | None = None) -> S3Object

Permanently delete a file from the workspace S3 bucket.

from wmill import S3Object

s3_obj = S3Object(s3="/path/to/my_file.txt")
wmill.delete_s3_object(s3_obj)

def delete_s3_object(s3object: S3Object | str, s3_resource_path: str | None = None) -> None

Sign S3 objects for use by anonymous users in public apps.

Args:

s3_objects: List of S3 objects to sign

Returns:

List of signed S3 objects

def sign_s3_objects(s3_objects: list[S3Object | str]) -> list[S3Object]

Sign a single S3 object for use by anonymous users in public apps.

Args:

s3_object: S3 object to sign

Returns:

Signed S3 object

def sign_s3_object(s3_object: S3Object | str) -> S3Object

Generate presigned public URLs for an array of S3 objects.

If an S3 object is not signed yet, it will be signed first.

Args:

s3_objects: List of S3 objects to sign

base_url: Optional base URL for the presigned URLs (defaults to WM_BASE_URL)

Returns:

List of signed public URLs

Example:

>>> s3_objs = [S3Object(s3="/path/to/file1.txt"), S3Object(s3="/path/to/file2.txt")]
>>> urls = wmill.get_presigned_s3_public_urls(s3_objs)

def get_presigned_s3_public_urls(s3_objects: list[S3Object | str], base_url: str | None = None) -> list[str]

Generate a presigned public URL for an S3 object.

If the S3 object is not signed yet, it will be signed first.

Args:

s3_object: S3 object to sign

base_url: Optional base URL for the presigned URL (defaults to WM_BASE_URL)

Returns:

Signed public URL

Example:

>>> s3_obj = S3Object(s3="/path/to/file.txt")
>>> url = wmill.get_presigned_s3_public_url(s3_obj)

def get_presigned_s3_public_url(s3_object: S3Object | str, base_url: str | None = None) -> str

Get the current user information.

Returns:

User details dictionary

def whoami() -> dict

Get the current user information (alias for whoami).

Returns:

User details dictionary

def user() -> dict

Get the state resource path from environment.

Returns:

State path string

def state_path() -> str

Get the workflow state.

Returns:

State value or None if not set

def state() -> Any

Set the state in the shared folder using pickle

def set_shared_state_pickle(value: Any, path: str = 'state.pickle') -> None

Get the state in the shared folder using pickle

def get_shared_state_pickle(path: str = 'state.pickle') -> Any

Set the state in the shared folder as JSON

def set_shared_state(value: Any, path: str = 'state.json') -> None

Get the state in the shared folder as JSON

def get_shared_state(path: str = 'state.json') -> Any

Get URLs needed for resuming a flow after suspension.

Args:

approver: Optional approver name

flow_level: If True, generate resume URLs for the parent flow instead of the

specific step. This allows pre-approvals that can be consumed by any later

suspend step in the same flow.

Returns:

Dictionary with approvalPage, resume, and cancel URLs

def get_resume_urls(approver: str = None, flow_level: bool = None) -> dict

Sends an interactive approval request via Slack, allowing optional customization of the message, approver, and form fields.

[Enterprise Edition Only] To include form fields in the Slack approval request, use the "Advanced -> Suspend -> Form" functionality.

Learn more at: https://www.windmill.dev/docs/flows/flow_approval#form

Args:

slack_resource_path: The path to the Slack resource in Windmill.

channel_id: The Slack channel ID where the approval request will be sent.

message: Optional custom message to include in the Slack approval request.

approver: Optional user ID or name of the approver for the request.

default_args_json: Optional dictionary defining or overriding the default arguments for form fields.

dynamic_enums_json: Optional dictionary overriding the enum default values of enum form fields.

Raises:

Exception: If not called within a flow or flow preview, or if the required flow job or flow step environment variables are not set.

Returns:

None

Usage Example:

>>> wmill.request_interactive_slack_approval(
...     slack_resource_path="/u/alex/my_slack_resource",
...     channel_id="admins-slack-channel",
...     message="Please approve this request",
...     approver="approver123",
...     default_args_json={"key1": "value1", "key2": 42},
...     dynamic_enums_json={"foo": ["choice1", "choice2"], "bar": ["optionA", "optionB"]},
... )

Notes:

- This function must be executed within a Windmill flow or flow preview.

- The function checks for required environment variables (WM_FLOW_JOB_ID, WM_FLOW_STEP_ID) to ensure it is run in the appropriate context.

def request_interactive_slack_approval(slack_resource_path: str, channel_id: str, message: str = None, approver: str = None, default_args_json: dict = None, dynamic_enums_json: dict = None) -> None

Get email from workspace username

This method is particularly useful for apps that require the email address of the viewer.

Indeed, in the viewer context WM_USERNAME is set to the username of the viewer but WM_EMAIL is set to the email of the creator of the app.

def username_to_email(username: str) -> str

Send a message to a Microsoft Teams conversation with conversation_id, where success is used to style the message

def send_teams_message(conversation_id: str, text: str, success: bool = True, card_block: dict = None)

Get a DataTable client for SQL queries.

Args:

name: Database name (default: "main")

Returns:

DataTableClient instance

def datatable(name: str = 'main')

Get a DuckLake client for DuckDB queries.

Args:

name: Database name (default: "main")

Returns:

DucklakeClient instance

def ducklake(name: str = 'main')

def init_global_client(f)

def deprecate(in_favor_of: str)

Get the current workspace ID.

Returns:

Workspace ID string

def get_workspace() -> str

def get_version() -> str

Run a script synchronously by hash and return its result.

Args:

hash: Script hash

args: Script arguments

verbose: Enable verbose logging

assert_result_is_not_none: Raise exception if result is None

cleanup: Register cleanup handler to cancel job on exit

timeout: Maximum time to wait

Returns:

Script result

def run_script_sync(hash: str, args: Dict[str, Any] = None, verbose: bool = False, assert_result_is_not_none: bool = True, cleanup: bool = True, timeout: dt.timedelta = None) -> Any

Run a script synchronously by path and return its result.

Args:

path: Script path

args: Script arguments

verbose: Enable verbose logging

assert_result_is_not_none: Raise exception if result is None

cleanup: Register cleanup handler to cancel job on exit

timeout: Maximum time to wait

Returns:

Script result

def run_script_by_path_sync(path: str, args: Dict[str, Any] = None, verbose: bool = False, assert_result_is_not_none: bool = True, cleanup: bool = True, timeout: dt.timedelta = None) -> Any

Convenience helper that takes an S3 resource as input and returns the settings necessary to initiate an S3 connection from DuckDB.

def duckdb_connection_settings(s3_resource_path: str = '') -> DuckDbConnectionSettings

Convenience helper that takes an S3 resource as input and returns the settings necessary to initiate an S3 connection from Polars.

def polars_connection_settings(s3_resource_path: str = '') -> PolarsConnectionSettings

Convenience helper that takes an S3 resource as input and returns the settings necessary to initiate an S3 connection using boto3.

def boto3_connection_settings(s3_resource_path: str = '') -> Boto3ConnectionSettings

Get the state resource path from environment.

Returns:

State path string

def get_state_path() -> str

Parse resource syntax from string.

def parse_resource_syntax(s: str) -> Optional[str]

Parse S3 object from string or S3Object format.

def parse_s3_object(s3_object: S3Object | str) -> S3Object

Parse variable syntax from string.

def parse_variable_syntax(s: str) -> Optional[str]

Append text to the result stream.

Args:

text: text to append to the result stream

def append_to_result_stream(text: str) -> None

Forward a stream to the result stream.

Args:

stream: the stream whose contents are forwarded to the result stream

def stream_result(stream) -> None

Execute a SQL query against the DataTable.

Args:

sql: SQL query string with $1, $2, etc. placeholders

*args: Positional arguments to bind to query placeholders

Returns:

SqlQuery instance for fetching results

def query(sql: str, *args) -> SqlQuery

Execute query and fetch results.

Args:

result_collection: Optional result collection mode

Returns:

Query results

def fetch(result_collection: str | None = None)

Execute query and fetch first row of results.

Returns:

First row of query results

def fetch_one()

Execute query and fetch first row of results. Return result as a scalar value.

Returns:

First row of query result as a scalar value

def fetch_one_scalar()

Execute query and don't return any results.

def execute()

The DuckDB executor requires explicit argument types at declaration. The types returned here exist in both DuckDB and Postgres; check that they exist before extending this function to other SQL engines.

def infer_sql_type(value) -> str
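One plausible shape of such inference, for illustration only; the actual mapping used by wmill may differ, and the type names below are chosen because they exist in both DuckDB and Postgres:

```python
def infer_sql_type_sketch(value) -> str:
    # Map a Python value to a SQL type name valid in both DuckDB
    # and Postgres. Illustrative sketch, not the wmill implementation.
    if isinstance(value, bool):  # check bool before int: bool subclasses int
        return "BOOLEAN"
    if isinstance(value, int):
        return "BIGINT"
    if isinstance(value, float):
        return "DOUBLE PRECISION"
    if isinstance(value, str):
        return "TEXT"
    raise TypeError(f"cannot infer SQL type for {type(value).__name__}")
```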

def parse_sql_client_name(name: str) -> tuple[str, Optional[str]]

Decorator that marks a function as a workflow task.

Works in both WAC v1 (sync, HTTP-based dispatch) and WAC v2

(async, checkpoint/replay) modes:

- v2 (inside @workflow): dispatches as a checkpoint step.

- v1 (WM_JOB_ID set, no @workflow): dispatches via HTTP API.

- Standalone: executes the function body directly.

Usage:

@task
async def extract_data(url: str): ...

@task(path="f/external_script", timeout=600, tag="gpu")
async def run_external(x: int): ...

def task(_func = None, path: Optional[str] = None, tag: Optional[str] = None, timeout: Optional[int] = None, cache_ttl: Optional[int] = None, priority: Optional[int] = None, concurrency_limit: Optional[int] = None, concurrency_key: Optional[str] = None, concurrency_time_window_s: Optional[int] = None)

Create a task that dispatches to a separate Windmill script.

Usage:

extract = task_script("f/data/extract", timeout=600)

@workflow
async def main():
    data = await extract(url="https://...")

def task_script(path: str, timeout: Optional[int] = None, tag: Optional[str] = None, cache_ttl: Optional[int] = None, priority: Optional[int] = None, concurrency_limit: Optional[int] = None, concurrency_key: Optional[str] = None, concurrency_time_window_s: Optional[int] = None)

Create a task that dispatches to a separate Windmill flow.

Usage:

pipeline = task_flow("f/etl/pipeline", priority=10)

@workflow
async def main():
    result = await pipeline(input=data)

def task_flow(path: str, timeout: Optional[int] = None, tag: Optional[str] = None, cache_ttl: Optional[int] = None, priority: Optional[int] = None, concurrency_limit: Optional[int] = None, concurrency_key: Optional[str] = None, concurrency_time_window_s: Optional[int] = None)

Decorator marking an async function as a workflow-as-code entry point.

The function must be deterministic: given the same inputs it must call

tasks in the same order on every replay. Branching on task results is fine

(results are replayed from checkpoint), but branching on external state

(current time, random values, external API calls) must use step() to

checkpoint the value so replays see the same result.

def workflow(func)

Execute fn inline and checkpoint the result.

On replay the cached value is returned without re-executing fn.

Use for lightweight deterministic operations (timestamps, random IDs,

config reads) that should not incur the overhead of a child job.

async def step(name: str, fn)
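The checkpoint-and-replay idea behind step can be illustrated with a toy in-memory cache. This is a conceptual sketch only; in wmill the checkpoint lives server-side and survives replays of the workflow job:

```python
import asyncio

# Toy checkpoint store, keyed by step name.
_checkpoints: dict[str, object] = {}

async def toy_step(name: str, fn):
    # First execution: run fn and record its result.
    # Replay: return the recorded value without re-executing fn.
    if name not in _checkpoints:
        _checkpoints[name] = fn()
    return _checkpoints[name]

async def demo():
    calls = []
    def now():
        calls.append(1)
        return "2024-01-01"
    first = await toy_step("ts", now)
    replay = await toy_step("ts", now)  # fn is not re-executed
    return first, replay, len(calls)
```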

Server-side sleep: suspend the workflow for the given duration without holding a worker.

Inside a @workflow, the parent job suspends and auto-resumes after seconds.

Outside a workflow, falls back to asyncio.sleep.

async def sleep(seconds: int)

Suspend the workflow and wait for an external approval.

Use get_resume_urls() (wrapped in step()) to obtain

resume/cancel/approval URLs before calling this function.

Returns a dict with value (form data), approver, and approved.

Args:

timeout: Approval timeout in seconds (default 1800).

form: Optional form schema for the approval page.

self_approval: Whether the user who triggered the flow can approve it (default True).

Example:

urls = await step("urls", lambda: get_resume_urls())
await step("notify", lambda: send_email(urls["approvalPage"]))
result = await wait_for_approval(timeout=3600)

async def wait_for_approval(timeout: int = 1800, form: dict | None = None, self_approval: bool = True) -> dict

Process items in parallel with optional concurrency control.

Each item is processed by calling fn(item), which should be a @task.

Items are dispatched in batches of concurrency (default: all at once).

Example:

@task
async def process(item: str):
    ...

results = await parallel(items, process, concurrency=5)

async def parallel(items, fn, concurrency: Optional[int] = None)
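Conceptually, parallel dispatches fn over the items in batches of size concurrency. A plain-asyncio sketch of that batching pattern (not the wmill implementation, which dispatches tasks as jobs):

```python
import asyncio
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")
R = TypeVar("R")

async def parallel_sketch(
    items: list[T],
    fn: Callable[[T], Awaitable[R]],
    concurrency: Optional[int] = None,
) -> list[R]:
    # Default: run everything at once; otherwise process in batches.
    batch_size = concurrency or len(items) or 1
    results: list[R] = []
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        results.extend(await asyncio.gather(*(fn(x) for x in batch)))
    return results

async def double(x: int) -> int:
    return x * 2
```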

Commit Kafka offsets for a trigger with auto_commit disabled.

Args:

trigger_path: Path to the Kafka trigger (from event['wm_trigger']['trigger_path'])

topic: Kafka topic name (from event['topic'])

partition: Partition number (from event['partition'])

offset: Message offset to commit (from event['offset'])

def commit_kafka_offsets(trigger_path: str, topic: str, partition: int, offset: int) -> None