Agent Skills: Performing IOC Enrichment Automation

>

UncategorizedID: plurigrid/asi/performing-ioc-enrichment-automation

Install this agent skill to your local

pnpm dlx add-skill https://github.com/plurigrid/asi/tree/HEAD/plugins/asi/skills/performing-ioc-enrichment-automation

Skill Files

Browse the full folder contents for performing-ioc-enrichment-automation.

Download Skill

Loading file tree…

plugins/asi/skills/performing-ioc-enrichment-automation/SKILL.md

Skill Metadata

Name
performing-ioc-enrichment-automation
Description
>

Performing IOC Enrichment Automation

When to Use

Use this skill when:

  • SOC analysts need to quickly enrich IOCs from multiple sources during alert triage
  • High alert volumes require automated enrichment to reduce manual lookup time
  • Incident investigations need comprehensive IOC context for scope assessment
  • SOAR playbooks require enrichment actions as part of automated triage workflows

Do not use for bulk blocking decisions without analyst review — enrichment provides context, not definitive malicious/benign determination.

Prerequisites

  • API keys: VirusTotal (free or premium), AbuseIPDB, Shodan, URLScan.io, GreyNoise
  • Python 3.8+ with requests, vt-py, shodan libraries
  • MISP instance or TIP for cross-referencing organizational intelligence
  • SOAR platform (optional) for workflow integration
  • Rate limit awareness: VT free (4 req/min), AbuseIPDB (1000/day), Shodan (1 req/sec)

Workflow

Step 1: Build Unified Enrichment Engine

Create a multi-source enrichment pipeline:

import requests
import vt
import shodan
import time
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class EnrichmentResult:
    ioc_value: str
    ioc_type: str
    virustotal: dict = field(default_factory=dict)
    abuseipdb: dict = field(default_factory=dict)
    shodan_data: dict = field(default_factory=dict)
    greynoise: dict = field(default_factory=dict)
    urlscan: dict = field(default_factory=dict)
    misp_matches: list = field(default_factory=list)
    risk_score: float = 0.0
    disposition: str = "Unknown"

class IOCEnrichmentEngine:
    def __init__(self, config):
        self.vt_client = vt.Client(config["virustotal_key"])
        self.shodan_api = shodan.Shodan(config["shodan_key"])
        self.abuseipdb_key = config["abuseipdb_key"]
        self.greynoise_key = config["greynoise_key"]
        self.urlscan_key = config["urlscan_key"]

    def enrich_ip(self, ip_address):
        result = EnrichmentResult(ioc_value=ip_address, ioc_type="ip")

        # VirusTotal
        try:
            vt_obj = self.vt_client.get_object(f"/ip_addresses/{ip_address}")
            result.virustotal = {
                "malicious": vt_obj.last_analysis_stats.get("malicious", 0),
                "suspicious": vt_obj.last_analysis_stats.get("suspicious", 0),
                "total_engines": sum(vt_obj.last_analysis_stats.values()),
                "reputation": vt_obj.reputation,
                "country": getattr(vt_obj, "country", "Unknown"),
                "as_owner": getattr(vt_obj, "as_owner", "Unknown")
            }
        except Exception as e:
            result.virustotal = {"error": str(e)}

        # AbuseIPDB
        try:
            response = requests.get(
                "https://api.abuseipdb.com/api/v2/check",
                headers={"Key": self.abuseipdb_key, "Accept": "application/json"},
                params={"ipAddress": ip_address, "maxAgeInDays": 90}
            )
            data = response.json()["data"]
            result.abuseipdb = {
                "confidence_score": data["abuseConfidenceScore"],
                "total_reports": data["totalReports"],
                "is_tor": data.get("isTor", False),
                "usage_type": data.get("usageType", "Unknown"),
                "isp": data.get("isp", "Unknown"),
                "domain": data.get("domain", "Unknown")
            }
        except Exception as e:
            result.abuseipdb = {"error": str(e)}

        # Shodan
        try:
            host = self.shodan_api.host(ip_address)
            result.shodan_data = {
                "ports": host.get("ports", []),
                "os": host.get("os", "Unknown"),
                "organization": host.get("org", "Unknown"),
                "isp": host.get("isp", "Unknown"),
                "vulns": host.get("vulns", []),
                "last_update": host.get("last_update", "Unknown")
            }
        except shodan.APIError:
            result.shodan_data = {"status": "Not found in Shodan"}

        # GreyNoise
        try:
            response = requests.get(
                f"https://api.greynoise.io/v3/community/{ip_address}",
                headers={"key": self.greynoise_key}
            )
            gn_data = response.json()
            result.greynoise = {
                "classification": gn_data.get("classification", "unknown"),
                "noise": gn_data.get("noise", False),
                "riot": gn_data.get("riot", False),
                "name": gn_data.get("name", "Unknown")
            }
        except Exception as e:
            result.greynoise = {"error": str(e)}

        # Calculate composite risk score
        result.risk_score = self._calculate_ip_risk(result)
        result.disposition = self._determine_disposition(result.risk_score)
        return result

    def enrich_domain(self, domain):
        result = EnrichmentResult(ioc_value=domain, ioc_type="domain")

        # VirusTotal
        try:
            vt_obj = self.vt_client.get_object(f"/domains/{domain}")
            result.virustotal = {
                "malicious": vt_obj.last_analysis_stats.get("malicious", 0),
                "suspicious": vt_obj.last_analysis_stats.get("suspicious", 0),
                "reputation": vt_obj.reputation,
                "creation_date": getattr(vt_obj, "creation_date", "Unknown"),
                "registrar": getattr(vt_obj, "registrar", "Unknown"),
                "categories": getattr(vt_obj, "categories", {})
            }
        except Exception as e:
            result.virustotal = {"error": str(e)}

        # URLScan.io
        try:
            response = requests.get(
                f"https://urlscan.io/api/v1/search/?q=domain:{domain}",
                headers={"API-Key": self.urlscan_key}
            )
            scans = response.json().get("results", [])
            result.urlscan = {
                "total_scans": len(scans),
                "verdicts": [s.get("verdicts", {}).get("overall", {}).get("malicious", False)
                            for s in scans[:5]],
                "last_scan": scans[0]["task"]["time"] if scans else "Never scanned"
            }
        except Exception as e:
            result.urlscan = {"error": str(e)}

        result.risk_score = self._calculate_domain_risk(result)
        result.disposition = self._determine_disposition(result.risk_score)
        return result

    def enrich_hash(self, file_hash):
        result = EnrichmentResult(ioc_value=file_hash, ioc_type="hash")

        # VirusTotal
        try:
            vt_obj = self.vt_client.get_object(f"/files/{file_hash}")
            result.virustotal = {
                "malicious": vt_obj.last_analysis_stats.get("malicious", 0),
                "suspicious": vt_obj.last_analysis_stats.get("suspicious", 0),
                "undetected": vt_obj.last_analysis_stats.get("undetected", 0),
                "total_engines": sum(vt_obj.last_analysis_stats.values()),
                "type_description": getattr(vt_obj, "type_description", "Unknown"),
                "popular_threat_name": getattr(vt_obj, "popular_threat_classification", {}).get(
                    "suggested_threat_label", "Unknown"
                ),
                "sandbox_verdicts": getattr(vt_obj, "sandbox_verdicts", {}),
                "first_seen": getattr(vt_obj, "first_submission_date", "Unknown")
            }
        except vt.APIError:
            result.virustotal = {"status": "Not found in VirusTotal"}

        # MalwareBazaar
        try:
            response = requests.post(
                "https://mb-api.abuse.ch/api/v1/",
                data={"query": "get_info", "hash": file_hash}
            )
            mb_data = response.json()
            if mb_data["query_status"] == "ok":
                entry = mb_data["data"][0]
                result.abuseipdb = {  # Reusing field for MalwareBazaar data
                    "malware_family": entry.get("signature", "Unknown"),
                    "tags": entry.get("tags", []),
                    "file_type": entry.get("file_type", "Unknown"),
                    "delivery_method": entry.get("delivery_method", "Unknown"),
                    "first_seen": entry.get("first_seen", "Unknown")
                }
        except Exception:
            pass

        result.risk_score = self._calculate_hash_risk(result)
        result.disposition = self._determine_disposition(result.risk_score)
        return result

    def _calculate_ip_risk(self, result):
        score = 0
        vt = result.virustotal
        abuse = result.abuseipdb
        gn = result.greynoise

        if isinstance(vt, dict) and "malicious" in vt:
            score += min(vt["malicious"] * 3, 30)
        if isinstance(abuse, dict) and "confidence_score" in abuse:
            score += abuse["confidence_score"] * 0.3
        if isinstance(gn, dict):
            if gn.get("classification") == "malicious":
                score += 20
            elif gn.get("riot"):
                score -= 20  # Known benign service
        return min(max(score, 0), 100)

    def _calculate_domain_risk(self, result):
        score = 0
        vt = result.virustotal
        if isinstance(vt, dict) and "malicious" in vt:
            score += min(vt["malicious"] * 4, 40)
            if vt.get("reputation", 0) < -5:
                score += 20
        return min(max(score, 0), 100)

    def _calculate_hash_risk(self, result):
        score = 0
        vt = result.virustotal
        if isinstance(vt, dict) and "malicious" in vt:
            total = vt.get("total_engines", 1)
            detection_rate = vt["malicious"] / total if total > 0 else 0
            score = detection_rate * 100
        return min(max(score, 0), 100)

    def _determine_disposition(self, risk_score):
        if risk_score >= 70:
            return "MALICIOUS — Block recommended"
        elif risk_score >= 40:
            return "SUSPICIOUS — Monitor and investigate"
        elif risk_score >= 10:
            return "LOW RISK — Likely benign, verify context"
        else:
            return "CLEAN — No indicators of malicious activity"

    def close(self):
        self.vt_client.close()

Step 2: Batch Enrichment for Incident Investigation

# Process multiple IOCs from an incident
iocs = [
    {"type": "ip", "value": "185.234.218.50"},
    {"type": "domain", "value": "evil-c2-server.com"},
    {"type": "hash", "value": "a1b2c3d4e5f6..."},
    {"type": "ip", "value": "45.33.32.156"},
]

config = {
    "virustotal_key": "YOUR_VT_KEY",
    "shodan_key": "YOUR_SHODAN_KEY",
    "abuseipdb_key": "YOUR_ABUSEIPDB_KEY",
    "greynoise_key": "YOUR_GREYNOISE_KEY",
    "urlscan_key": "YOUR_URLSCAN_KEY"
}

engine = IOCEnrichmentEngine(config)

results = []
for ioc in iocs:
    if ioc["type"] == "ip":
        result = engine.enrich_ip(ioc["value"])
    elif ioc["type"] == "domain":
        result = engine.enrich_domain(ioc["value"])
    elif ioc["type"] == "hash":
        result = engine.enrich_hash(ioc["value"])
    results.append(result)
    time.sleep(15)  # Rate limiting for free VT API

engine.close()

# Print summary
for r in results:
    print(f"{r.ioc_type}: {r.ioc_value}")
    print(f"  Risk Score: {r.risk_score}")
    print(f"  Disposition: {r.disposition}")
    print()

Step 3: Integrate with Splunk for Automated Enrichment

Create a Splunk custom search command for inline enrichment:

index=notable sourcetype="stash"
| table src_ip, dest_ip, file_hash, url
| lookup threat_intel_ip_lookup ip AS src_ip OUTPUT vt_score, abuse_score, disposition
| lookup threat_intel_hash_lookup hash AS file_hash OUTPUT vt_detections, malware_family
| eval combined_risk = coalesce(vt_score, 0) + coalesce(abuse_score, 0)
| where combined_risk > 50
| sort - combined_risk

Step 4: Generate Enrichment Report

def generate_enrichment_report(results):
    report = []
    report.append("IOC ENRICHMENT REPORT")
    report.append("=" * 60)

    for r in sorted(results, key=lambda x: x.risk_score, reverse=True):
        report.append(f"\n{r.ioc_type.upper()}: {r.ioc_value}")
        report.append(f"  Risk Score: {r.risk_score}/100")
        report.append(f"  Disposition: {r.disposition}")

        if r.virustotal and "malicious" in r.virustotal:
            report.append(f"  VirusTotal: {r.virustotal['malicious']}/{r.virustotal.get('total_engines', 'N/A')} malicious")
        if r.abuseipdb and "confidence_score" in r.abuseipdb:
            report.append(f"  AbuseIPDB: {r.abuseipdb['confidence_score']}% confidence, {r.abuseipdb['total_reports']} reports")
        if r.greynoise and "classification" in r.greynoise:
            report.append(f"  GreyNoise: {r.greynoise['classification']}")
        if r.shodan_data and "ports" in r.shodan_data:
            report.append(f"  Shodan: Ports {r.shodan_data['ports']}, Org: {r.shodan_data.get('organization', 'N/A')}")

    return "\n".join(report)

Key Concepts

| Term | Definition | |------|-----------| | IOC Enrichment | Process of adding contextual intelligence to raw indicators from multiple external sources | | Composite Risk Score | Weighted aggregate score combining multiple intelligence sources for disposition decisions | | Rate Limiting | API request restrictions requiring throttling (VT free: 4/min, AbuseIPDB: 1000/day) | | GreyNoise RIOT | Rule It Out — GreyNoise dataset of known benign services to reduce false positives | | Passive DNS | Historical DNS resolution data showing domain-to-IP mappings over time | | Defanging | Modifying IOCs for safe handling in reports (evil.com becomes evil[.]com) |

Tools & Systems

  • VirusTotal: Multi-engine malware scanner providing file, URL, IP, and domain analysis with 70+ AV engines
  • AbuseIPDB: Community IP reputation database with abuse confidence scoring and ISP attribution
  • Shodan: Internet-wide scanner providing open ports, banners, and vulnerability data for IP addresses
  • GreyNoise: Internet noise intelligence distinguishing targeted attacks from opportunistic scanning
  • URLScan.io: URL analysis platform capturing screenshots, DOM, and network requests for phishing detection

Common Scenarios

  • Alert Triage Enrichment: Auto-enrich all IPs in a notable event to determine if source is known malicious
  • Incident Scope Assessment: Batch-enrich all IOCs from a compromised host to identify C2 infrastructure
  • Threat Intel Validation: Enrich received IOC feed to validate quality before adding to blocking controls
  • Phishing URL Analysis: Enrich URLs from reported phishing emails with URLScan and VT before user notification
  • False Positive Investigation: Enrich flagged IP to determine if it belongs to CDN/cloud provider (legitimate)

Output Format

IOC ENRICHMENT REPORT — IR-2024-0450
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Enrichment Time: 2024-03-15 14:30 UTC
IOCs Processed:  4

IP: 185.234.218[.]50
  Risk Score:   87/100 — MALICIOUS
  VirusTotal:   14/90 engines flagged malicious
  AbuseIPDB:    92% confidence, 347 reports
  Shodan:       Ports [22, 80, 443, 4444], Org: BulletProof Hosting
  GreyNoise:    malicious — known C2 infrastructure
  Action:       BLOCK immediately

DOMAIN: evil-c2-server[.]com
  Risk Score:   73/100 — MALICIOUS
  VirusTotal:   8/90 engines flagged
  URLScan:      5 scans, 4 malicious verdicts
  WHOIS:        Registered 3 days ago via Namecheap
  Action:       BLOCK and add to DNS sinkhole

HASH: a1b2c3d4e5f6...
  Risk Score:   91/100 — MALICIOUS
  VirusTotal:   52/72 engines (Cobalt Strike Beacon)
  MalwareBazaar: Tags: cobalt-strike, beacon, c2
  Action:       BLOCK hash, quarantine affected endpoints

IP: 45.33.32[.]156
  Risk Score:   5/100 — CLEAN
  VirusTotal:   0/90 engines
  GreyNoise:    benign — Shodan scanner
  Action:       No action required (known scanner)