Agent Skills: NoSQL Databases

MongoDB, Redis, Cassandra, DynamoDB, and distributed database patterns for scalable applications

nosqlmongodbrediscassandradynamodb
databaseID: pluginagentmarketplace/custom-plugin-data-engineer/nosql-databases

Skill Files

Browse the full folder contents for nosql-databases.

Download Skill

Loading file tree…

skills/nosql-databases/SKILL.md

Skill Metadata

Name
nosql-databases
Description
MongoDB, Redis, Cassandra, DynamoDB, and distributed database patterns for scalable applications

NoSQL Databases

Production-grade NoSQL database patterns with MongoDB, Redis, Cassandra, and DynamoDB.

Quick Start

# MongoDB with PyMongo
from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError
from datetime import datetime

client = MongoClient("mongodb://localhost:27017/")
db = client.analytics
events = db.events

# Create index for query performance
events.create_index([("user_id", 1), ("timestamp", -1)])
events.create_index([("event_type", 1)])

# Insert with retry pattern
def insert_event(event: dict, retries: int = 3):
    event["_id"] = f"{event['user_id']}_{event['timestamp'].isoformat()}"
    event["created_at"] = datetime.utcnow()

    for attempt in range(retries):
        try:
            events.insert_one(event)
            return True
        except DuplicateKeyError:
            return False  # Already exists
        except Exception as e:
            if attempt == retries - 1:
                raise
    return False

# Aggregation pipeline
pipeline = [
    {"$match": {"event_type": "purchase", "timestamp": {"$gte": datetime(2024, 1, 1)}}},
    {"$group": {"_id": "$user_id", "total_purchases": {"$sum": "$amount"}, "count": {"$sum": 1}}},
    {"$sort": {"total_purchases": -1}},
    {"$limit": 100}
]
top_customers = list(events.aggregate(pipeline))

Core Concepts

1. Redis for Caching & Real-time

import redis
import json
from datetime import timedelta

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# Cache pattern with TTL
def get_user_profile(user_id: str) -> dict:
    cache_key = f"user:{user_id}:profile"

    # Try cache first
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)

    # Cache miss - fetch from DB
    profile = fetch_from_database(user_id)

    # Set with 1 hour TTL
    r.setex(cache_key, timedelta(hours=1), json.dumps(profile))
    return profile

# Rate limiting
def check_rate_limit(user_id: str, limit: int = 100, window: int = 60) -> bool:
    key = f"rate:{user_id}:{int(time.time()) // window}"
    current = r.incr(key)

    if current == 1:
        r.expire(key, window)

    return current <= limit

# Real-time leaderboard with sorted sets
def update_leaderboard(user_id: str, score: float):
    r.zadd("leaderboard:daily", {user_id: score})

def get_top_users(n: int = 10) -> list:
    return r.zrevrange("leaderboard:daily", 0, n-1, withscores=True)

# Pub/Sub for event streaming
def publish_event(channel: str, event: dict):
    r.publish(channel, json.dumps(event))

def subscribe_events(channel: str):
    pubsub = r.pubsub()
    pubsub.subscribe(channel)
    for message in pubsub.listen():
        if message['type'] == 'message':
            yield json.loads(message['data'])

2. DynamoDB Patterns

import boto3
from boto3.dynamodb.conditions import Key, Attr
from decimal import Decimal

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('Events')

# Single table design pattern
def put_event(event: dict):
    item = {
        'PK': f"USER#{event['user_id']}",
        'SK': f"EVENT#{event['timestamp']}#{event['event_id']}",
        'GSI1PK': f"TYPE#{event['event_type']}",
        'GSI1SK': f"DATE#{event['timestamp'][:10]}",
        'data': event
    }
    table.put_item(Item=item)

# Query by user
def get_user_events(user_id: str, limit: int = 100):
    response = table.query(
        KeyConditionExpression=Key('PK').eq(f"USER#{user_id}") & Key('SK').begins_with("EVENT#"),
        ScanIndexForward=False,
        Limit=limit
    )
    return response['Items']

# Query by event type (using GSI)
def get_events_by_type(event_type: str, date: str):
    response = table.query(
        IndexName='GSI1',
        KeyConditionExpression=Key('GSI1PK').eq(f"TYPE#{event_type}") & Key('GSI1SK').eq(f"DATE#{date}")
    )
    return response['Items']

# Batch write with exponential backoff
def batch_write_events(events: list):
    with table.batch_writer() as batch:
        for event in events:
            batch.put_item(Item=event)

3. Cassandra for Time Series

from cassandra.cluster import Cluster
from cassandra.query import BatchStatement, SimpleStatement
from datetime import datetime

cluster = Cluster(['node1', 'node2', 'node3'])
session = cluster.connect('analytics')

# Create table with time-based partitioning
session.execute("""
    CREATE TABLE IF NOT EXISTS events_by_day (
        date date,
        user_id uuid,
        event_time timestamp,
        event_type text,
        data text,
        PRIMARY KEY ((date), event_time, user_id)
    ) WITH CLUSTERING ORDER BY (event_time DESC)
""")

# Insert with prepared statement
insert_stmt = session.prepare("""
    INSERT INTO events_by_day (date, user_id, event_time, event_type, data)
    VALUES (?, ?, ?, ?, ?)
""")

def insert_event(event: dict):
    session.execute(insert_stmt, [
        event['timestamp'].date(),
        event['user_id'],
        event['timestamp'],
        event['event_type'],
        json.dumps(event['data'])
    ])

# Query by date range
def get_events_for_date(date: datetime.date):
    rows = session.execute(
        "SELECT * FROM events_by_day WHERE date = %s",
        [date]
    )
    return list(rows)

Tools & Technologies

| Tool | Purpose | Version (2025) | |------|---------|----------------| | MongoDB | Document store | 7.0+ | | Redis | Cache, pub/sub | 7.2+ | | Cassandra | Time series, wide column | 5.0+ | | DynamoDB | Managed key-value | Latest | | Elasticsearch | Search, analytics | 8.12+ | | ScyllaDB | High-perf Cassandra | 5.4+ |

Troubleshooting Guide

| Issue | Symptoms | Root Cause | Fix | |-------|----------|------------|-----| | Hot Partition | High latency on some keys | Uneven partition key | Redesign partition key | | Memory Pressure | Redis evictions, slow queries | Data > memory | Eviction policy, clustering | | Query Timeout | Slow reads in Cassandra | Missing index, large partition | Add index, limit partition size | | Consistency Issues | Stale reads | Eventual consistency | Use appropriate consistency level |

Best Practices

# ✅ DO: Design for access patterns (NoSQL)
# Primary key = partition key + sort key

# ✅ DO: Use connection pooling
pool = redis.ConnectionPool(max_connections=20)
r = redis.Redis(connection_pool=pool)

# ✅ DO: Set TTLs on cache data
r.setex(key, ttl_seconds, value)

# ✅ DO: Handle eventual consistency
# Read-your-writes with consistent reads where needed

# ❌ DON'T: Use NoSQL for complex joins
# ❌ DON'T: Store unbounded data in single document
# ❌ DON'T: Ignore partition sizing

Resources


Skill Certification Checklist:

  • [ ] Can design document schemas for MongoDB
  • [ ] Can implement caching patterns with Redis
  • [ ] Can model time series data in Cassandra
  • [ ] Can use DynamoDB single-table design
  • [ ] Can choose appropriate consistency levels
NoSQL Databases Skill | Agent Skills