Agent Skills: Telemetry Validator Agent

>

UncategorizedID: Kart-rc/dataobservability-agents/telemetry-validator-agent

Install this agent skill to your local

pnpm dlx add-skill https://github.com/Kart-rc/dataobservability-agents/tree/HEAD/docs/autopilot-agent-expert/skills/telemetry-validator-agent

Skill Files

Browse the full folder contents for telemetry-validator-agent.

Download Skill

Loading file tree…

docs/autopilot-agent-expert/skills/telemetry-validator-agent/SKILL.md

Skill Metadata

Name
telemetry-validator-agent
Description
>

Telemetry Validator Agent

The Telemetry Validator is the final verification layer that confirms instrumentation actually works by running services in sandbox environments and verifying expected signals arrive at the observability backend.

Core Responsibilities

  1. Environment Setup: Spin up isolated test environments
  2. Traffic Generation: Send synthetic requests to exercise code paths
  3. Signal Querying: Query OTel Collector for expected spans/metrics
  4. Header Validation: Verify correlation headers in Kafka messages
  5. Lineage Verification: Confirm OpenLineage events for data jobs
  6. Evidence Collection: Capture proof of successful validation
  7. Report Generation: Produce detailed validation reports

Validation Process

Step 1: Environment Setup

async def setup_environment(service_name: str, commit_sha: str):
    """Deploy service to isolated namespace."""
    namespace = f"validator-{service_name}-{commit_sha[:8]}"
    
    # Create K8s namespace
    await kubectl.create_namespace(namespace)
    
    # Deploy service from PR branch
    await helm.install(
        release=service_name,
        namespace=namespace,
        values={
            "image.tag": commit_sha,
            "otel.enabled": True,
            "otel.collector": VALIDATOR_COLLECTOR_URL
        }
    )
    
    # Wait for ready
    await kubectl.wait_for_ready(namespace, timeout=120)
    
    return namespace

Step 2: Synthetic Traffic Generation

async def generate_traffic(service_url: str, archetype: str):
    """Generate synthetic traffic based on archetype."""
    
    if archetype == "kafka-microservice":
        # Send HTTP request that triggers Kafka produce
        response = await httpx.post(
            f"{service_url}/api/orders",
            json={"order_id": "test-123", "amount": 99.99},
            headers={"traceparent": "00-test-trace-id-test-span-id-01"}
        )
        
    elif archetype == "rest-api":
        # Send multiple HTTP requests
        for endpoint in ["/health", "/api/v1/resource"]:
            await httpx.get(f"{service_url}{endpoint}")
            
    elif archetype == "airflow-dag":
        # Trigger DAG run
        await airflow.trigger_dag(dag_id="test_dag", conf={})
        
    elif archetype == "spark-job":
        # Submit Spark job
        await spark.submit(job="test_job.py", conf={})

Step 3: Signal Querying

async def query_spans(
    service_name: str,
    trace_id: str,
    expected_operations: list[str]
) -> list[Span]:
    """Query OTel Collector for expected spans."""
    
    spans = await otel_client.query(
        service=service_name,
        trace_id=trace_id,
        time_range="5m"
    )
    
    found_operations = {s.operation_name for s in spans}
    missing = set(expected_operations) - found_operations
    
    if missing:
        raise ValidationError(f"Missing spans: {missing}")
    
    return spans

Step 4: Attribute Validation

def validate_span_attributes(span: Span, expected: dict) -> bool:
    """Validate span has required attributes."""
    
    for key, value in expected.items():
        if key not in span.attributes:
            return False
        if value != "*" and span.attributes[key] != value:
            return False
    
    return True

Step 5: Kafka Header Verification

async def check_kafka_headers(topic: str) -> dict:
    """Verify correlation headers in Kafka messages."""
    
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=KAFKA_BOOTSTRAP,
        consumer_timeout_ms=10000
    )
    
    for message in consumer:
        headers = dict(message.headers)
        
        required = ["traceparent", "x-obs-producer-service", "x-obs-mapping-id"]
        missing = [h for h in required if h not in headers]
        
        if not missing:
            return headers
    
    raise ValidationError(f"Missing headers in {topic}")

Expected Signals by Archetype

| Archetype | Expected Signals | Validation Query | |-----------|------------------|------------------| | Kafka Producer | Span: {topic} send, Attrs: messaging.destination, x-obs-* | spans.where(operation="send").attrs.has("x-obs-mapping-id") | | Kafka Consumer | Span: {topic} receive, Attrs: consumer_group, partition, offset | spans.where(operation="receive").attrs.has("messaging.kafka.consumer.group") | | Airflow Task | OpenLineage RunEvent with inputs/outputs | lineage_events.where(job.name="{dag}.{task}").has(outputs) | | Spark Job | OpenLineage with column lineage facet | lineage_events.where(job.namespace="spark").facets.has("columnLineage") | | HTTP Handler | Span: HTTP {method}, Attrs: http.route, http.status_code | spans.where(kind="SERVER").attrs.has("http.route") | | gRPC Server | Span: {service}/{method}, Attrs: rpc.system, rpc.service | spans.where(attrs.rpc.system="grpc") |

Validation Report Schema

{
  "validation_id": "val-2026-01-04-001",
  "repository": "orders-enricher",
  "commit_sha": "abc123def",
  "timestamp": "2026-01-04T11:00:00Z",
  "environment": "staging",
  "status": "PASSED",
  "duration_seconds": 45,
  "tests": [
    {
      "name": "otel_spans_emitted",
      "status": "PASSED",
      "expected": 3,
      "actual": 3,
      "details": "All expected spans found with correct attributes"
    },
    {
      "name": "correlation_headers_present",
      "status": "PASSED",
      "headers_validated": ["traceparent", "x-obs-producer-service", "x-obs-mapping-id"],
      "details": "All required headers present in Kafka messages"
    },
    {
      "name": "lineage_event_emitted",
      "status": "PASSED",
      "event_type": "RunEvent",
      "inputs": ["urn:kafka:prod:msk:orders_raw"],
      "outputs": ["urn:kafka:prod:msk:orders_enriched"]
    }
  ],
  "evidence": {
    "span_ids": ["span-001", "span-002", "span-003"],
    "kafka_offsets": {"orders_enriched": 12345},
    "screenshots": ["https://s3.../validation-dashboard.png"]
  },
  "recommendation": "Ready to merge. All telemetry validation checks passed."
}

Scripts

  • scripts/validate_service.py: Main validation orchestrator
  • scripts/traffic_generator.py: Synthetic traffic generation
  • scripts/otel_client.py: OTel Collector query client
  • scripts/kafka_inspector.py: Kafka header inspection
  • scripts/report_generator.py: Validation report generation

References

  • references/expected-signals.md: Signal expectations by archetype
  • references/validation-queries.md: OTel query patterns
  • references/report-schema.json: Validation report JSON schema

Configuration

telemetry_validator:
  enabled: true
  environment: "staging"
  timeout_seconds: 120
  otel_collector_url: "https://otel-collector.staging:4318"
  kafka_bootstrap: "kafka.staging:9092"
  retry_attempts: 3
  cleanup_after: true

Integration Points

| System | Integration | Purpose | |--------|-------------|---------| | Kubernetes | API | Namespace creation, deployment | | OTel Collector | OTLP/Query API | Span ingestion and querying | | Kafka | Consumer API | Header inspection | | OpenLineage | API | Lineage event verification | | S3 | SDK | Evidence storage | | GitHub | Status API | Report validation status |