Agent Skills: Run tests with actor system

Actor model expert covering message passing, state isolation, supervision trees, deadlock prevention, fault tolerance, Actix framework, and Erlang-style concurrency patterns.

Uncategorized · ID: huiali/rust-skills/rust-actor

Install this agent skill locally

pnpm dlx add-skill https://github.com/huiali/rust-skills/tree/HEAD/.codex/skills/rust-actor

Skill Files

Browse the full folder contents for rust-actor.

Download Skill

Loading file tree…

.codex/skills/rust-actor/SKILL.md

Skill Metadata

Name
rust-actor
Description
Actor model expert covering message passing, state isolation, supervision trees, deadlock prevention, fault tolerance, Actix framework, and Erlang-style concurrency patterns.

Solution Patterns

Pattern 1: Basic Actor Implementation

use tokio::sync::mpsc::{channel, Sender, Receiver};
use std::collections::HashMap;

/// Behaviour implemented by every actor in this hand-rolled actor system.
///
/// `Sized` is a supertrait because `receive` names `Context<Self>`, and a
/// generic parameter of `Context` must be sized; without it the trait
/// definition itself is rejected. `Send + 'static` are carried over unchanged.
trait Actor: Sized + Send + 'static {
    /// Type of the messages delivered to this actor.
    type Message: Send + 'static;
    /// Error type associated with this actor.
    type Error: std::error::Error;

    /// Handles one message with exclusive access to the actor's state and
    /// its context.
    fn receive(&mut self, ctx: &mut Context<Self>, msg: Self::Message);
}

/// Per-actor context passed to `Actor::receive` alongside each message.
struct Context<A: Actor> {
    /// Receiving half of the channel carrying this actor's messages.
    mailbox: Receiver<A::Message>,
    /// Sending half of a channel for the same message type.
    /// NOTE(review): presumably the half cloned into `Addr` handles — confirm.
    sender: Sender<A::Message>,
    /// Current lifecycle phase of the actor.
    state: ActorState,
    /// Optional handle to a supervisor.
    /// NOTE(review): `SupervisorAddr` is not defined in this snippet.
    supervisor: Option<SupervisorAddr>,
}

/// Lifecycle phase of an actor, stored in `Context::state`.
///
/// The enum is a plain tag, so it is `Copy` and comparable with `==`; this
/// lets callers write `state == ActorState::Running` instead of a `match`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ActorState {
    /// The actor is being started.
    Starting,
    /// The actor is running.
    Running,
    /// The actor is being restarted.
    Restarting,
    /// The actor is shutting down.
    Stopping,
    /// The actor has stopped.
    Stopped,
}

/// Cloneable handle used to send messages to an actor of type `A`.
struct Addr<A: Actor> {
    /// Sending half of the channel carrying the actor's messages.
    sender: Sender<A::Message>,
}

// `Clone` is written by hand: `#[derive(Clone)]` would require `A: Clone`,
// so `Addr<CounterActor>` could not be cloned even though only the channel
// sender needs to be duplicated.
impl<A: Actor> Clone for Addr<A> {
    fn clone(&self) -> Self {
        Self {
            sender: self.sender.clone(),
        }
    }
}

impl<A: Actor> Addr<A> {
    /// Delivers `msg` to the actor, waiting for channel capacity if needed.
    ///
    /// # Errors
    /// Returns `SendError::Disconnected` when the underlying channel send
    /// fails; the undelivered message is discarded.
    pub async fn send(&self, msg: A::Message) -> Result<(), SendError> {
        match self.sender.send(msg).await {
            Ok(()) => Ok(()),
            Err(_) => Err(SendError::Disconnected),
        }
    }
}

/// Example actor that owns a single counter.
struct CounterActor {
    /// Current counter value; changed only from `receive`.
    count: usize,
}

/// Messages understood by `CounterActor`.
#[derive(Debug)]
enum CounterMessage {
    /// Add one to the counter.
    Increment,
    /// Subtract one from the counter.
    Decrement,
    /// Ask for the current value; the reply is sent on the enclosed channel
    /// sender (the mpsc `Sender` imported at the top of this snippet).
    GetCount(Sender<usize>),
}

impl Actor for CounterActor {
    type Message = CounterMessage;
    type Error = std::io::Error;

    /// Applies one `CounterMessage` to the counter.
    ///
    /// `Decrement` saturates at zero. `GetCount` replies on a best-effort
    /// basis: a full or closed reply channel is ignored.
    fn receive(&mut self, ctx: &mut Context<Self>, msg: Self::Message) {
        match msg {
            CounterMessage::Increment => self.count += 1,
            CounterMessage::Decrement => self.count = self.count.saturating_sub(1),
            CounterMessage::GetCount(reply_to) => {
                let current = self.count;
                // The requester may already be gone; dropping the error is deliberate.
                let _ = reply_to.try_send(current);
            }
        }
    }
}

Pattern 2: Request-Response Pattern

use tokio::sync::oneshot;
use std::time::Duration;

/// A message of type `M` bundled with the channel on which the single reply
/// of type `R` is sent back.
struct Request<M, R> {
    /// The message being sent.
    payload: M,
    /// One-shot sender for the reply.
    response: oneshot::Sender<R>,
}

// Synchronous request with timeout
async fn request<A: Actor, R>(
    actor: &Addr<A>,
    msg: A::Message,
    timeout: Duration,
) -> Result<R, RequestError> {
    let (tx, rx) = oneshot::channel();

    let request = Request {
        payload: msg,
        response: tx,
    };

    actor.send(request).await
        .map_err(|_| RequestError::SendFailed)?;

    tokio::time::timeout(timeout, rx).await
        .map_err(|_| RequestError::Timeout)?
        .map_err(|_| RequestError::Canceled)
}

// Usage example
async fn example_request_response() {
    let (tx, rx) = oneshot::channel();

    let addr = counter_actor.start();
    addr.send(CounterMessage::GetCount(tx)).await.unwrap();

    let count = rx.await.unwrap();
    println!("Count: {}", count);
}

Pattern 3: Supervision Tree

use std::collections::HashMap;
use std::time::{Duration, Instant};

/// How a supervisor reacts when one of its children fails.
///
/// The enum is a plain tag, so it is `Copy` and comparable with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SupervisionStrategy {
    /// Only restart the failed child.
    OneForOne,
    /// Restart all children if one fails.
    AllForOne,
    /// Restart the failed child and all children after it.
    RestForOne,
}

/// Owns a set of child actors and restarts them according to a strategy.
struct Supervisor {
    /// Supervised children, keyed by their id.
    children: HashMap<ChildId, Child>,
    /// Which children are restarted when one of them fails.
    strategy: SupervisionStrategy,
    /// Restart-count threshold used by `should_give_up`.
    max_restarts: u32,
    /// Time window used together with `max_restarts` by `should_give_up`.
    window: Duration,
}

/// Bookkeeping the supervisor keeps for one child.
struct Child {
    /// Identifier of this child (also its key in `Supervisor::children`).
    id: ChildId,
    /// Type-erased handle to the current instance; replaced on restart.
    addr: Box<dyn std::any::Any + Send>,
    /// Number of restarts recorded by `restart_child`.
    restart_count: u32,
    /// When the child was last restarted, if ever.
    last_restart: Option<Instant>,
    /// Recipe for (re)creating the child.
    spec: ChildSpec,
}

/// Describes how a child is created and when it should be restarted.
struct ChildSpec {
    /// Produces a fresh, type-erased instance; called by `restart_child`.
    /// NOTE(review): the closure is not required to be `Send` — confirm this
    /// is intended if the supervisor must move between threads.
    factory: Box<dyn Fn() -> Box<dyn std::any::Any + Send>>,
    /// Restart policy for this child.
    restart_strategy: RestartStrategy,
}

/// Per-child restart policy.
///
/// The enum is a plain tag, so it is `Copy` and comparable with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RestartStrategy {
    /// Always restart.
    Permanent,
    /// Never restart.
    Temporary,
    /// Restart only on abnormal exit.
    Transient,
}

impl Supervisor {
    fn new(strategy: SupervisionStrategy, max_restarts: u32, window: Duration) -> Self {
        Self {
            children: HashMap::new(),
            strategy,
            max_restarts,
            window,
        }
    }

    async fn handle_child_error(&mut self, child_id: ChildId, error: &dyn std::error::Error) {
        log::warn!("Child {} failed: {}", child_id, error);

        match self.strategy {
            SupervisionStrategy::OneForOne => {
                self.restart_child(child_id).await;
            }
            SupervisionStrategy::AllForOne => {
                for id in self.children.keys().cloned().collect::<Vec<_>>() {
                    self.stop_child(id).await;
                }
                for id in self.children.keys().cloned().collect::<Vec<_>>() {
                    self.restart_child(id).await;
                }
            }
            SupervisionStrategy::RestForOne => {
                let ids: Vec<_> = self.children.keys()
                    .filter(|&&id| id >= child_id)
                    .cloned()
                    .collect();

                for id in ids {
                    self.stop_child(id).await;
                    self.restart_child(id).await;
                }
            }
        }
    }

    async fn restart_child(&mut self, child_id: ChildId) -> bool {
        if let Some(child) = self.children.get_mut(&child_id) {
            child.restart_count += 1;

            // Check restart rate limit
            if self.should_give_up(child) {
                log::error!("Child {} exceeded max restarts, giving up", child_id);
                self.stop_child(child_id).await;
                return false;
            }

            child.last_restart = Some(Instant::now());
            log::info!("Restarting child {}", child_id);

            // Factory creates new instance
            let new_instance = (child.spec.factory)();
            child.addr = new_instance;

            true
        } else {
            false
        }
    }

    fn should_give_up(&self, child: &Child) -> bool {
        if child.restart_count > self.max_restarts {
            if let Some(last_restart) = child.last_restart {
                if last_restart.elapsed() < self.window {
                    return true;
                }
            }
        }
        false
    }

    async fn stop_child(&mut self, child_id: ChildId) {
        if let Some(child) = self.children.remove(&child_id) {
            log::info!("Stopping child {}", child_id);
            // Send stop signal
        }
    }
}

Pattern 4: Deadlock Prevention with Bounded Mailboxes

use tokio::sync::mpsc;

/// Both halves of a bounded channel carrying messages for an actor of type `A`.
struct BoundedMailbox<A: Actor> {
    /// Receiving half of the channel.
    receiver: mpsc::Receiver<A::Message>,
    /// Sending half of the channel.
    sender: mpsc::Sender<A::Message>,
    /// Capacity the channel was created with.
    capacity: usize,
}

impl<A: Actor> BoundedMailbox<A> {
    fn new(capacity: usize) -> Self {
        let (sender, receiver) = mpsc::channel(capacity);
        Self {
            receiver,
            sender,
            capacity,
        }
    }

    fn capacity(&self) -> usize {
        self.capacity
    }

    async fn send_with_backpressure(&self, msg: A::Message) -> Result<(), SendError> {
        // Will wait if mailbox is full (backpressure)
        self.sender.send(msg).await
            .map_err(|_| SendError::Disconnected)
    }

    fn try_send(&self, msg: A::Message) -> Result<(), TrySendError<A::Message>> {
        // Returns immediately if mailbox is full
        self.sender.try_send(msg)
            .map_err(|e| match e {
                mpsc::error::TrySendError::Full(msg) => TrySendError::Full(msg),
                mpsc::error::TrySendError::Closed(msg) => TrySendError::Disconnected(msg),
            })
    }
}

// Usage: the two ways of sending into a bounded mailbox.
async fn example_bounded_mailbox() {
    let mailbox: BoundedMailbox<CounterActor> = BoundedMailbox::new(100);

    // Waits for a free slot when the mailbox is full; the task is suspended
    // at the `.await`, the thread is not blocked.
    mailbox
        .send_with_backpressure(CounterMessage::Increment)
        .await
        .unwrap();

    // Never waits: a full or closed mailbox is reported immediately.
    let outcome = match mailbox.try_send(CounterMessage::Increment) {
        Ok(_) => "Sent",
        Err(TrySendError::Full(_)) => "Mailbox full",
        Err(TrySendError::Disconnected(_)) => "Actor stopped",
    };
    println!("{}", outcome);
}

Pattern 5: Actor Lifecycle Management

/// Optional lifecycle hooks for an actor.
///
/// Every hook has a default body that only writes a log line, so implementors
/// override just the ones they need.
/// NOTE(review): nothing in this snippet calls these hooks; the order in which
/// they run is implied by their names only.
trait LifecycleHandler: Actor {
    /// Hook intended for initializing resources; the default only logs.
    fn pre_start(&mut self, ctx: &mut Context<Self>) {
        // Override to initialize resources.
        log::info!("Actor starting");
    }

    /// Hook intended for work once the actor is up; the default only logs.
    fn post_start(&mut self, ctx: &mut Context<Self>) {
        // Override to start timers or establish connections.
        log::info!("Actor started");
    }

    /// Hook intended for cleanup before a restart; the default logs `error`.
    fn pre_restart(&mut self, ctx: &mut Context<Self>, error: &dyn std::error::Error) {
        // Override to clean up resources before the restart.
        log::warn!("Actor restarting due to: {}", error);
    }

    /// Hook intended for re-initialization after a restart; the default only logs.
    fn post_restart(&mut self, ctx: &mut Context<Self>) {
        // Override to reinitialize after the restart.
        log::info!("Actor restarted");
    }

    /// Hook intended for final cleanup; receives no context. The default only logs.
    fn post_stop(&mut self) {
        // Override to save state or close connections.
        log::info!("Actor stopped");
    }
}

/// Example actor that manages a database connection through lifecycle hooks.
struct DatabaseActor {
    /// The open connection, or `None` when there is none.
    connection: Option<DatabaseConnection>,
}

impl LifecycleHandler for DatabaseActor {
    /// Opens a new database connection and stores it.
    fn pre_start(&mut self, ctx: &mut Context<Self>) {
        let fresh = DatabaseConnection::new();
        self.connection = Some(fresh);
    }

    /// Closes the current connection, if any, before the actor restarts.
    fn pre_restart(&mut self, ctx: &mut Context<Self>, error: &dyn std::error::Error) {
        // Moving the connection out leaves `None` behind.
        if let Some(open) = std::mem::take(&mut self.connection) {
            open.close();
        }
    }

    /// Closes the connection if one is still held when the actor stops.
    fn post_stop(&mut self) {
        let leftover = self.connection.take();
        if let Some(open) = leftover {
            open.close();
        }
    }
}

Actor vs Thread Model

| Feature | Thread Model | Actor Model |
|---------|-------------|-------------|
| State sharing | Shared memory + locks | Isolated, message passing |
| Deadlock risk | High (lock ordering) | Low (message queues) |
| Scalability | Limited by thread count | Millions of actors possible |
| Fault handling | Manual | Supervision trees |
| Debugging | Hard (race conditions) | Easier (message sequence) |
| Memory | Shared | Isolated per actor |

Workflow

Step 1: Design Actor Hierarchy

Design questions:
  → What state needs isolation? Each isolated state = 1 actor
  → What operations need sequential processing? Group in same actor
  → What can fail independently? Separate actors with supervision
  → What needs to scale? Use actor pool pattern

Step 2: Choose Messaging Pattern

Message patterns:
  → Fire-and-forget: Async send, no response
  → Request-response: Oneshot channel for reply
  → Streaming: Channel for multiple responses
  → Broadcast: Multiple recipients

Step 3: Set Up Supervision

Supervision strategy:
  → OneForOne: Independent actors (default choice)
  → AllForOne: Tightly coupled actors needing consistent state
  → RestForOne: Sequential dependencies

Restart policy:
  → Permanent: Critical actors (always restart)
  → Temporary: One-time tasks (never restart)
  → Transient: Restart on errors only

Review Checklist

When implementing actor systems:

  • [ ] Each actor has clear single responsibility
  • [ ] Mailboxes have bounded capacity (prevents unbounded memory growth)
  • [ ] Message types are Send + 'static
  • [ ] No shared mutable state between actors
  • [ ] Supervision strategy appropriate for error handling
  • [ ] Actor lifecycle properly managed (cleanup in post_stop)
  • [ ] No circular message dependencies (deadlock risk)
  • [ ] Timeouts on request-response patterns
  • [ ] Monitoring tracks mailbox size and message latency
  • [ ] Backpressure handled when mailbox is full

Verification Commands

# Run only the `actor_tests` integration test target
cargo test --test actor_tests

# Run the `deadlock_tests` target one test at a time with test output shown.
# NOTE: this command sets no timeout itself; a deadlocked test will hang
# unless the tests enforce their own timeouts.
cargo test --test deadlock_tests -- --test-threads=1 --nocapture

# Run the `actor_bench` benchmark target (message throughput)
cargo bench --bench actor_bench

# Run the `load_test` binary in release mode; observe memory usage while it runs
cargo run --release --bin load_test

# Run with debug-level logging enabled (e.g. to watch actor lifecycle log lines)
RUST_LOG=debug cargo run

Common Pitfalls

1. Circular Message Dependencies (Deadlock)

Symptom: Actors waiting for each other's responses

// ❌ Bad: Actor A waits for Actor B, Actor B waits for Actor A.
// Deliberate anti-example: each handler awaits a reply from the other actor
// with no timeout, so neither handler can finish.
async fn actor_a_handler(&mut self, msg: Message) {
    let response = self.actor_b.request(msg).await;  // Handler is suspended here until B replies
    // While waiting, Actor A can't process Actor B's request
}

async fn actor_b_handler(&mut self, msg: Message) {
    let response = self.actor_a.request(msg).await;  // Handler is suspended here until A replies
    // Deadlock: A is waiting for B and B is waiting for A
}

// ✅ Good: bound the wait with a timeout (and avoid circular dependencies)
async fn actor_a_handler(&mut self, msg: Message) {
    // Give Actor B at most five seconds to answer.
    let deadline = Duration::from_secs(5);
    let outcome = tokio::time::timeout(deadline, self.actor_b.request(msg)).await;

    match outcome {
        Ok(response) => { /* handle response */ }
        Err(_) => { /* timeout, handle error */ }
    }
}

// Better: redesign to avoid circular dependency

2. Unbounded Mailbox Growth

Symptom: Memory grows unbounded, OOM crashes

// ❌ Bad: unbounded channel — sends never wait, so nothing limits the queue
let (tx, rx) = mpsc::unbounded_channel();

// If the consumer is slower than the producers, the mailbox keeps growing

// ✅ Good: bounded channel with backpressure
let (tx, rx) = mpsc::channel(100);  // Holds at most 100 queued messages

// `send` waits while the mailbox is full (backpressure);
// `?` propagates the error returned when the receiver is gone
tx.send(msg).await?;

3. Blocking Operations in Actor

Symptom: Actor becomes unresponsive, messages pile up

// ❌ Bad: blocking I/O in actor.
// Deliberate anti-example: `receive` is synchronous, so the actor cannot
// handle another message until the file read returns.
impl Actor for MyActor {
    fn receive(&mut self, ctx: &mut Context<Self>, msg: Self::Message) {
        // Synchronous read: stalls this actor until it completes,
        // and `unwrap` panics if the read fails
        let data = std::fs::read("file.txt").unwrap();
        // Other messages can't be processed in the meantime
    }
}

// ✅ Good: use async I/O or spawn blocking task
impl Actor for MyActor {
    fn receive(&mut self, ctx: &mut Context<Self>, msg: Self::Message) {
        let addr = ctx.address();
        tokio::spawn(async move {
            // Runs in separate task
            let data = tokio::fs::read("file.txt").await.unwrap();
            addr.send(ProcessData(data)).await;
        });
        // Actor continues processing messages
    }
}

Actix Framework Example

use actix::{Actor, Handler, Message, Context};

/// Actix example actor holding a counter.
struct MyActor {
    /// Current counter value; incremented by the `Increment` handler.
    counter: usize,
}

// Actix `Actor` implementation: selects the context type and prints a line
// from each of the two lifecycle callbacks.
impl Actor for MyActor {
    type Context = Context<Self>;

    /// Lifecycle callback; prints a message when invoked.
    fn started(&mut self, _ctx: &mut Self::Context) {
        println!("Actor started");
    }

    /// Lifecycle callback; prints a message when invoked.
    fn stopped(&mut self, _ctx: &mut Self::Context) {
        println!("Actor stopped");
    }
}

/// Message asking `MyActor` to increment its counter.
/// The `rtype` attribute declares `usize` as the reply type.
#[derive(Message)]
#[rtype(result = "usize")]
struct Increment;

impl Handler<Increment> for MyActor {
    type Result = usize;

    /// Adds one to the counter and replies with the new value.
    fn handle(&mut self, _msg: Increment, _ctx: &mut Self::Context) -> Self::Result {
        let next = self.counter + 1;
        self.counter = next;
        next
    }
}

// Usage: start the actor, send one `Increment`, and print the reply.
#[actix_rt::main]
async fn main() {
    let addr = MyActor { counter: 0 }.start();

    // `send` resolves to a `Result`; a delivery failure panics in `unwrap`.
    let reply = addr.send(Increment).await;
    let count = reply.unwrap();

    println!("Counter: {}", count);
}

Related Skills

  • rust-concurrency - Concurrency primitives and patterns
  • rust-async - Async message handling
  • rust-error - Error propagation in actor systems
  • rust-channel - Channel-based communication
  • rust-performance - Actor system optimization

Localized Reference

  • Chinese version: SKILL_ZH.md - 完整中文版本,包含所有内容