Agent Skills: Concurrency

Comprehensive concurrency and parallelism patterns for multi-threaded and async programming. Use when implementing async/await, parallel processing, thread safety, worker pools, or debugging race conditions and deadlocks. Triggers: async, await, concurrent, parallel, threads, race condition, deadlock, mutex, semaphore, worker pool, queue.

UncategorizedID: cosmix/claude-loom/concurrency

Skill Files

Browse the full folder contents for concurrency.

Download Skill

Loading file tree…

skills/concurrency/SKILL.md

Skill Metadata

Name
concurrency
Description
Comprehensive concurrency and parallelism patterns for multi-threaded and async programming. Use when implementing async/await, parallel processing, thread safety, worker pools, or debugging race conditions and deadlocks. Triggers: async, await, concurrent, parallel, threads, race condition, deadlock, mutex, semaphore, worker pool, queue.

Concurrency

Overview

Concurrency enables programs to handle multiple tasks efficiently. This skill covers async/await patterns, parallelism vs concurrency distinctions, race condition prevention, deadlock handling, thread safety patterns, and work queue implementations.

Instructions

1. Async/Await Patterns

Python Async Patterns

import asyncio
from typing import List, TypeVar, Coroutine, Any

T = TypeVar('T')

# Basic async function
async def fetch_data(url: str) -> dict:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()

# Concurrent execution with gather
async def fetch_all(urls: List[str]) -> List[dict]:
    tasks = [fetch_data(url) for url in urls]
    return await asyncio.gather(*tasks, return_exceptions=True)

# Timeout handling
async def fetch_with_timeout(url: str, timeout: float = 5.0) -> dict:
    try:
        return await asyncio.wait_for(fetch_data(url), timeout=timeout)
    except asyncio.TimeoutError:
        raise TimeoutError(f"Request to {url} timed out after {timeout}s")

# Semaphore for rate limiting
async def fetch_with_rate_limit(urls: List[str], max_concurrent: int = 10) -> List[dict]:
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited_fetch(url: str) -> dict:
        async with semaphore:
            return await fetch_data(url)

    return await asyncio.gather(*[limited_fetch(url) for url in urls])

# Async context manager
class AsyncDatabaseConnection:
    async def __aenter__(self):
        self.conn = await asyncpg.connect(...)
        return self.conn

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.conn.close()

# Async iterator
class AsyncPaginator:
    def __init__(self, fetch_page):
        self.fetch_page = fetch_page
        self.page = 0
        self.done = False

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.done:
            raise StopAsyncIteration

        result = await self.fetch_page(self.page)
        if not result:
            self.done = True
            raise StopAsyncIteration

        self.page += 1
        return result

TypeScript Async Patterns

// Promise.all for concurrent execution
async function fetchAll<T>(urls: string[]): Promise<T[]> {
  return Promise.all(urls.map((url) => fetch(url).then((r) => r.json())));
}

// Promise.allSettled for fault tolerance
async function fetchAllSafe<T>(urls: string[]): Promise<Array<T | Error>> {
  const results = await Promise.allSettled(
    urls.map((url) => fetch(url).then((r) => r.json())),
  );

  return results.map((result) =>
    result.status === "fulfilled" ? result.value : new Error(result.reason),
  );
}

// Rate-limited concurrent execution
async function fetchWithConcurrencyLimit<T>(
  items: string[],
  fn: (item: string) => Promise<T>,
  limit: number,
): Promise<T[]> {
  const results: T[] = [];
  const executing: Promise<void>[] = [];

  for (const item of items) {
    const p = fn(item).then((result) => {
      results.push(result);
    });
    executing.push(p);

    if (executing.length >= limit) {
      await Promise.race(executing);
      executing.splice(
        executing.findIndex((e) => e === p),
        1,
      );
    }
  }

  await Promise.all(executing);
  return results;
}

// Async queue
class AsyncQueue<T> {
  private queue: T[] = [];
  private resolvers: Array<(value: T) => void> = [];

  async enqueue(item: T): Promise<void> {
    if (this.resolvers.length > 0) {
      const resolve = this.resolvers.shift()!;
      resolve(item);
    } else {
      this.queue.push(item);
    }
  }

  async dequeue(): Promise<T> {
    if (this.queue.length > 0) {
      return this.queue.shift()!;
    }
    return new Promise((resolve) => this.resolvers.push(resolve));
  }
}

2. Parallelism vs Concurrency

import asyncio
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# Concurrency: I/O-bound tasks (use async or threads)
async def io_bound_concurrent():
    """Use for network calls, file I/O, database queries."""
    async with aiohttp.ClientSession() as session:
        tasks = [session.get(url) for url in urls]
        return await asyncio.gather(*tasks)

def io_bound_threaded(urls: List[str]):
    """Threads for I/O when async isn't available."""
    with ThreadPoolExecutor(max_workers=10) as executor:
        return list(executor.map(requests.get, urls))

# Parallelism: CPU-bound tasks (use processes)
def cpu_bound_parallel(data: List[int]) -> List[int]:
    """Use for heavy computation - bypasses GIL."""
    with ProcessPoolExecutor() as executor:
        return list(executor.map(heavy_computation, data))

# Hybrid: CPU work with I/O
async def hybrid_processing(items: List[dict]):
    """Combine async I/O with parallel CPU processing."""
    loop = asyncio.get_event_loop()

    # Fetch data concurrently
    raw_data = await asyncio.gather(*[fetch(item) for item in items])

    # Process CPU-bound work in parallel
    with ProcessPoolExecutor() as executor:
        processed = await loop.run_in_executor(
            executor,
            process_batch,
            raw_data
        )

    return processed

3. Race Conditions and Prevention

import threading
import asyncio
from contextlib import contextmanager

# Thread-safe counter with lock
class ThreadSafeCounter:
    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()

    def increment(self):
        with self._lock:
            self._value += 1
            return self._value

    @property
    def value(self):
        with self._lock:
            return self._value

# Read-write lock for optimized concurrent access
class ReadWriteLock:
    def __init__(self):
        self._read_ready = threading.Condition(threading.Lock())
        self._readers = 0

    @contextmanager
    def read_lock(self):
        with self._read_ready:
            self._readers += 1
        try:
            yield
        finally:
            with self._read_ready:
                self._readers -= 1
                if self._readers == 0:
                    self._read_ready.notify_all()

    @contextmanager
    def write_lock(self):
        with self._read_ready:
            while self._readers > 0:
                self._read_ready.wait()
            yield

# Async lock for async code
class AsyncSafeCache:
    def __init__(self):
        self._cache = {}
        self._lock = asyncio.Lock()

    async def get_or_set(self, key: str, factory):
        async with self._lock:
            if key not in self._cache:
                self._cache[key] = await factory()
            return self._cache[key]

# Compare-and-swap for lock-free operations
import atomics  # or use threading primitives

class LockFreeCounter:
    def __init__(self):
        self._value = atomics.atomic(width=8, atype=atomics.INT)

    def increment(self):
        while True:
            current = self._value.load()
            if self._value.cmpxchg_weak(current, current + 1):
                return current + 1

4. Deadlock Detection and Prevention

import threading
from collections import defaultdict
from typing import Dict, Set
import time

# Deadlock prevention with lock ordering
class OrderedLockManager:
    """Prevents deadlocks by enforcing lock acquisition order."""

    def __init__(self):
        self._lock_order: Dict[str, int] = {}
        self._next_order = 0
        self._thread_locks: Dict[int, Set[str]] = defaultdict(set)
        self._meta_lock = threading.Lock()

    def register_lock(self, name: str) -> threading.Lock:
        with self._meta_lock:
            if name not in self._lock_order:
                self._lock_order[name] = self._next_order
                self._next_order += 1
        return threading.Lock()

    @contextmanager
    def acquire(self, lock: threading.Lock, name: str):
        thread_id = threading.current_thread().ident

        # Check lock ordering
        held_locks = self._thread_locks[thread_id]
        for held_name in held_locks:
            if self._lock_order[name] < self._lock_order[held_name]:
                raise RuntimeError(
                    f"Lock ordering violation: {name} < {held_name}"
                )

        lock.acquire()
        self._thread_locks[thread_id].add(name)
        try:
            yield
        finally:
            self._thread_locks[thread_id].discard(name)
            lock.release()

# Timeout-based deadlock detection
class TimeoutLock:
    def __init__(self, timeout: float = 5.0):
        self._lock = threading.Lock()
        self._timeout = timeout

    def acquire(self):
        acquired = self._lock.acquire(timeout=self._timeout)
        if not acquired:
            raise DeadlockError(
                f"Failed to acquire lock within {self._timeout}s - possible deadlock"
            )
        return True

    def release(self):
        self._lock.release()

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, *args):
        self.release()

# Deadlock detection with wait-for graph
class DeadlockDetector:
    def __init__(self):
        self._wait_for: Dict[int, int] = {}  # thread -> thread it's waiting for
        self._lock = threading.Lock()

    def register_wait(self, waiting_thread: int, holding_thread: int):
        with self._lock:
            self._wait_for[waiting_thread] = holding_thread
            if self._has_cycle():
                raise DeadlockError("Deadlock detected in wait-for graph")

    def unregister_wait(self, thread: int):
        with self._lock:
            self._wait_for.pop(thread, None)

    def _has_cycle(self) -> bool:
        visited = set()
        rec_stack = set()

        def dfs(node):
            visited.add(node)
            rec_stack.add(node)

            next_node = self._wait_for.get(node)
            if next_node:
                if next_node not in visited:
                    if dfs(next_node):
                        return True
                elif next_node in rec_stack:
                    return True

            rec_stack.remove(node)
            return False

        for node in self._wait_for:
            if node not in visited:
                if dfs(node):
                    return True
        return False

5. Thread Safety Patterns

import threading
from functools import wraps
from typing import TypeVar, Generic

T = TypeVar('T')

# Thread-local storage
class RequestContext:
    _local = threading.local()

    @classmethod
    def set_user(cls, user_id: str):
        cls._local.user_id = user_id

    @classmethod
    def get_user(cls) -> str:
        return getattr(cls._local, 'user_id', None)

# Immutable data for thread safety
from dataclasses import dataclass
from typing import Tuple

@dataclass(frozen=True)
class ImmutableConfig:
    host: str
    port: int
    options: Tuple[str, ...]  # Use tuple instead of list

# Copy-on-write pattern
class CopyOnWriteList(Generic[T]):
    def __init__(self):
        self._data: Tuple[T, ...] = ()
        self._lock = threading.Lock()

    def append(self, item: T):
        with self._lock:
            self._data = (*self._data, item)

    def __iter__(self):
        # Snapshot iteration - safe without lock
        return iter(self._data)

# Thread-safe singleton
class Singleton:
    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

# Synchronized decorator
def synchronized(lock: threading.Lock = None):
    def decorator(func):
        nonlocal lock
        if lock is None:
            lock = threading.Lock()

        @wraps(func)
        def wrapper(*args, **kwargs):
            with lock:
                return func(*args, **kwargs)
        return wrapper
    return decorator

6. Work Queues and Worker Pools

import asyncio
import queue
import threading
from typing import Callable, Any, List
from dataclasses import dataclass
from concurrent.futures import Future

@dataclass
class Job:
    func: Callable
    args: tuple
    kwargs: dict
    future: Future

# Thread-based worker pool
class ThreadWorkerPool:
    def __init__(self, num_workers: int = 4):
        self._queue = queue.Queue()
        self._workers: List[threading.Thread] = []
        self._shutdown = False

        for _ in range(num_workers):
            worker = threading.Thread(target=self._worker_loop, daemon=True)
            worker.start()
            self._workers.append(worker)

    def _worker_loop(self):
        while not self._shutdown:
            try:
                job = self._queue.get(timeout=1)
                try:
                    result = job.func(*job.args, **job.kwargs)
                    job.future.set_result(result)
                except Exception as e:
                    job.future.set_exception(e)
            except queue.Empty:
                continue

    def submit(self, func: Callable, *args, **kwargs) -> Future:
        future = Future()
        job = Job(func, args, kwargs, future)
        self._queue.put(job)
        return future

    def shutdown(self, wait: bool = True):
        self._shutdown = True
        if wait:
            for worker in self._workers:
                worker.join()

# Async worker pool
class AsyncWorkerPool:
    def __init__(self, num_workers: int = 10):
        self._queue: asyncio.Queue = asyncio.Queue()
        self._num_workers = num_workers
        self._workers: List[asyncio.Task] = []

    async def start(self):
        for _ in range(self._num_workers):
            task = asyncio.create_task(self._worker_loop())
            self._workers.append(task)

    async def _worker_loop(self):
        while True:
            job = await self._queue.get()
            if job is None:  # Shutdown signal
                break

            func, args, kwargs, future = job
            try:
                if asyncio.iscoroutinefunction(func):
                    result = await func(*args, **kwargs)
                else:
                    result = func(*args, **kwargs)
                future.set_result(result)
            except Exception as e:
                future.set_exception(e)
            finally:
                self._queue.task_done()

    async def submit(self, func: Callable, *args, **kwargs) -> Any:
        future = asyncio.Future()
        await self._queue.put((func, args, kwargs, future))
        return await future

    async def shutdown(self):
        for _ in self._workers:
            await self._queue.put(None)
        await asyncio.gather(*self._workers)

# Priority queue worker
class PriorityWorkerPool:
    def __init__(self, num_workers: int = 4):
        self._queue = queue.PriorityQueue()
        self._workers: List[threading.Thread] = []
        self._shutdown = False

        for _ in range(num_workers):
            worker = threading.Thread(target=self._worker_loop, daemon=True)
            worker.start()
            self._workers.append(worker)

    def _worker_loop(self):
        while not self._shutdown:
            try:
                priority, job = self._queue.get(timeout=1)
                try:
                    result = job.func(*job.args, **job.kwargs)
                    job.future.set_result(result)
                except Exception as e:
                    job.future.set_exception(e)
            except queue.Empty:
                continue

    def submit(self, func: Callable, *args, priority: int = 0, **kwargs) -> Future:
        future = Future()
        job = Job(func, args, kwargs, future)
        self._queue.put((priority, job))
        return future

Best Practices

  1. Prefer Async for I/O: Use async/await for network and file I/O operations.

  2. Use Processes for CPU Work: Bypass GIL with ProcessPoolExecutor for CPU-bound tasks.

  3. Minimize Shared State: Prefer message passing over shared memory.

  4. Lock Ordering: Always acquire locks in a consistent order to prevent deadlocks.

  5. Keep Critical Sections Small: Hold locks for the minimum time necessary.

  6. Use Higher-Level Abstractions: Prefer queues, futures, and async patterns over raw locks.

  7. Test for Race Conditions: Use tools like ThreadSanitizer and stress testing.

  8. Document Thread Safety: Clearly document which methods are thread-safe.

Examples

Complete Async Web Scraper with Rate Limiting

import asyncio
import aiohttp
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class ScrapeResult:
    url: str
    status: int
    content: Optional[str]
    error: Optional[str] = None

class AsyncScraper:
    def __init__(
        self,
        max_concurrent: int = 10,
        requests_per_second: float = 5.0
    ):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.rate_limit = 1.0 / requests_per_second
        self.last_request_time = 0
        self._lock = asyncio.Lock()

    async def _rate_limit(self):
        async with self._lock:
            now = asyncio.get_event_loop().time()
            wait_time = self.last_request_time + self.rate_limit - now
            if wait_time > 0:
                await asyncio.sleep(wait_time)
            self.last_request_time = asyncio.get_event_loop().time()

    async def scrape_url(
        self,
        session: aiohttp.ClientSession,
        url: str
    ) -> ScrapeResult:
        async with self.semaphore:
            await self._rate_limit()
            try:
                async with session.get(url, timeout=10) as response:
                    content = await response.text()
                    return ScrapeResult(
                        url=url,
                        status=response.status,
                        content=content
                    )
            except Exception as e:
                return ScrapeResult(
                    url=url,
                    status=0,
                    content=None,
                    error=str(e)
                )

    async def scrape_all(self, urls: List[str]) -> List[ScrapeResult]:
        async with aiohttp.ClientSession() as session:
            tasks = [self.scrape_url(session, url) for url in urls]
            return await asyncio.gather(*tasks)

# Usage
async def main():
    scraper = AsyncScraper(max_concurrent=5, requests_per_second=2.0)
    urls = ["https://example.com"] * 20
    results = await scraper.scrape_all(urls)

    for result in results:
        if result.error:
            print(f"Failed: {result.url} - {result.error}")
        else:
            print(f"Success: {result.url} - {result.status}")

asyncio.run(main())