Agent Skills: Celery

Distributed task queue for Python supporting Redis, RabbitMQ, periodic scheduling, real-time processing, and Django integration

ID: CodeAtCode/oss-ai-skills/celery

Install this agent skill locally:

pnpm dlx add-skill https://github.com/CodeAtCode/oss-ai-skills/tree/HEAD/frameworks/celery

Skill Files


frameworks/celery/SKILL.md

Skill Metadata

Name
celery
Description
"Distributed task queue for Python supporting Redis, RabbitMQ, periodic scheduling, real-time processing, and Django integration"

Celery

Complete reference for distributed task processing with Celery.

Overview

Celery is a distributed task queue system for Python that enables asynchronous task execution, scheduled tasks, and real-time processing.

Key Features:

  • Distributed: Run workers across multiple machines
  • Brokers: Redis, RabbitMQ, SQS, and more
  • Scheduled tasks: Periodic execution with Celery Beat
  • Real-time: Task monitoring and result tracking
  • Scalable: Easy horizontal scaling

Architecture

Producer → Broker → Worker → Result Backend
   ↓          ↓         ↓          ↓
 Task      Queue   Consumer    Storage

When to Use Celery

  • Email sending
  • Image/video processing
  • Report generation
  • Web scraping
  • Scheduled maintenance tasks
  • Long-running computations
  • Third-party API calls

Installation

pip install celery

# With Redis broker (recommended)
pip install celery[redis]

# With RabbitMQ broker
pip install celery[amqp]

# With Django
pip install django-celery-beat django-celery-results

Basic Configuration

# celery_config.py or celery.py
from celery import Celery

# Create app
app = Celery('myapp')

# Configure from Django settings
app.config_from_object('django.conf:settings', namespace='CELERY')

# Or configure directly
app.conf.update(
    broker_url='redis://localhost:6379/0',
    result_backend='redis://localhost:6379/0',
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
)

Django Integration

# myproject/celery.py
import os
from celery import Celery

# Set default Django settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')

# Load config from Django settings with CELERY_ prefix
app.config_from_object('django.conf:settings', namespace='CELERY')

# Auto-discover tasks in installed apps
app.autodiscover_tasks()

# myproject/__init__.py
from .celery import app as celery_app

__all__ = ('celery_app',)

# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'

Project Structure

myproject/
├── celery.py
├── __init__.py
├── settings.py
└── apps/
    └── myapp/
        ├── tasks.py
        └── __init__.py

Brokers

Redis

# Basic Redis configuration
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'

# Redis with authentication
CELERY_BROKER_URL = 'redis://:password@localhost:6379/0'

# Redis Sentinel for HA
CELERY_BROKER_URL = 'sentinel://localhost:26379/0;sentinel://localhost:26380/0;sentinel://localhost:26381/0'
CELERY_BROKER_TRANSPORT_OPTIONS = {
    'master_name': 'mymaster',
}

# Redis with SSL
CELERY_BROKER_URL = 'rediss://localhost:6379/0'
CELERY_BROKER_TRANSPORT_OPTIONS = {
    'ssl_cert_reqs': 'required',
}

RabbitMQ

# Basic RabbitMQ configuration
CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'

# RabbitMQ with authentication
CELERY_BROKER_URL = 'amqp://user:password@localhost:5672/vhost'

# RabbitMQ with SSL
CELERY_BROKER_URL = 'amqps://user:password@localhost:5671/vhost'
CELERY_BROKER_TRANSPORT_OPTIONS = {
    'ssl_cert_reqs': 'required',
}

Amazon SQS

CELERY_BROKER_URL = 'sqs://aws_access_key:aws_secret_key@'

CELERY_BROKER_TRANSPORT_OPTIONS = {
    'region': 'us-east-1',
    'visibility_timeout': 3600,
    'polling_interval': 1,
}

Result Backends

Redis

CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

# With expiration
CELERY_RESULT_EXPIRES = 3600  # 1 hour

Database (Django)

# Install: pip install django-celery-results
INSTALLED_APPS = [
    # ...
    'django_celery_results',
]

CELERY_RESULT_BACKEND = 'django-db'
CELERY_CACHE_BACKEND = 'django-cache'

RPC

# Results sent back as AMQP messages
CELERY_RESULT_BACKEND = 'rpc://'

Task Definitions

Basic Task

from celery import shared_task

@shared_task
def add(x, y):
    """Simple addition task."""
    return x + y

@shared_task
def send_email(to, subject, body):
    """Send email task."""
    # Email sending logic
    send_mail(subject, body, 'from@example.com', [to])
    return f"Email sent to {to}"

Task Options

from celery import shared_task

@shared_task(
    name='myapp.process_data',      # Custom task name
    bind=True,                       # Access to task instance
    max_retries=3,                   # Max retry attempts
    default_retry_delay=60,          # Delay between retries (seconds)
    autoretry_for=(Exception,),      # Auto-retry on these exceptions
    retry_backoff=True,              # Exponential backoff
    retry_backoff_max=600,           # Max backoff delay
    retry_jitter=True,               # Add jitter to backoff
    time_limit=300,                  # Hard time limit (seconds)
    soft_time_limit=240,             # Soft time limit
    rate_limit='10/m',               # Rate limiting
    ignore_result=False,             # Store result
    store_errors_even_if_ignored=True,
)
def process_data(self, data_id):
    """Process data with full task options."""
    try:
        data = Data.objects.get(id=data_id)
        result = data.process()
        return result
    except Data.DoesNotExist:
        raise self.retry(countdown=60)
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)

Bound Tasks

@shared_task(bind=True)
def bound_task(self, x, y):
    """Task with access to task instance."""
    # Access task ID
    task_id = self.request.id
    
    # Access retries count
    retries = self.request.retries
    
    # Update task state
    self.update_state(
        state='PROGRESS',
        meta={'current': 50, 'total': 100}
    )
    
    # Retry manually
    try:
        return risky_operation(x, y)
    except Exception as exc:
        raise self.retry(exc=exc, countdown=5)

Task Inheritance

from celery import Task

class DatabaseTask(Task):
    """Custom task base class with database session."""
    _db = None
    
    @property
    def db(self):
        if self._db is None:
            self._db = DatabaseSession()
        return self._db
    
    def after_return(self, *args, **kwargs):
        if self._db is not None:
            self._db.close()
            self._db = None

@shared_task(base=DatabaseTask, bind=True)
def database_task(self, record_id):
    """Task using custom base class."""
    record = self.db.query(Record).get(record_id)
    return record.process()

Task Signals

from celery import signals

@signals.task_prerun.connect
def task_prerun_handler(sender=None, task_id=None, task=None, **kwargs):
    """Called before task execution."""
    print(f"Task {task.name}[{task_id}] starting...")

@signals.task_postrun.connect
def task_postrun_handler(sender=None, task_id=None, task=None, retval=None, **kwargs):
    """Called after task execution."""
    print(f"Task {task.name}[{task_id}] completed with result: {retval}")

@signals.task_failure.connect
def task_failure_handler(sender=None, task_id=None, exception=None, **kwargs):
    """Called on task failure."""
    print(f"Task {task_id} failed: {exception}")

@signals.task_retry.connect
def task_retry_handler(sender=None, reason=None, **kwargs):
    """Called on task retry."""
    print(f"Task retrying: {reason}")

Calling Tasks

Basic Calls

# Apply async (recommended)
result = add.apply_async((2, 3))

# Using delay (shortcut for apply_async)
result = add.delay(2, 3)

# Get result
value = result.get()  # Blocks until ready
value = result.get(timeout=10)  # With timeout

# Check status
result.ready()  # True if completed
result.successful()  # True if successful
result.failed()  # True if failed
result.state  # PENDING, STARTED, SUCCESS, FAILURE, RETRY, REVOKED

apply_async Options

# Countdown (delay in seconds)
result = add.apply_async((2, 3), countdown=60)

# ETA (specific time)
from datetime import datetime, timedelta, timezone
eta = datetime.now(timezone.utc) + timedelta(hours=1)
result = add.apply_async((2, 3), eta=eta)

# Expires
result = add.apply_async((2, 3), expires=3600)  # 1 hour

# Specific queue
result = add.apply_async((2, 3), queue='high_priority')

# Specific routing key (used together with custom exchanges)
result = add.apply_async((2, 3), routing_key='video.high')

# Retry policy
result = add.apply_async(
    (2, 3),
    retry=True,
    retry_policy={
        'max_retries': 3,
        'interval_start': 0,
        'interval_step': 0.2,
        'interval_max': 0.5,
    }
)

# Priority (RabbitMQ: 0-255, higher = more important; Redis emulates a limited range)
result = add.apply_async((2, 3), priority=10)

# Custom task ID
result = add.apply_async((2, 3), task_id='custom-task-id')

# Compression
result = add.apply_async((large_data,), compression='gzip')

Signatures

from celery import signature

# Create signature
s = add.s(2, 3)
s.apply_async()

# Partial signature
s = add.s(2)  # First argument fixed
s.apply_async(args=(3,))  # Will call add(2, 3)

# Immutable signature: ignores the result of preceding tasks in a chain
s = add.si(2, 3)

# Signature with options
s = add.s(2, 3).set(countdown=10).set(queue='priority')
s.apply_async()

# Clone signature
s2 = s.clone(args=(4, 5))

Chains

from celery import chain

# Sequential execution
workflow = chain(add.s(2, 3), add.s(4), add.s(5))
result = workflow.apply_async()
# add(2, 3) → add(result, 4) → add(result, 5)

# Pipe notation
workflow = add.s(2, 3) | add.s(4) | add.s(5)
result = workflow.apply_async()

# With error handling
workflow = chain(
    validate_data.s(data_id),
    process_data.s(),
    save_result.s()
)
result = workflow.apply_async()

Groups

from celery import group

# Parallel execution
job = group(add.s(i, i) for i in range(10))
result = job.apply_async()

# Get all results
values = result.get()  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

# Check completion
result.ready()  # True when all complete
result.completed_count()  # Number completed

# Group with callback
job = group(process_item.s(item_id) for item_id in items)
callback = summarize_results.s()
workflow = job | callback
result = workflow.apply_async()

Chords

from celery import chord

# Group with callback (parallel + callback)
callback = summarize.s()
header = [process_item.s(item_id) for item_id in items]
result = chord(header)(callback)

# All header tasks run in parallel, then callback receives results
@shared_task
def summarize(results):
    return sum(results)

# Example: Process order
workflow = chord(
    group(
        validate_inventory.s(item_id) 
        for item_id in order_items
    ),
    create_shipment.s(order_id)
)
result = workflow.apply_async()

Chunks

# Split a long list of work items into chunks
@shared_task
def process_item(item):
    return process(item)

# chunks() expects an iterable of argument tuples; zip() wraps each
# item in a 1-tuple. Processes 100 items, 10 per task invocation.
items = list(range(100))
result = process_item.chunks(zip(items), 10).apply_async()

Task States

Built-in States

from celery.result import AsyncResult

result = add.delay(2, 3)

# States
result.state
# PENDING    - Task waiting to execute
# STARTED    - Task started
# SUCCESS    - Task completed successfully
# FAILURE    - Task failed
# RETRY      - Task being retried
# REVOKED    - Task revoked

# Check state
if result.successful():
    print(f"Result: {result.result}")
elif result.failed():
    print(f"Error: {result.result}")

Custom States

import time

@shared_task(bind=True)
def long_task(self, total):
    """Task with progress tracking."""
    for i in range(total):
        # Update progress
        self.update_state(
            state='PROGRESS',
            meta={
                'current': i + 1,
                'total': total,
                'status': 'Processing...'
            }
        )
        time.sleep(1)
    
    return {'result': 'completed'}

# Check custom state
result = long_task.delay(10)
if result.state == 'PROGRESS':
    progress = result.info
    print(f"{progress['current']}/{progress['total']}")

Periodic Tasks (Celery Beat)

Configuration

# celery.py
from celery.schedules import crontab

app.conf.beat_schedule = {
    # Every 30 seconds
    'add-every-30-seconds': {
        'task': 'myapp.tasks.add',
        'schedule': 30.0,
        'args': (16, 16)
    },
    
    # Crontab schedule
    'cleanup-every-night': {
        'task': 'myapp.tasks.cleanup',
        'schedule': crontab(hour=2, minute=0),
    },
    
    # Every Monday morning
    'weekly-report': {
        'task': 'myapp.tasks.weekly_report',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
    },
    
    # Every 5 minutes during business hours
    'business-check': {
        'task': 'myapp.tasks.check_status',
        'schedule': crontab(minute='*/5', hour='9-17'),
    },
}

app.conf.timezone = 'UTC'

Crontab Syntax

from celery.schedules import crontab

# Every minute
crontab()

# Every hour at minute 0
crontab(minute=0)

# Every day at midnight
crontab(hour=0, minute=0)

# Every Monday at 8:30 AM
crontab(hour=8, minute=30, day_of_week=1)

# Every 15 minutes
crontab(minute='*/15')

# First day of every month
crontab(hour=0, minute=0, day_of_month=1)

# Every weekday (Mon-Fri) at 6 PM
crontab(hour=18, minute=0, day_of_week='1-5')

# Specific months
crontab(hour=0, minute=0, day_of_month=1, month_of_year='1,4,7,10')

django-celery-beat

# Install: pip install django-celery-beat

# settings.py
INSTALLED_APPS = [
    # ...
    'django_celery_beat',
]

# Use database-backed schedule
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

# Now manage periodic tasks via Django admin
# Create periodic task programmatically
import json

from django_celery_beat.models import PeriodicTask, IntervalSchedule

# Create interval
schedule, _ = IntervalSchedule.objects.get_or_create(
    every=10,
    period=IntervalSchedule.SECONDS,
)

# Create task
PeriodicTask.objects.create(
    interval=schedule,
    name='my-periodic-task',
    task='myapp.tasks.my_task',
    args=json.dumps(['arg1', 'arg2']),
    kwargs=json.dumps({'key': 'value'}),
)

# Crontab schedule
from django_celery_beat.models import CrontabSchedule

schedule, _ = CrontabSchedule.objects.get_or_create(
    minute='0',
    hour='*',
    day_of_week='*',
    day_of_month='*',
    month_of_year='*',
)

PeriodicTask.objects.create(
    crontab=schedule,
    name='hourly-task',
    task='myapp.tasks.hourly',
)

Task Routing

Queue Routing

# settings.py
CELERY_TASK_ROUTES = {
    'myapp.tasks.send_email': {
        'queue': 'email',
    },
    'myapp.tasks.process_video': {
        'queue': 'video',
        'routing_key': 'video.process',
    },
    'myapp.tasks.*': {
        'queue': 'default',
    },
}

# Or use task decorator
@shared_task(queue='email')
def send_email(to, subject, body):
    pass

# Or specify when calling
send_email.apply_async(
    args=['to@example.com', 'Subject', 'Body'], queue='priority'
)

Automatic Routing

# celery.py
def route_task(name, args, kwargs, options, task=None, **kw):
    """Custom routing function."""
    if name.startswith('myapp.email.'):
        return {'queue': 'email'}
    if name.startswith('myapp.video.'):
        return {'queue': 'video', 'routing_key': 'video.high'}
    return {'queue': 'default'}

app.conf.task_routes = (route_task,)

Worker Queues

# Start worker for specific queues
celery -A myproject worker -Q email,video -n worker1@%h

# Multiple workers for different queues
celery -A myproject worker -Q email -n email_worker@%h --concurrency=4
celery -A myproject worker -Q video -n video_worker@%h --concurrency=2
celery -A myproject worker -Q default -n default_worker@%h

Error Handling

Retry Strategies

import requests

@shared_task(
    bind=True,
    max_retries=5,
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=True,
    retry_backoff_max=600,
    retry_jitter=True,
)
def fetch_external_api(self, url):
    """Fetch from external API with retry."""
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        return response.json()
    except requests.RequestException as exc:
        raise self.retry(exc=exc)

# Manual retry with custom logic
@shared_task(bind=True, max_retries=3)
def process_with_retry(self, data_id):
    try:
        data = Data.objects.get(id=data_id)
        return data.process()
    except Data.DoesNotExist:
        # Don't retry - data doesn't exist
        raise
    except DatabaseError as exc:
        # Retry with exponential backoff
        countdown = 2 ** self.request.retries
        raise self.retry(exc=exc, countdown=countdown)

Error Callbacks

@shared_task
def on_failure(request, exc, traceback):
    """Error callback task."""
    logger.error(f"Task {request.id} failed: {exc}")
    notify_admins(f"Task failed: {request.id}")

@shared_task
def process_data(data_id):
    data = Data.objects.get(id=data_id)
    return data.process()

# Link error callback
result = process_data.apply_async(
    args=[data_id],
    link_error=on_failure.s()
)

Dead Letter Queue

# Record permanently failed tasks via an on_failure handler
def handle_task_failure(self, exc, task_id, args, kwargs, einfo):
    """Handle failed tasks (matches the Task.on_failure signature)."""
    FailedTask.objects.create(
        task_id=task_id,
        exception=str(exc),
        args=args,
        kwargs=kwargs,
        traceback=str(einfo),
    )
    notify_admins(f"Task {task_id} failed permanently")

CELERY_TASK_ANNOTATIONS = {
    '*': {
        'on_failure': handle_task_failure,
    }
}

Monitoring

Flower (Web UI)

# Install
pip install flower

# Start
celery -A myproject flower --port=5555

# With authentication
celery -A myproject flower --port=5555 --basic-auth=user:password

# Persistent settings go in flowerconfig.py (loaded from the working directory)
# flowerconfig.py
port = 5555
basic_auth = ['user:password']

Command Line Monitoring

# Inspect active tasks
celery -A myproject inspect active

# Inspect registered tasks
celery -A myproject inspect registered

# Inspect scheduled tasks
celery -A myproject inspect scheduled

# Inspect reserved tasks
celery -A myproject inspect reserved

# Stats
celery -A myproject inspect stats

# Ping workers
celery -A myproject inspect ping

# Control workers
celery -A myproject control enable_events
celery -A myproject control disable_events

# Revoke a task (workers will discard it)
celery -A myproject control revoke <task_id>

# Terminating an already-running task is done from Python:
# app.control.revoke(task_id, terminate=True)  # sends SIGTERM

# Purge all tasks
celery -A myproject purge

Prometheus Metrics

# Third-party exporters exist, e.g. celery-prometheus-exporter:
# pip install celery-prometheus-exporter
celery-prometheus-exporter --broker redis://localhost:6379/0

# Flower also exposes a Prometheus endpoint:
celery -A myproject flower --port=5555
# scrape http://localhost:5555/metrics

Testing

pytest Configuration

# conftest.py
import pytest
from celery import Celery

@pytest.fixture(scope='session')
def celery_app():
    """Create test Celery app."""
    app = Celery('test_app')
    app.conf.update(
        broker_url='memory://',
        result_backend='cache+memory://',
        task_always_eager=True,      # Execute synchronously
        task_eager_propagates=True,  # Propagate exceptions
    )
    return app

@pytest.fixture
def celery_worker(celery_app):
    """Create test worker."""
    from celery.contrib.testing import worker
    with worker.start_worker(celery_app):
        yield celery_app

Unit Tests

# tests/test_tasks.py
import pytest
from myapp.tasks import add, send_email

# Synchronous testing (eager mode)
@pytest.mark.celery(task_always_eager=True)
def test_add_task(celery_app):
    """Test add task executes synchronously."""
    result = add.delay(2, 3)
    assert result.get() == 5

# Mock task
def test_send_email_task(mocker):
    """Test send_email task with mocked send."""
    mock_send = mocker.patch('myapp.tasks.send_mail')
    
    send_email.delay('to@example.com', 'Subject', 'Body')
    
    mock_send.assert_called_once_with(
        'Subject', 'Body', 'from@example.com', ['to@example.com']
    )

# Test with fixtures
from unittest.mock import patch

@pytest.fixture
def mock_email_backend():
    """Mock email backend."""
    with patch('myapp.tasks.send_mail') as mock:
        yield mock

def test_email_task_with_fixture(mock_email_backend):
    send_email.delay('test@example.com', 'Test', 'Body')
    assert mock_email_backend.called

Integration Tests

# tests/test_integration.py
import pytest
from celery.result import AsyncResult

@pytest.mark.integration
def test_task_execution(celery_worker):
    """Test real task execution with worker."""
    result = add.apply_async(args=(10, 20))
    
    # Wait for result
    value = result.get(timeout=10)
    
    assert value == 30
    assert result.successful()

@pytest.mark.integration
def test_task_retry(celery_worker):
    """Test task retry behavior."""
    from myapp.tasks import flaky_task
    
    result = flaky_task.apply_async()
    
    # Should eventually succeed after retries
    value = result.get(timeout=30)
    assert value is not None

Performance

Concurrency

# Worker configuration
CELERY_WORKER_CONCURRENCY = 4  # Number of worker processes

# Prefetch limit
CELERY_WORKER_PREFETCH_MULTIPLIER = 4  # Tasks per worker

# For long tasks, reduce prefetch
CELERY_WORKER_PREFETCH_MULTIPLIER = 1

# Start worker with specific concurrency
celery -A myproject worker --concurrency=4

# Use eventlet for I/O-bound tasks
celery -A myproject worker --pool=eventlet --concurrency=100

# Use gevent
celery -A myproject worker --pool=gevent --concurrency=100

Task Optimization

# Avoid database queries in loops
@shared_task
def process_items_bad(item_ids):
    """Bad: N database queries."""
    results = []
    for item_id in item_ids:
        item = Item.objects.get(id=item_id)  # N queries!
        results.append(item.process())
    return results

@shared_task
def process_items_good(item_ids):
    """Good: 1 database query."""
    items = Item.objects.filter(id__in=item_ids)
    return [item.process() for item in items]

# Use bulk operations
@shared_task
def bulk_update_items(updates):
    """Use bulk_update for efficiency."""
    items = []
    for item_id, data in updates:
        item = Item(id=item_id, **data)
        items.append(item)
    
    Item.objects.bulk_update(
        items,
        ['field1', 'field2', 'field3']
    )

Memory Management

# Process large datasets in chunks
@shared_task
def process_large_dataset(dataset_id):
    """Process large dataset without loading all into memory."""
    dataset = Dataset.objects.get(id=dataset_id)
    
    # iterator() streams records one at a time instead of loading all
    for record in dataset.records.iterator(chunk_size=1000):
        process_record(record)

# Configure max tasks per worker
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000  # Restart after 1000 tasks

# Memory limit
CELERY_WORKER_MAX_MEMORY_PER_CHILD = 400000  # 400MB

Best Practices

1. Idempotent Tasks

from django.db import transaction

@shared_task(bind=True)
def process_payment(self, payment_id):
    """Idempotent payment processing."""
    # select_for_update() must run inside a transaction
    with transaction.atomic():
        payment = Payment.objects.select_for_update().get(id=payment_id)

        # Check if already processed
        if payment.status == 'completed':
            return {'status': 'already_processed'}

        # Process only once
        result = payment.charge()
        payment.mark_completed()

    return result

2. Proper Error Handling

@shared_task(bind=True, autoretry_for=(Exception,), max_retries=3)
def robust_task(self, data_id):
    """Task with proper error handling."""
    try:
        data = Data.objects.get(id=data_id)
    except Data.DoesNotExist:
        # Log and don't retry
        logger.error(f"Data {data_id} not found")
        return None
    
    try:
        return data.process()
    except ExternalAPIError as exc:
        # Retry with backoff
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)
    except Exception as exc:
        # Unexpected error - log and retry
        logger.exception(f"Unexpected error processing {data_id}")
        raise

3. Task Granularity

# Bad: One monolithic task
@shared_task
def process_order_bad(order_id):
    order = Order.objects.get(id=order_id)
    order.validate()
    order.charge()
    order.ship()
    order.send_confirmation()

# Good: Smaller, focused tasks
@shared_task
def validate_order(order_id):
    order = Order.objects.get(id=order_id)
    order.validate()
    charge_order.delay(order_id)

@shared_task
def charge_order(order_id):
    order = Order.objects.get(id=order_id)
    order.charge()
    ship_order.delay(order_id)

@shared_task
def ship_order(order_id):
    order = Order.objects.get(id=order_id)
    order.ship()
    send_confirmation.delay(order_id)

4. Use Task Queues Appropriately

# Route different task types to different queues
@shared_task(queue='high_priority')
def send_password_reset(user_id):
    """Time-sensitive task."""
    pass

@shared_task(queue='low_priority')
def generate_report(user_id):
    """Background task."""
    pass

@shared_task(queue='compute')
def process_video(video_id):
    """CPU-intensive task."""
    pass

Common Issues

Issue: Tasks Stuck in PENDING

Problem: Tasks never execute.

Solution:

# Check worker is running
celery -A myproject inspect active

# Check worker is consuming from correct queue
celery -A myproject worker -Q myqueue

# Check broker connection
celery -A myproject inspect ping

Issue: Memory Leaks

Problem: Worker memory grows over time.

Solution:

# Limit tasks per worker
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000

# Or limit memory
CELERY_WORKER_MAX_MEMORY_PER_CHILD = 400000  # KB

# Avoid circular references in tasks
@shared_task
def task_with_cleanup():
    try:
        return process()
    finally:
        # Clean up resources
        cleanup()

Issue: Duplicate Task Execution

Problem: Task executed multiple times.

Solution:

# Use idempotent tasks
@shared_task(bind=True)
def idempotent_task(self, data_id):
    with transaction.atomic():
        # Lock record
        data = Data.objects.select_for_update().get(id=data_id)
        
        if data.processed:
            return {'status': 'already_processed'}
        
        result = data.process()
        data.processed = True
        data.save()
    
    return result

# Or use task deduplication
CELERY_TASK_ANNOTATIONS = {
    'myapp.tasks.*': {
        'acks_late': True,
        'reject_on_worker_lost': True,
    }
}
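Beyond idempotent writes, duplicates can also be skipped up-front by claiming a key in a shared cache (Redis `SET NX` or Django's `cache.add`). A process-local sketch of the pattern, with a plain set standing in for the shared cache:

```python
import functools

# Stand-in for a shared cache; in production use Redis SET NX or
# Django's cache.add so the claim is visible to every worker process.
_claims = set()

def claim(key):
    """Claim a key; only the first caller succeeds (single-process demo)."""
    if key in _claims:
        return False
    _claims.add(key)
    return True

def run_once(key_fn):
    """Skip the body if the same logical task was already claimed."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            key = key_fn(*args, **kwargs)
            if not claim(key):
                return {'status': 'duplicate_skipped'}
            return fn(*args, **kwargs)
        return wrapper
    return decorator

@run_once(lambda data_id: f'process:{data_id}')
def process(data_id):
    return {'status': 'processed', 'id': data_id}

print(process(7))  # {'status': 'processed', 'id': 7}
print(process(7))  # {'status': 'duplicate_skipped'}
```

In a real task, the claim key would encode the task name and its arguments, and the cache entry would carry a TTL so a crashed run does not block retries forever.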

References

  • Official Documentation: https://docs.celeryq.dev/
  • GitHub Repository: https://github.com/celery/celery
  • Flower Monitoring: https://github.com/mher/flower
  • django-celery-beat: https://github.com/celery/django-celery-beat