Agent Skills: Signal Factory Core

>

UncategorizedID: Kart-rc/dataobservability-agents/signal-factory-core

Install this agent skill to your local

pnpm dlx add-skill https://github.com/Kart-rc/dataobservability-agents/tree/HEAD/docs/autopilot-agent-expert/skills/signal-factory-core

Skill Files

Browse the full folder contents for signal-factory-core.

Download Skill

Loading file tree…

docs/autopilot-agent-expert/skills/signal-factory-core/SKILL.md

Skill Metadata

Name
signal-factory-core
Description
>

Signal Factory Core

The Signal Factory Core is the central processing hub that transforms raw telemetry into actionable signals, maintains the knowledge graph in Neptune, and powers the RCA Copilot with pre-computed incident context.

Architecture Overview

┌─────────────────────────────────────────────────────────────────┐
│                      SIGNAL FACTORY CORE                        │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────────────┐ │
│  │   Signal    │───▶│  Event Bus  │───▶│   Signal Engines    │ │
│  │   Router    │    │ (Kinesis/   │    │ ┌─────┐ ┌─────┐    │ │
│  │             │    │    MSK)     │    │ │Fresh│ │ Vol │    │ │
│  └─────────────┘    └─────────────┘    │ └─────┘ └─────┘    │ │
│        │                               │ ┌─────┐ ┌─────┐    │ │
│        │ Normalization                 │ │Drift│ │ DQ  │    │ │
│        │ + Correlation                 │ └─────┘ └─────┘    │ │
│        │ + URN Resolution              │ ┌─────┐ ┌─────┐    │ │
│        ▼                               │ │Anom │ │Cost │    │ │
│  ┌─────────────┐                       │ └─────┘ └─────┘    │ │
│  │  Canonical  │                       └─────────────────────┘ │
│  │   Signal    │                               │               │
│  │   Event     │                               ▼               │
│  └─────────────┘                       ┌───────────────────┐   │
│                                        │ State & Graph     │   │
│                                        │ ┌─────┐ ┌─────┐  │   │
│                                        │ │DDB  │ │Nept │  │   │
│                                        │ └─────┘ └─────┘  │   │
│                                        └───────────────────┘   │
└─────────────────────────────────────────────────────────────────┘

Signal Router

Responsibilities

  1. Normalization: Convert raw inputs into canonical Signal Events
  2. URN Resolution: Standardize asset identifiers
  3. Correlation Extraction: Extract trace_id, handoff_ids, lineage refs
  4. Schema Resolution: Bind events to schema versions
  5. Routing: Direct signals to appropriate engines

Canonical Signal Event Schema

{
  "event_id": "sig-2026-01-04-001",
  "event_type": "SignalEvent",
  "timestamp": "2026-01-04T10:00:00Z",
  "source": {
    "type": "kafka",
    "asset_urn": "urn:kafka:prod:msk:orders_enriched",
    "service_urn": "urn:svc:prod:commerce:orders-enricher"
  },
  "correlation": {
    "trace_id": "abc123",
    "span_id": "def456",
    "handoff_ids": {
      "kafka_offset": 12345,
      "partition": 3
    },
    "lineage_ref": "map-orders-enriched-v7"
  },
  "payload": {
    "signal_type": "volume",
    "value": 1523,
    "unit": "messages_per_minute"
  },
  "schema": {
    "id": "schema-v42",
    "version": 42,
    "registry": "confluent"
  }
}

Signal Engines

Freshness Engine

Detects stale data based on expected arrival times.

class FreshnessEngine:
    async def process(self, event: SignalEvent) -> SignalState:
        asset = await self.get_asset(event.source.asset_urn)
        sla = asset.contracts.freshness.max_delay_minutes
        
        last_update = await self.get_last_update(asset.urn)
        delay = (event.timestamp - last_update).minutes
        
        if delay > sla:
            return SignalState(
                asset_urn=asset.urn,
                signal_type="freshness",
                status="BREACH",
                severity=self.calculate_severity(delay, sla),
                evidence=FreshnessEvidence(
                    expected_delay=sla,
                    actual_delay=delay,
                    last_update=last_update
                )
            )
        return SignalState(status="OK")

Volume Engine

Detects anomalies in data volume.

class VolumeEngine:
    async def process(self, event: SignalEvent) -> SignalState:
        asset = await self.get_asset(event.source.asset_urn)
        baseline = await self.get_baseline(asset.urn, window="7d")
        
        current = event.payload.value
        z_score = (current - baseline.mean) / baseline.stddev
        
        if abs(z_score) > 3:  # 3-sigma anomaly
            return SignalState(
                status="ANOMALY",
                evidence=VolumeEvidence(
                    expected=baseline.mean,
                    actual=current,
                    z_score=z_score
                )
            )

Schema Drift Engine

Detects breaking schema changes.

class DriftEngine:
    async def process(self, event: SignalEvent) -> SignalState:
        current = await self.get_schema(event.schema.id)
        previous = await self.get_schema(f"schema-v{event.schema.version - 1}")
        
        compatibility = await self.check_compatibility(previous, current)
        
        if not compatibility.is_backward:
            return SignalState(
                status="DRIFT",
                evidence=DriftEvidence(
                    breaking_changes=compatibility.changes,
                    affected_consumers=await self.get_consumers(event.source.asset_urn)
                )
            )

Contract Engine

Validates data against contracts.

DQ Engine

Processes Deequ results and quality metrics.

Anomaly Engine

ML-based anomaly detection across all metrics.

Cost Engine

Tracks and allocates compute costs.

Neptune Graph Schema

Node Types

// Asset nodes
g.addV('Asset')
  .property('urn', 'urn:kafka:prod:msk:orders_enriched')
  .property('type', 'kafka-topic')
  .property('tier', 1)
  .property('owner', 'orders-team')

// Service nodes
g.addV('Service')
  .property('urn', 'urn:svc:prod:commerce:orders-enricher')
  .property('language', 'java')
  .property('framework', 'spring-boot')

// Incident nodes
g.addV('Incident')
  .property('id', 'INC-2026-01-04-001')
  .property('status', 'ACTIVE')
  .property('severity', 'P1')

// Evidence nodes
g.addV('Evidence')
  .property('type', 'schema_drift')
  .property('confidence', 0.95)

Edge Types

// Lineage edges
g.V(service).addE('PRODUCES').to(topic)
g.V(service).addE('CONSUMES').from(topic)

// Incident edges
g.V(incident).addE('AFFECTS').to(asset)
g.V(incident).addE('SUPPORTED_BY').to(evidence)

// Schema edges
g.V(schema_v42).addE('SUPERSEDES').to(schema_v41)

DynamoDB State Tables

SignalState Table

{
  "pk": "urn:kafka:prod:msk:orders_enriched",
  "sk": "signal#freshness",
  "status": "OK",
  "last_value": 1523,
  "last_update": "2026-01-04T10:00:00Z",
  "ttl": 1704448800
}

IncidentContextCache Table

{
  "pk": "INC-2026-01-04-001",
  "incident_id": "INC-2026-01-04-001",
  "primary_asset": "urn:kafka:prod:msk:orders_enriched",
  "top_evidence": ["schema_drift_v42", "volume_drop_10am"],
  "blast_radius": ["consumer-a", "consumer-b", "delta-table"],
  "timeline": [...],
  "updated_at": "2026-01-04T10:05:00Z"
}

Scripts

  • scripts/signal_router.py: Main routing and normalization
  • scripts/freshness_engine.py: Freshness signal processing
  • scripts/volume_engine.py: Volume anomaly detection
  • scripts/drift_engine.py: Schema drift detection
  • scripts/graph_writer.py: Neptune graph operations
  • scripts/state_manager.py: DynamoDB state operations

References

  • references/signal-schema.json: Canonical signal event schema
  • references/graph-schema.md: Neptune graph model
  • references/engine-configs/: Per-engine configuration

Configuration

signal_factory:
  router:
    input_topic: "raw-telemetry"
    output_topic: "canonical-signals"
  engines:
    freshness:
      enabled: true
      check_interval_seconds: 60
    volume:
      enabled: true
      baseline_window_days: 7
    drift:
      enabled: true
  storage:
    neptune:
      endpoint: "wss://neptune.us-east-1.amazonaws.com:8182/gremlin"
    dynamodb:
      signal_state_table: "SignalState"
      incident_cache_table: "IncidentContextCache"