Duroxide Durable Workflow Development
Overview
Skills for developing durable workflows using Duroxide in Rust. Duroxide provides deterministic, replayable orchestrations with automatic failure recovery.
Core Concepts
- Activities: Idempotent operations that perform actual work (K8s calls, DB queries, HTTP requests)
- Orchestrations: Deterministic workflow logic that coordinates activities
- Continue-as-new: Pattern for long-running orchestrations to prevent unbounded history
- Sub-orchestrations: Reusable workflow compositions
- Detached orchestrations: Background workflows that run independently
Directory Structure
toygres-orchestrations/src/
├── orchestrations/ # Workflow definitions
│ └── my_orchestration.rs
├── activities/ # Atomic operations
│ ├── my_activity.rs
│ └── cms/ # Grouped by domain
│ └── my_cms_activity.rs
├── registry.rs # Central registration
├── types.rs # Orchestration I/O types
├── activity_types.rs # Activity I/O types
└── names.rs # Naming constants
Naming Convention
Follow the hierarchical namespace pattern in names.rs:
// Format: {crate}::{type}::{name}
pub mod orchestrations {
pub const MY_WORKFLOW: &str = "toygres-orchestrations::orchestration::my-workflow";
}
pub mod activities {
pub const MY_ACTIVITY: &str = "toygres-orchestrations::activity::my-activity";
}
Creating Activities
Activities must be idempotent - safe to retry without side effects.
// toygres-orchestrations/src/activities/my_activity.rs
use duroxide::ActivityContext;
use crate::activity_types::{MyInput, MyOutput};
/// Activity name for registration and scheduling
pub const NAME: &str = "toygres-orchestrations::activity::my-activity";
pub async fn activity(
ctx: ActivityContext,
input: MyInput,
) -> Result<MyOutput, String> {
ctx.trace_info(format!("Starting activity: {}", input.name));
// CRITICAL: Check idempotency first - has this already been done?
let already_done = check_if_done(&input).await?;
if already_done {
ctx.trace_info("Already completed, returning cached result");
return Ok(MyOutput { ... });
}
// Perform actual work
let result = do_work(&input).await
.map_err(|e| format!("Failed: {}", e))?;
ctx.trace_info("Activity completed successfully");
Ok(result)
}
Activity Registration
Add to registry.rs:
pub fn create_activity_registry() -> ActivityRegistry {
ActivityRegistry::builder()
.register_typed(
activities::my_activity::NAME,
activities::my_activity::activity,
)
.build()
}
Creating Orchestrations
Orchestrations coordinate activities with deterministic logic.
// toygres-orchestrations/src/orchestrations/my_orchestration.rs
use duroxide::{OrchestrationContext, RetryPolicy, BackoffStrategy};
use std::time::Duration;
pub async fn my_orchestration(
ctx: OrchestrationContext,
input: MyInput,
) -> Result<MyOutput, String> {
ctx.trace_info(format!("Starting orchestration: {}", input.id));
// Schedule an activity (basic)
let result = ctx
.schedule_activity_typed::<ActivityInput, ActivityOutput>(
activities::my_activity::NAME,
&activity_input,
)
.await?;
Ok(MyOutput { ... })
}
Orchestration Registration
pub fn create_orchestration_registry() -> OrchestrationRegistry {
OrchestrationRegistry::builder()
.register_typed(
orchestrations::MY_WORKFLOW,
crate::orchestrations::my_orchestration::my_orchestration,
)
.build()
}
Scheduling Patterns
Basic Activity Scheduling
let result = ctx
.schedule_activity_typed::<Input, Output>(NAME, &input)
.await?;
Activity with Retry and Backoff
let result = ctx
.schedule_activity_with_retry_typed::<Input, Output>(
NAME,
&input,
RetryPolicy::new(5) // Max 5 retries
.with_backoff(BackoffStrategy::Exponential {
base: Duration::from_secs(2),
multiplier: 2.0,
max: Duration::from_secs(30),
})
.with_timeout(Duration::from_secs(60)),
)
.await?;
Sub-Orchestration (Reusable Workflow)
let result = ctx
.schedule_sub_orchestration_typed::<Input, Output>(
orchestrations::CHILD_WORKFLOW,
&input,
)
.await?;
Detached Orchestration (Background/Fire-and-Forget)
// Start orchestration without waiting for completion
let input_json = serde_json::to_string(&input)?;
ctx.schedule_orchestration(
orchestrations::BACKGROUND_WORKFLOW,
&orchestration_id, // Unique ID for this instance
input_json,
);
// Continues immediately - orchestration runs independently
Deterministic Timing
NEVER use tokio::time::sleep() - it breaks determinism!
// Deterministic timer - safe for replay
ctx.schedule_timer(Duration::from_secs(30)).await;
// Get current time deterministically
let now = ctx.utc_now().await?;
Signal Handling with select2
Wait for either a timer or an external signal:
// Wait for 30 seconds OR deletion signal (whichever comes first)
let timer = ctx.schedule_timer(Duration::from_secs(30));
let deletion_signal = ctx.schedule_wait("InstanceDeleted");
let (winner_index, _) = ctx.select2(timer, deletion_signal).await;
if winner_index == 1 {
// Signal received
ctx.trace_info("Received signal, exiting gracefully");
return Ok(());
}
// Timer fired, continue
Continue-as-New Pattern
For long-running orchestrations, prevent unbounded history growth:
pub async fn long_running_orchestration(
ctx: OrchestrationContext,
input: MyInput,
) -> Result<MyOutput, String> {
// Do one iteration of work
let result = do_work(&ctx, &input).await?;
// Wait before next iteration
ctx.schedule_timer(Duration::from_secs(60)).await;
// Continue as new: restarts with fresh execution history
let next_input = MyInput {
iteration: input.iteration + 1,
..input
};
let input_json = serde_json::to_string(&next_input)?;
ctx.continue_as_new(input_json).await?;
Ok(result)
}
Error Handling Patterns
Propagate Critical Errors
// Fail orchestration if activity fails
let result = ctx.schedule_activity_typed::<I, O>(NAME, &input).await?;
Best-Effort Operations (Log and Continue)
// Don't fail orchestration for non-critical operations
if let Err(err) = ctx.schedule_activity_typed::<I, O>(NAME, &input).await {
ctx.trace_warn(format!("Non-critical operation failed: {}", err));
// Continue despite error
}
Versioning Strategy
Versioning is critical and detailed enough to warrant its own skill.
See: .agents/skills/duroxide-orchestration-versioning/SKILL.md
Logging
Use context logging methods for durability:
ctx.trace_info(format!("Processing: {}", id));
ctx.trace_warn(format!("Warning: {}", message));
ctx.trace_error(format!("Error: {}", error));
Best Practices Summary
- Naming: Use
{crate}::{type}::{name}format innames.rs - Idempotency: Activities must be safe to retry
- Determinism: Only use
ctx.schedule_timer(), nevertokio::time::sleep() - Versioning: Never modify existing orchestrations - create new versions
- Error Handling: Propagate critical errors, log non-critical ones
- Long-Running: Use continue-as-new to prevent history bloat
- Testing: Add serialization round-trip tests for all types
- Logging: Use
ctx.trace_*()methods, notprintln! - Composition: Use sub-orchestrations for reusable workflows
- Background Tasks: Use detached orchestrations for fire-and-forget