Agent Skills: Monitoring & Observability

Prometheus, Grafana, logging, alerting, and data pipeline observability

prometheusgrafanaloggingalertingobservability
monitoringID: pluginagentmarketplace/custom-plugin-data-engineer/monitoring-observability

Skill Files

Browse the full folder contents for monitoring-observability.

Download Skill

Loading file tree…

skills/monitoring-observability/SKILL.md

Skill Metadata

Name
monitoring-observability
Description
Prometheus, Grafana, logging, alerting, and data pipeline observability

Monitoring & Observability

Production monitoring with Prometheus, Grafana, structured logging, and data quality observability.

Quick Start

from prometheus_client import Counter, Histogram, Gauge, start_http_server
import structlog
import time

# Configure structured logging
structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer()
    ]
)
logger = structlog.get_logger()

# Prometheus metrics
RECORDS_PROCESSED = Counter('records_processed_total', 'Total records processed', ['pipeline', 'status'])
PROCESSING_TIME = Histogram('processing_duration_seconds', 'Processing duration', ['pipeline'])
QUEUE_SIZE = Gauge('queue_size', 'Current queue size', ['queue_name'])

def process_batch(batch: list, pipeline_name: str):
    start_time = time.time()

    try:
        for record in batch:
            # Process record...
            RECORDS_PROCESSED.labels(pipeline=pipeline_name, status='success').inc()

        duration = time.time() - start_time
        PROCESSING_TIME.labels(pipeline=pipeline_name).observe(duration)

        logger.info("batch_processed",
            pipeline=pipeline_name,
            count=len(batch),
            duration_seconds=duration
        )

    except Exception as e:
        RECORDS_PROCESSED.labels(pipeline=pipeline_name, status='error').inc()
        logger.error("batch_failed", pipeline=pipeline_name, error=str(e))
        raise

# Start metrics server
start_http_server(8000)

Core Concepts

1. Prometheus Metrics

from prometheus_client import Counter, Histogram, Gauge, Summary

# Counter: monotonically increasing value
http_requests = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status']
)
http_requests.labels(method='GET', endpoint='/api/data', status='200').inc()

# Histogram: distribution of values (latency, sizes)
request_latency = Histogram(
    'request_latency_seconds',
    'Request latency in seconds',
    ['endpoint'],
    buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
)

with request_latency.labels(endpoint='/api/data').time():
    # Process request
    pass

# Gauge: value that can go up and down
active_connections = Gauge('active_connections', 'Active connections')
active_connections.inc()  # Connection opened
active_connections.dec()  # Connection closed

# Summary: similar to histogram with percentiles
response_size = Summary('response_size_bytes', 'Response size', ['endpoint'])
response_size.labels(endpoint='/api/data').observe(1024)

2. Grafana Dashboard (JSON)

{
  "title": "Data Pipeline Dashboard",
  "panels": [
    {
      "title": "Records Processed",
      "type": "stat",
      "targets": [{
        "expr": "sum(rate(records_processed_total[5m]))",
        "legendFormat": "Records/sec"
      }]
    },
    {
      "title": "Processing Latency P95",
      "type": "graph",
      "targets": [{
        "expr": "histogram_quantile(0.95, rate(processing_duration_seconds_bucket[5m]))",
        "legendFormat": "P95 Latency"
      }]
    },
    {
      "title": "Error Rate",
      "type": "gauge",
      "targets": [{
        "expr": "sum(rate(records_processed_total{status='error'}[5m])) / sum(rate(records_processed_total[5m])) * 100",
        "legendFormat": "Error %"
      }]
    }
  ]
}

3. Structured Logging

import structlog
from datetime import datetime

# Configure structlog
structlog.configure(
    processors=[
        structlog.stdlib.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer()
    ],
    context_class=dict,
    logger_factory=structlog.PrintLoggerFactory(),
)

logger = structlog.get_logger()

# Usage with context
log = logger.bind(service="etl-pipeline", environment="production")

def process_order(order_id: str, user_id: str):
    order_log = log.bind(order_id=order_id, user_id=user_id)

    order_log.info("processing_started")

    try:
        # Process...
        order_log.info("processing_completed", duration_ms=150)
    except Exception as e:
        order_log.error("processing_failed", error=str(e), exc_info=True)
        raise

4. Alerting Rules (Prometheus)

# alerting_rules.yml
groups:
  - name: data-pipeline-alerts
    rules:
      - alert: HighErrorRate
        expr: |
          sum(rate(records_processed_total{status="error"}[5m]))
          / sum(rate(records_processed_total[5m])) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error rate in data pipeline"
          description: "Error rate is {{ $value | humanizePercentage }}"

      - alert: PipelineStalled
        expr: |
          sum(rate(records_processed_total[10m])) == 0
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Data pipeline is not processing records"

      - alert: HighLatency
        expr: |
          histogram_quantile(0.95, rate(processing_duration_seconds_bucket[5m])) > 5
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High processing latency detected"

Tools & Technologies

| Tool | Purpose | Version (2025) | |------|---------|----------------| | Prometheus | Metrics collection | 2.50+ | | Grafana | Visualization | 10.3+ | | Loki | Log aggregation | 2.9+ | | Alertmanager | Alert routing | 0.27+ | | OpenTelemetry | Tracing standard | 1.24+ | | Datadog | Full observability | Latest | | Monte Carlo | Data observability | Latest |

Troubleshooting Guide

| Issue | Symptoms | Root Cause | Fix | |-------|----------|------------|-----| | Missing Metrics | Gaps in graphs | Scrape failure | Check targets, network | | High Cardinality | Prometheus OOM | Too many labels | Reduce label values | | Alert Fatigue | Too many alerts | Sensitive thresholds | Tune thresholds, add for duration | | Log Volume | High storage cost | Verbose logging | Adjust log levels |

Best Practices

# ✅ DO: Use appropriate metric types
# Counter for totals, Histogram for latency

# ✅ DO: Add meaningful labels (but limit cardinality)
REQUESTS.labels(method='GET', status='200', endpoint='/api').inc()

# ✅ DO: Include correlation IDs in logs
logger.info("request_completed", request_id=request_id)

# ✅ DO: Set up dashboards for key metrics

# ❌ DON'T: High cardinality labels (user_id, request_id as labels)
# ❌ DON'T: Log sensitive data
# ❌ DON'T: Alert on every error

Resources


Skill Certification Checklist:

  • [ ] Can instrument applications with Prometheus metrics
  • [ ] Can create Grafana dashboards
  • [ ] Can implement structured logging
  • [ ] Can set up alerting rules
  • [ ] Can troubleshoot observability issues