Agent Skills: Multi-Agent Coordination Patterns

Common patterns and best practices for multi-agent coordination

Category: Uncategorized. ID: awaaate/Hyrm/multi-agent-patterns

Install this agent skill locally:

pnpm dlx add-skill https://github.com/awaaate/Hyrm/tree/HEAD/.opencode/skill/multi-agent-patterns

.opencode/skill/multi-agent-patterns/SKILL.md

Multi-Agent Coordination Patterns

Essential patterns for effective multi-agent system coordination.

Decision Framework

Before coordinating agents, consider:

<scratchpad>
1. What is the task complexity? (Simple: do directly. Complex: delegate.)
2. Can work be parallelized? (Independent subtasks: spawn multiple workers.)
3. Are there dependencies? (Use task.depends_on and spawn sequentially.)
4. What's the expected duration? (< 2 min: do directly. > 2 min: delegate.)
5. Does it need specialized skills? (code-worker, analysis-worker, etc.)
</scratchpad>
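The checklist above can be read as a small routing function. The sketch below is illustrative only: the `TaskProfile` shape, the `decide` function, and the choice to work directly only when a task is both simple and short are assumptions drawn from the five questions, not part of any plugin API.

```typescript
// Hypothetical description of a task, mirroring the five questions above.
interface TaskProfile {
  complex: boolean;            // 1. task complexity
  independentSubtasks: number; // 2. how many subtasks can run in parallel
  hasDependencies: boolean;    // 3. must some subtasks wait for others?
  estimatedMinutes: number;    // 4. expected duration
  requiredRole?: string;       // 5. e.g. "code-worker", "analysis-worker"
}

type Decision =
  | { action: "do_directly" }
  | { action: "delegate"; mode: "parallel" | "sequential"; workers: number; role: string | null };

const DELEGATE_THRESHOLD_MINUTES = 2;

function decide(t: TaskProfile): Decision {
  // Assumption: work directly only when the task is both simple and short.
  if (!t.complex && t.estimatedMinutes < DELEGATE_THRESHOLD_MINUTES) {
    return { action: "do_directly" };
  }
  const role = t.requiredRole ?? null;
  // Dependencies force a sequential chain, even if there are several subtasks.
  if (t.hasDependencies || t.independentSubtasks <= 1) {
    return { action: "delegate", mode: "sequential", workers: 1, role };
  }
  return { action: "delegate", mode: "parallel", workers: t.independentSubtasks, role };
}
```

An orchestrator could call `decide` once per incoming task and spawn workers only when the result is `delegate`.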

Agent Types

| Role | Handoff | Purpose |
|------|---------|---------|
| orchestrator | NEVER | System-wide coordination, task distribution |
| code-worker | Yes | Implementation, bug fixes, refactoring |
| analysis-worker | Yes | Research, investigation (read-only) |
| memory-worker | Yes | Memory system maintenance |
| critique | Yes | Code review, quality assessment |
| monitor | Optional | System health, performance tracking |

Core Patterns

1. Parallel Task Distribution (Most Common)

When: Tasks are independent and can run concurrently.

# Spawn multiple workers in parallel (non-blocking)
nohup opencode run 'WORKER-1: agent_register. Task: Analyze module A. Report: agent_send(task_complete).' > /dev/null 2>&1 &
nohup opencode run 'WORKER-2: agent_register. Task: Analyze module B. Report: agent_send(task_complete).' > /dev/null 2>&1 &
nohup opencode run 'WORKER-3: agent_register. Task: Analyze module C. Report: agent_send(task_complete).' > /dev/null 2>&1 &

2. Sequential Chain (Dependencies)

When: Task B needs output from Task A.

// Use task dependencies
task_create({ title: "Task A", priority: "high" })
task_create({ 
  title: "Task B", 
  depends_on: ["task_A_id"]  // Auto-blocked until A completes
})

3. Supervisor Pattern (Fault Tolerance)

When: Long-running work needs monitoring.

// Orchestrator periodically checks workers
const STALE_AFTER_MS = 120000 // 2 minutes without a heartbeat
const agents = await agent_status()
const staleWorkers = agents.filter(a => 
  a.role !== 'orchestrator' && 
  Date.now() - new Date(a.last_heartbeat).getTime() > STALE_AFTER_MS
)
// Respawn stale workers
for (const w of staleWorkers) {
  if (w.current_task) {
    task_update({ task_id: w.current_task, status: "pending" })
    // Spawn replacement
  }
}

4. Fan-Out/Fan-In (Map-Reduce)

When: Need to aggregate results from multiple workers.

// Fan-out: Spawn workers
for (const chunk of dataChunks) {
  spawn(`WORKER: Process chunk ${chunk.id}. Report: agent_send(partial_result, {chunk_id, result})`)
}

// Fan-in: Collect results, keyed by chunk_id so that a message returned
// again on a later poll is not counted twice
const results = new Map()
while (results.size < dataChunks.length) {
  const messages = await agent_messages()
  for (const m of messages.filter(m => m.type === 'partial_result')) {
    results.set(m.payload.chunk_id, m.payload)
  }
  // Pause between polls instead of busy-looping
  await new Promise(r => setTimeout(r, 1000))
}
const finalResult = aggregate([...results.values()])

Communication

Message Types

| Type | Use Case | Example |
|------|----------|---------|
| broadcast | Announcements to all | Status updates, alerts |
| direct | To specific agent | Coordination, questions |
| task_claim | Claiming task | Prevent race conditions |
| task_complete | Report completion | Include summary, files changed |
| task_available | New task ready | Broadcast for workers |
| request_help | Stuck/blocked | Escalate to orchestrator |

Message Payloads

// Task completion (include everything needed for assessment)
agent_send({
  type: "task_complete",
  payload: {
    task_id: "task_123",
    summary: "Implemented error handling for API calls",
    files_changed: ["src/api.ts", "src/api.test.ts"],
    issues_found: [],
    recommendations: ["Consider adding retry logic"]
  }
})

// Request help (be specific about blocker)
agent_send({
  type: "request_help",
  payload: {
    task_id: "task_456",
    blocker: "Need database credentials",
    tried: ["Checked env vars", "Checked config files"],
    urgency: "high"
  }
})
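If the coordinating code is written in TypeScript, the message types and the two payloads shown above can be captured as types so that incomplete messages are caught before they are sent. This is a sketch under assumptions: the field types (for example `issues_found` as `string[]`) are inferred from the examples, and `missingHelpFields` is a hypothetical helper, not a plugin tool.

```typescript
// Message types listed in the table above.
type MessageType =
  | "broadcast" | "direct" | "task_claim"
  | "task_complete" | "task_available" | "request_help";

// Payload shapes copied from the two examples above; field types are inferred.
interface TaskCompletePayload {
  task_id: string;
  summary: string;
  files_changed: string[];
  issues_found: string[];
  recommendations: string[];
}

interface RequestHelpPayload {
  task_id: string;
  blocker: string;
  tried: string[];
  urgency: string;
}

// Other message types are left open because the source shows no payload for them.
type AgentMessage =
  | { type: "task_complete"; payload: TaskCompletePayload }
  | { type: "request_help"; payload: RequestHelpPayload }
  | { type: Exclude<MessageType, "task_complete" | "request_help">; payload: Record<string, unknown> };

// Hypothetical helper: list the fields that are missing or empty in a help request.
function missingHelpFields(p: Partial<RequestHelpPayload>): string[] {
  const required: (keyof RequestHelpPayload)[] = ["task_id", "blocker", "tried", "urgency"];
  return required.filter((k) => {
    const v = p[k];
    return v === undefined || v.length === 0;
  });
}
```

A help request with an empty `tried` list is flagged here because the guidance above asks workers to be specific about what they already attempted.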

Task Management

1. Persistent Task Creation (via plugin tools)

// Create a simple task
task_create({
  title: "Analyze system architecture",
  description: "Review and document system components",
  priority: "high",
  tags: ["architecture", "documentation"]
})

// Create a task with dependencies
task_create({
  title: "Deploy to production",
  description: "Deploy after all tests pass",
  priority: "high",
  depends_on: ["task_123_abc", "task_456_def"] // Must complete first
})

2. Task Dependencies

Tasks can depend on other tasks. A task with dependencies:

  • Starts in "blocked" status if its dependencies aren't complete
  • Automatically becomes "pending" when all of its dependencies complete
  • Is returned by task_next() only once its dependencies are satisfied

// Create a task chain
// Assumes task_create resolves to { task }, as in the other examples in this skill
const setupTask = await task_create({ title: "Setup environment" })
const buildTask = await task_create({ 
  title: "Build project", 
  depends_on: [setupTask.task.id] 
})
const testTask = await task_create({ 
  title: "Run tests", 
  depends_on: [buildTask.task.id] 
})
// testTask will be blocked until buildTask completes
// buildTask will be blocked until setupTask completes
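The blocking rules described above can be modelled without the plugin to see how statuses change as tasks complete. The sketch below is an in-memory illustration, not the plugin's implementation: the `Task` shape, the `in_progress` status name, and both function names are assumptions.

```typescript
type Status = "blocked" | "pending" | "in_progress" | "completed";

interface Task {
  id: string;
  status: Status;
  depends_on?: string[];
}

// Re-derive "blocked"/"pending" from the current set of completed tasks.
// Tasks already in progress or completed are left untouched.
function refreshStatuses(tasks: Task[]): Task[] {
  const done = new Set(tasks.filter((t) => t.status === "completed").map((t) => t.id));
  return tasks.map((t): Task => {
    if (t.status === "in_progress" || t.status === "completed") return t;
    const ready = (t.depends_on ?? []).every((dep) => done.has(dep));
    return { ...t, status: ready ? "pending" : "blocked" };
  });
}

// Analogue of task_next(): the first pending task whose dependencies are satisfied.
function nextTask(tasks: Task[]): Task | undefined {
  return refreshStatuses(tasks).find((t) => t.status === "pending");
}
```

With a setup, build, test chain in which only setup is completed, `nextTask` returns the build task and the test task stays blocked.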

3. Task Claiming & Assignment

// Find next available task (respects dependencies)
const next = await task_next()

// Claim the task atomically (prevents conflicts)
if (next.task) {
  await task_claim({ task_id: next.task.id })
}

// Update task with notes
await task_update({
  task_id: "task_123",
  notes: "Progress: 50% complete"
})

4. Progress Tracking (via message bus)

// Update progress
agent_send({
  type: "broadcast",
  payload: {
    task_id: "task-001",
    progress: 0.75,
    eta_minutes: 10
  }
})

Error Handling

1. Graceful Failure

try {
  // Execute task
} catch (error) {
  agent_send({
    type: "task_complete",
    payload: {
      task_id: "...",
      status: "failed",
      error: error.message
    }
  })
}

2. Retry Logic

const MAX_RETRIES = 3
let attempt = 0
while (attempt < MAX_RETRIES) {
  try {
    // Try task
    break
  } catch (error) {
    attempt++
    if (attempt === MAX_RETRIES) {
      agent_send({ type: "request_help", payload: { ... } })
    }
  }
}
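The retry loop above retries immediately. One possible refinement, not described in the source, is to wait a little longer before each new attempt. The sketch below adds exponential backoff; `backoffDelayMs`, `withRetry`, and the default delays are hypothetical, and `onGiveUp` stands in for sending a `request_help` message.

```typescript
// Delay before the retry that follows failed attempt `attempt` (1-based):
// baseMs, 2*baseMs, 4*baseMs, ... capped at maxMs.
function backoffDelayMs(attempt: number, baseMs = 500, maxMs = 10_000): number {
  return Math.min(maxMs, baseMs * 2 ** (attempt - 1));
}

// Run `work` up to maxRetries times. After the final failure, call `onGiveUp`
// (for example to escalate to the orchestrator) and resolve to undefined.
async function withRetry<T>(
  work: () => Promise<T>,
  onGiveUp: (lastError: unknown) => Promise<void>,
  maxRetries = 3,
): Promise<T | undefined> {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      return await work();
    } catch (error) {
      if (attempt === maxRetries) {
        await onGiveUp(error);
        return undefined;
      }
      // Wait before the next attempt instead of retrying immediately.
      await new Promise((r) => setTimeout(r, backoffDelayMs(attempt)));
    }
  }
  return undefined;
}
```

The cap keeps a long run of failures from producing unbounded waits, which matters when the orchestrator treats a missing heartbeat as a stale worker.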

Worker Pools

1. Spawning Worker Pools

Spawn multiple workers for parallel task execution:

# Spawn a pool of 3 workers for code analysis (non-blocking, output discarded)
for i in 1 2 3; do
  nohup opencode run "You are WORKER-$i. Call agent_register with role=code-analyzer. 
  Poll task_next() for available code-analysis tasks.
  Claim tasks with task_claim. 
  Report results via agent_send type=task_complete.
  You CAN handoff when no more tasks are available." > /dev/null 2>&1 &
done

2. Worker Lifecycle

1. INIT: agent_register with role
2. POLL: task_next() or agent_messages for task_available
3. CLAIM: task_claim(task_id) - atomic assignment
4. EXECUTE: Do the work
5. COMPLETE: task_update status=completed, agent_send task_complete
6. REPEAT or HANDOFF
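The lifecycle above is effectively a small state machine, and writing it as one makes illegal jumps (such as executing before claiming) easy to detect. In this sketch the phase names come from the list above, while the event names, the `claim_lost` transition, and the `step` function are assumptions added for illustration.

```typescript
type Phase = "INIT" | "POLL" | "CLAIM" | "EXECUTE" | "COMPLETE" | "HANDOFF";
type WorkerEvent =
  | "registered" | "task_found" | "no_tasks"
  | "claimed" | "claim_lost" | "work_done" | "reported";

// Allowed transitions. COMPLETE returns to POLL (the REPEAT branch);
// POLL moves to HANDOFF once no tasks remain.
const transitions: Record<Phase, Partial<Record<WorkerEvent, Phase>>> = {
  INIT: { registered: "POLL" },
  POLL: { task_found: "CLAIM", no_tasks: "HANDOFF" },
  CLAIM: { claimed: "EXECUTE", claim_lost: "POLL" }, // another worker claimed first
  EXECUTE: { work_done: "COMPLETE" },
  COMPLETE: { reported: "POLL" },
  HANDOFF: {},
};

function step(phase: Phase, event: WorkerEvent): Phase {
  const next = transitions[phase][event];
  if (next === undefined) {
    throw new Error(`Illegal transition: ${phase} on ${event}`);
  }
  return next;
}
```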

3. Worker Roles

  • code-analyzer: Static analysis, code review
  • test-runner: Execute and report test results
  • doc-writer: Generate documentation
  • data-processor: ETL, data transformation

Load Balancing

1. Check Agent Load

const agents = await agent_status()
const workers = agents.filter(a => a.role !== 'orchestrator')

// Find idle workers
const idle = workers.filter(a => a.status === 'idle')

// Find least loaded worker (copy first: sort() mutates the array in place)
const leastLoaded = [...workers]
  .sort((a, b) => (a.current_task ? 1 : 0) - (b.current_task ? 1 : 0))[0]

2. Task Distribution Strategy

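// Round-robin distribution
let workerIndex = 0
const assignTask = async (task) => {
  // Select every non-orchestrator agent, matching the filter used in "Check Agent Load"
  const workers = (await agent_status()).filter(a => a.role !== 'orchestrator')
  if (workers.length === 0) return // No workers registered yet; leave the task pending
  const target = workers[workerIndex % workers.length]
  workerIndex++
  
  await agent_send({
    type: "direct",
    to_agent: target.id,
    payload: { action: "work_on", task_id: task.id }
  })
}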

3. Auto-scaling

// Scale up if too many pending tasks
const pending = await task_list({ status: "pending" })
const workers = (await agent_status()).filter(a => a.role !== 'orchestrator')

if (pending.length > workers.length * 2) {
  // Spawn more workers
  for (let i = 0; i < Math.min(3, pending.length - workers.length); i++) {
    await spawn_worker("generic-worker")
  }
}
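The scaling rule above can be isolated as a pure function, which makes the threshold easy to test without spawning anything. This is a sketch: `workersToSpawn` and its `maxBurst` parameter are hypothetical names, and the numbers (twice the worker count, at most 3 per round) are taken from the snippet above.

```typescript
// How many workers to add this round: none unless pending tasks exceed twice
// the current worker count, and never more than maxBurst at a time.
function workersToSpawn(pendingCount: number, workerCount: number, maxBurst = 3): number {
  if (pendingCount <= workerCount * 2) return 0;
  return Math.max(0, Math.min(maxBurst, pendingCount - workerCount));
}
```

For example, 10 pending tasks with 2 workers yields 3 new workers, while 4 pending tasks with 2 workers yields none.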

Agent Collaboration Patterns

1. Task Decomposition Pattern

Break complex tasks into smaller, parallelizable subtasks:

// Orchestrator creates parent task and subtasks
const parentTask = await task_create({
  title: "Implement user authentication system",
  priority: "high",
  complexity: "complex"
})

// Create subtasks, each linked to the parent through a "parent:<id>" tag
const subtasks = [
  { title: "Create User model and migrations", tags: ["database"] },
  { title: "Implement login/logout endpoints", tags: ["api"] },
  { title: "Add password hashing utility", tags: ["security"] },
  { title: "Create auth middleware", tags: ["api"] },
  { title: "Write unit tests for auth", tags: ["testing"] }
]

for (const sub of subtasks) {
  await task_create({
    ...sub,
    priority: "medium",
    description: `Subtask of: ${parentTask.task.title}`,
    tags: [...sub.tags, `parent:${parentTask.task.id}`]
  })
}

// Spawn specialized workers for each tag
await spawn_worker("database-worker", { filter: ["database"] })
await spawn_worker("api-worker", { filter: ["api"] })
await spawn_worker("testing-worker", { filter: ["testing"] })

2. Result Aggregation Pattern

Combine results from multiple workers into a unified output:

// Message structure for partial results
interface PartialResult {
  type: "partial_result"
  payload: {
    parent_task_id: string
    subtask_id: string
    result: any
    status: "success" | "failed"
  }
}

// Orchestrator collects and aggregates results
const collectResults = async (parentTaskId: string) => {
  const messages = await agent_messages()
  const partials = messages.filter(m => 
    m.type === "partial_result" && 
    m.payload.parent_task_id === parentTaskId
  )
  
  // Aggregate only once every subtask has completed; otherwise return and
  // let the caller try again later
  const subtasks = await task_list({ status: "all" })
  const childTasks = subtasks.filter(t => 
    t.tags?.includes(`parent:${parentTaskId}`)
  )
  
  const completed = childTasks.filter(t => t.status === "completed")
  if (childTasks.length > 0 && completed.length === childTasks.length) {
    // Aggregate all results
    const aggregated = partials.map(p => p.payload.result)
    
    // Report the final aggregated result
    await agent_send({
      type: "task_complete",
      payload: {
        task_id: parentTaskId,
        result: { aggregated },
        subtasks_completed: completed.length
      }
    })
  }
}

3. Review Pattern (Agent Peer Review)

One agent reviews another's work before completion:

// Worker submits work for review
await agent_send({
  type: "review_request",
  payload: {
    task_id: "task_abc",
    work_product: {
      files_changed: ["src/auth.ts", "src/auth.test.ts"],
      summary: "Implemented JWT authentication"
    },
    requested_by: agentId
  }
})

// Update task to "pending_review" status
await task_update({
  task_id: "task_abc",
  status: "pending_review",
  notes: "Awaiting peer review"
})

// Reviewer agent picks up review request
const reviews = (await agent_messages())
  .filter(m => m.type === "review_request")

for (const review of reviews) {
  // Analyze the work product
  const issues = await reviewCode(review.payload.work_product)
  
  // Send review result
  await agent_send({
    type: "review_complete",
    to_agent: review.payload.requested_by,
    payload: {
      task_id: review.payload.task_id,
      approved: issues.length === 0,
      issues,
      reviewer: agentId
    }
  })
}

// Original worker handles review feedback
const reviewResult = (await agent_messages())
  .find(m => m.type === "review_complete" && m.payload.task_id === taskId)

if (!reviewResult) {
  // No review yet: poll agent_messages() again later
} else if (reviewResult.payload.approved) {
  await task_update({ task_id: taskId, status: "completed" })
} else {
  // Address issues and resubmit
  await fixIssues(reviewResult.payload.issues)
  await agent_send({ type: "review_request", ... })
}

4. Consensus Pattern

Multiple agents vote on a decision:

// Orchestrator initiates vote
await agent_send({
  type: "broadcast",
  payload: {
    action: "vote_request",
    vote_id: "vote_123",
    question: "Should we migrate to TypeScript strict mode?",
    options: ["yes", "no", "abstain"],
    deadline_ms: 60000
  }
})

// Workers respond with votes
await agent_send({
  type: "vote_response",
  payload: {
    vote_id: "vote_123",
    choice: "yes",
    rationale: "Catches more bugs at compile time"
  }
})

// Orchestrator tallies votes
const collectVotes = async (voteId: string, deadline: number) => {
  await new Promise(r => setTimeout(r, deadline))
  
  const votes = (await agent_messages())
    .filter(m => m.type === "vote_response" && m.payload.vote_id === voteId)
  
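  const tally: Record<string, number> = { yes: 0, no: 0, abstain: 0 }
  // Count only the offered options, so an unexpected choice cannot add a NaN entry
  votes.forEach(v => { if (v.payload.choice in tally) tally[v.payload.choice]++ })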
  
  const winner = Object.entries(tally)
    .sort((a, b) => b[1] - a[1])[0][0]
  
  await agent_send({
    type: "broadcast",
    payload: {
      action: "vote_result",
      vote_id: voteId,
      result: winner,
      tally
    }
  })
}
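The tally above picks whichever option sorts first, so a tie is resolved arbitrarily and "abstain" can win. A stricter variant is sketched below. It is an illustration under assumptions: the `agent_id` field (used to count one vote per agent) is not shown in the source payload, and reporting `"tie"` instead of a winner is a design choice, not documented behavior.

```typescript
type Choice = "yes" | "no" | "abstain";

interface Vote {
  agent_id: string; // assumed to be available from the message sender
  choice: string;
}

interface TallyResult {
  tally: Record<Choice, number>;
  result: "yes" | "no" | "tie";
}

function tallyVotes(votes: Vote[]): TallyResult {
  const tally: Record<Choice, number> = { yes: 0, no: 0, abstain: 0 };
  const seen = new Set<string>();
  for (const v of votes) {
    if (seen.has(v.agent_id)) continue; // one vote per agent; the first valid one counts
    if (v.choice !== "yes" && v.choice !== "no" && v.choice !== "abstain") continue;
    seen.add(v.agent_id);
    tally[v.choice]++;
  }
  // Abstentions are counted but never decide the outcome.
  const result = tally.yes > tally.no ? "yes" : tally.no > tally.yes ? "no" : "tie";
  return { tally, result };
}
```

On a tie the orchestrator can re-run the vote or escalate, rather than silently acting on an arbitrary winner.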

5. Checkpoint Pattern

Save intermediate progress for resumability:

import { existsSync, mkdirSync, readFileSync, writeFileSync } from "node:fs"

// Worker periodically saves checkpoints
const saveCheckpoint = async (taskId: string, progress: any) => {
  const checkpointPath = `memory/checkpoints/${taskId}.json`
  
  await task_update({
    task_id: taskId,
    notes: JSON.stringify({
      checkpoint_time: new Date().toISOString(),
      progress_percent: progress.percent,
      checkpoint_path: checkpointPath
    })
  })
  
  // Save detailed checkpoint data (create the directory on first use)
  mkdirSync("memory/checkpoints", { recursive: true })
  writeFileSync(checkpointPath, JSON.stringify({
    task_id: taskId,
    timestamp: new Date().toISOString(),
    state: progress.state,
    completed_items: progress.completed,
    remaining_items: progress.remaining
  }, null, 2))
}

// New worker can resume from checkpoint
const resumeFromCheckpoint = async (taskId: string) => {
  const checkpointPath = `memory/checkpoints/${taskId}.json`
  
  if (existsSync(checkpointPath)) {
    const checkpoint = JSON.parse(readFileSync(checkpointPath, 'utf-8'))
    
    // Resume from saved state
    return {
      startFrom: checkpoint.completed_items.length,
      remaining: checkpoint.remaining_items,
      previousState: checkpoint.state
    }
  }
  
  return null // Start fresh
}

6. Pipeline Pattern

Chain agents in a processing pipeline:

// Define pipeline stages
const pipeline = [
  { stage: "analyze", role: "code-analyzer" },
  { stage: "refactor", role: "code-refactorer" },
  { stage: "test", role: "test-runner" },
  { stage: "review", role: "code-reviewer" }
]

// Each stage creates output for the next stage.
// pipelineId is supplied by the caller; waitForTaskComplete is a helper you
// must provide (it is not defined in this skill).
const processPipeline = async (pipelineId: string, input: any) => {
  let stageInput = input
  
  for (const { stage, role } of pipeline) {
    // Create stage task
    const stageTask = await task_create({
      title: `Pipeline stage: ${stage}`,
      description: `Process: ${JSON.stringify(stageInput).slice(0, 100)}...`,
      tags: [`pipeline:${pipelineId}`, `stage:${stage}`]
    })
    
    // Announce the stage; include the role so the matching worker picks it up
    await agent_send({
      type: "broadcast",
      payload: {
        action: "stage_ready",
        pipeline_id: pipelineId,
        stage,
        role,
        task_id: stageTask.task.id,
        input: stageInput
      }
    })
    
    // Wait for stage completion
    const result = await waitForTaskComplete(stageTask.task.id)
    stageInput = result.output
  }
  
  return stageInput // Final pipeline output
}

Performance Optimization

1. Batched Operations

  • Group related file reads
  • Batch message processing
  • Consolidate status updates

2. Resource Management

  • Monitor token usage via memory_status()
  • Check active agent count via agent_status()
  • Balance workload across available workers

3. Caching Strategies

  • Use knowledge-base.json for persistent insights
  • Session-level state in working.md
  • Message bus for ephemeral coordination

Anti-Patterns

Message Flooding

BAD:  for (item of items) { agent_send(...) }  // Thousands of messages
GOOD: agent_send({ payload: { items: [...] } })  // Batch in one message
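When the item list is too large for a single message, a middle ground is to send a handful of fixed-size batches. The helper below is a sketch; `toBatches` is a hypothetical name and the batch size is left for the caller to choose.

```typescript
// Split items into consecutive batches of at most batchSize elements.
function toBatches<T>(items: T[], batchSize: number): T[][] {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError("batchSize must be a positive integer");
  }
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += batchSize) {
    batches.push(items.slice(i, i + batchSize));
  }
  return batches;
}
```

Each batch then becomes the `items` array of one message, so a few thousand items produce tens of messages instead of thousands.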

Task Thrashing

BAD:  task_claim -> realize too complex -> abandon -> task_claim another
GOOD: Read task description fully, check complexity BEFORE claiming

Agent Starvation

BAD:  Orchestrator claims and does all work itself
GOOD: Orchestrator delegates, only does coordination tasks

Circular Dependencies

BAD:  Task A depends on B, B depends on A
GOOD: Break cycle, create independent subtasks
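A cycle can be caught before any task is created by checking the planned `depends_on` graph. The depth-first search below is a minimal sketch; the `TaskNode` shape and `findCycle` function are assumptions, and whether the task plugin performs a check of its own is not stated in the source.

```typescript
interface TaskNode {
  id: string;
  depends_on?: string[];
}

// Returns one dependency cycle as a list of task ids (the first id is repeated
// at the end), or null if the graph has no cycle.
function findCycle(tasks: TaskNode[]): string[] | null {
  const deps = new Map<string, string[]>();
  for (const t of tasks) deps.set(t.id, t.depends_on ?? []);

  const state = new Map<string, "visiting" | "done">();
  const path: string[] = []; // ids on the current depth-first path

  const visit = (id: string): string[] | null => {
    if (state.get(id) === "done") return null;
    if (state.get(id) === "visiting") {
      // Reached a task that is already on the current path: that is a cycle.
      return [...path.slice(path.indexOf(id)), id];
    }
    state.set(id, "visiting");
    path.push(id);
    for (const dep of deps.get(id) ?? []) {
      const cycle = visit(dep);
      if (cycle) return cycle;
    }
    path.pop();
    state.set(id, "done");
    return null;
  };

  for (const t of tasks) {
    const cycle = visit(t.id);
    if (cycle) return cycle;
  }
  return null;
}
```

The returned path names the tasks involved, which helps decide which dependency to drop when breaking the cycle.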

Blocking Operations

BAD:  Task tool call (blocks entire session)
GOOD: nohup opencode run '...' > /dev/null 2>&1 &  (non-blocking)

Quick Reference

Worker Lifecycle

1. agent_register(role="<role>")
2. task_claim(task_id) OR receive direct assignment
3. Do work
4. task_update(task_id, status="completed")
5. agent_send(type="task_complete", payload={...})
6. Agent can hand off

Orchestrator Loop

1. agent_set_handoff(enabled=false)  // CRITICAL
2. agent_register(role="orchestrator")
3. LOOP:
   - user_messages_read() → handle immediately
   - agent_messages() → process completions/help
   - task_schedule() → plan next work
   - Spawn workers for pending tasks
   - agent_status() → monitor health
   - quality_assess() → review completed work

Remember: Effective coordination is key to a healthy multi-agent system!