Agent Skills: Building Automated Malware Submission Pipeline

>

UncategorizedID: plurigrid/asi/building-automated-malware-submission-pipeline

Install this agent skill to your local

pnpm dlx add-skill https://github.com/plurigrid/asi/tree/HEAD/plugins/asi/skills/building-automated-malware-submission-pipeline

Skill Files

Browse the full folder contents for building-automated-malware-submission-pipeline.

Download Skill

Loading file tree…

plugins/asi/skills/building-automated-malware-submission-pipeline/SKILL.md

Skill Metadata

Name
building-automated-malware-submission-pipeline
Description
>

Building Automated Malware Submission Pipeline

When to Use

Use this skill when:

  • SOC teams face high volume of suspicious file alerts requiring sandbox analysis
  • Manual sandbox submission creates bottlenecks in alert triage workflow
  • Endpoint and email security tools quarantine files needing automated verdict determination
  • Incident response requires rapid malware family identification and IOC extraction

Do not use for analyzing live malware samples in production environments — always use isolated sandbox infrastructure.

Prerequisites

  • Sandbox environment: Cuckoo Sandbox, Joe Sandbox, Any.Run, or VMRay
  • VirusTotal API key (Enterprise for submission, free for lookup)
  • MalwareBazaar API access for known malware lookup
  • File collection mechanism: EDR quarantine API, email gateway export, network capture
  • Python 3.8+ with requests, vt-py, pefile libraries
  • Isolated analysis network with no production connectivity

Workflow

Step 1: Build File Collection Pipeline

Collect suspicious files from multiple sources:

import requests
import hashlib
import os
from pathlib import Path
from datetime import datetime

class MalwareCollector:
    def __init__(self, quarantine_dir="/opt/malware_quarantine"):
        self.quarantine_dir = Path(quarantine_dir)
        self.quarantine_dir.mkdir(exist_ok=True)

    def collect_from_edr(self, edr_api_url, api_token):
        """Pull quarantined files from CrowdStrike Falcon"""
        headers = {"Authorization": f"Bearer {api_token}"}

        # Get recent quarantine events
        response = requests.get(
            f"{edr_api_url}/quarantine/queries/quarantined-files/v1",
            headers=headers,
            params={"filter": "state:'quarantined'", "limit": 50}
        )
        file_ids = response.json()["resources"]

        for file_id in file_ids:
            # Download quarantined file
            dl_response = requests.get(
                f"{edr_api_url}/quarantine/entities/quarantined-files/v1",
                headers=headers,
                params={"ids": file_id}
            )
            file_data = dl_response.content
            sha256 = hashlib.sha256(file_data).hexdigest()

            filepath = self.quarantine_dir / f"{sha256}.sample"
            filepath.write_bytes(file_data)
            yield {"sha256": sha256, "path": str(filepath), "source": "edr"}

    def collect_from_email_gateway(self, smtp_quarantine_path):
        """Pull attachments from email gateway quarantine"""
        import email
        from email import policy

        for eml_file in Path(smtp_quarantine_path).glob("*.eml"):
            msg = email.message_from_binary_file(
                eml_file.open("rb"), policy=policy.default
            )
            for attachment in msg.iter_attachments():
                content = attachment.get_content()
                if isinstance(content, str):
                    content = content.encode()
                sha256 = hashlib.sha256(content).hexdigest()
                filename = attachment.get_filename() or "unknown"

                filepath = self.quarantine_dir / f"{sha256}.sample"
                filepath.write_bytes(content)
                yield {
                    "sha256": sha256,
                    "path": str(filepath),
                    "source": "email",
                    "original_filename": filename,
                    "sender": msg["From"],
                    "subject": msg["Subject"]
                }

    def compute_hashes(self, filepath):
        """Calculate MD5, SHA1, SHA256 for a file"""
        with open(filepath, "rb") as f:
            content = f.read()
        return {
            "md5": hashlib.md5(content).hexdigest(),
            "sha1": hashlib.sha1(content).hexdigest(),
            "sha256": hashlib.sha256(content).hexdigest(),
            "size": len(content)
        }

Step 2: Pre-Screen with Hash Lookups

Check if the file is already known before sandbox submission:

import vt

class MalwarePreScreener:
    def __init__(self, vt_api_key, mb_api_url="https://mb-api.abuse.ch/api/v1/"):
        self.vt_client = vt.Client(vt_api_key)
        self.mb_api_url = mb_api_url

    def check_virustotal(self, sha256):
        """Lookup hash in VirusTotal"""
        try:
            file_obj = self.vt_client.get_object(f"/files/{sha256}")
            stats = file_obj.last_analysis_stats
            return {
                "found": True,
                "malicious": stats.get("malicious", 0),
                "suspicious": stats.get("suspicious", 0),
                "undetected": stats.get("undetected", 0),
                "total": sum(stats.values()),
                "threat_label": getattr(file_obj, "popular_threat_classification", {}).get(
                    "suggested_threat_label", "Unknown"
                ),
                "type": getattr(file_obj, "type_description", "Unknown")
            }
        except vt.APIError:
            return {"found": False}

    def check_malwarebazaar(self, sha256):
        """Lookup hash in MalwareBazaar"""
        response = requests.post(
            self.mb_api_url,
            data={"query": "get_info", "hash": sha256}
        )
        data = response.json()
        if data["query_status"] == "ok":
            entry = data["data"][0]
            return {
                "found": True,
                "signature": entry.get("signature", "Unknown"),
                "tags": entry.get("tags", []),
                "file_type": entry.get("file_type", "Unknown"),
                "first_seen": entry.get("first_seen", "Unknown")
            }
        return {"found": False}

    def pre_screen(self, sha256):
        """Run all pre-screening checks"""
        vt_result = self.check_virustotal(sha256)
        mb_result = self.check_malwarebazaar(sha256)

        verdict = "UNKNOWN"
        if vt_result["found"] and vt_result.get("malicious", 0) > 10:
            verdict = "KNOWN_MALICIOUS"
        elif vt_result["found"] and vt_result.get("malicious", 0) == 0:
            verdict = "LIKELY_CLEAN"

        return {
            "sha256": sha256,
            "virustotal": vt_result,
            "malwarebazaar": mb_result,
            "pre_screen_verdict": verdict,
            "needs_sandbox": verdict == "UNKNOWN"
        }

    def close(self):
        self.vt_client.close()

Step 3: Submit to Sandbox for Dynamic Analysis

Cuckoo Sandbox Submission:

class SandboxSubmitter:
    def __init__(self, cuckoo_url="http://cuckoo.internal:8090"):
        self.cuckoo_url = cuckoo_url

    def submit_to_cuckoo(self, filepath, timeout=300):
        """Submit file to Cuckoo Sandbox"""
        with open(filepath, "rb") as f:
            response = requests.post(
                f"{self.cuckoo_url}/tasks/create/file",
                files={"file": f},
                data={
                    "timeout": timeout,
                    "options": "procmemdump=yes,route=none",
                    "priority": 2,
                    "machine": "win10_x64"
                }
            )
        task_id = response.json()["task_id"]
        return task_id

    def wait_for_analysis(self, task_id, poll_interval=30, max_wait=600):
        """Wait for sandbox analysis to complete"""
        import time
        elapsed = 0
        while elapsed < max_wait:
            response = requests.get(f"{self.cuckoo_url}/tasks/view/{task_id}")
            status = response.json()["task"]["status"]
            if status == "reported":
                return self.get_report(task_id)
            elif status == "failed_analysis":
                return {"error": "Analysis failed"}
            time.sleep(poll_interval)
            elapsed += poll_interval
        return {"error": "Analysis timed out"}

    def get_report(self, task_id):
        """Retrieve analysis report"""
        response = requests.get(f"{self.cuckoo_url}/tasks/report/{task_id}")
        report = response.json()

        # Extract key indicators
        return {
            "task_id": task_id,
            "score": report.get("info", {}).get("score", 0),
            "signatures": [
                {"name": s["name"], "severity": s["severity"], "description": s["description"]}
                for s in report.get("signatures", [])
            ],
            "network": {
                "dns": [d["request"] for d in report.get("network", {}).get("dns", [])],
                "http": [
                    {"url": h["uri"], "method": h["method"]}
                    for h in report.get("network", {}).get("http", [])
                ],
                "hosts": report.get("network", {}).get("hosts", [])
            },
            "dropped_files": [
                {"name": f["name"], "sha256": f["sha256"], "size": f["size"]}
                for f in report.get("dropped", [])
            ],
            "processes": [
                {"name": p["process_name"], "pid": p["pid"], "command_line": p.get("command_line", "")}
                for p in report.get("behavior", {}).get("processes", [])
            ],
            "registry_keys": [
                k for k in report.get("behavior", {}).get("summary", {}).get("regkey_written", [])
            ]
        }

    def submit_to_joesandbox(self, filepath, joe_api_key, joe_url="https://jbxcloud.joesecurity.org/api"):
        """Submit to Joe Sandbox Cloud"""
        with open(filepath, "rb") as f:
            response = requests.post(
                f"{joe_url}/v2/submission/new",
                headers={"Authorization": f"Bearer {joe_api_key}"},
                files={"sample": f},
                data={
                    "systems": "w10_64",
                    "internet-access": False,
                    "report-cache": True
                }
            )
        return response.json()["data"]["webid"]

Step 4: Extract IOCs and Generate Verdict

class VerdictGenerator:
    def __init__(self):
        self.malicious_threshold = 7  # Cuckoo score threshold

    def generate_verdict(self, pre_screen, sandbox_report):
        """Combine pre-screening and sandbox results for final verdict"""
        iocs = {
            "ips": [],
            "domains": [],
            "urls": [],
            "hashes": [],
            "registry_keys": [],
            "files_dropped": []
        }

        # Extract IOCs from sandbox report
        if sandbox_report:
            iocs["domains"] = sandbox_report.get("network", {}).get("dns", [])
            iocs["ips"] = sandbox_report.get("network", {}).get("hosts", [])
            iocs["urls"] = [
                h["url"] for h in sandbox_report.get("network", {}).get("http", [])
            ]
            iocs["hashes"] = [
                f["sha256"] for f in sandbox_report.get("dropped_files", [])
            ]
            iocs["registry_keys"] = sandbox_report.get("registry_keys", [])[:10]
            iocs["files_dropped"] = sandbox_report.get("dropped_files", [])

        # Determine verdict
        vt_malicious = pre_screen.get("virustotal", {}).get("malicious", 0)
        sandbox_score = sandbox_report.get("score", 0) if sandbox_report else 0
        sig_count = len(sandbox_report.get("signatures", [])) if sandbox_report else 0

        combined_score = (vt_malicious * 2) + (sandbox_score * 10) + (sig_count * 5)

        if combined_score >= 100:
            verdict = "MALICIOUS"
            confidence = "HIGH"
        elif combined_score >= 50:
            verdict = "SUSPICIOUS"
            confidence = "MEDIUM"
        elif combined_score >= 20:
            verdict = "POTENTIALLY_UNWANTED"
            confidence = "LOW"
        else:
            verdict = "CLEAN"
            confidence = "HIGH"

        return {
            "verdict": verdict,
            "confidence": confidence,
            "combined_score": combined_score,
            "iocs": iocs,
            "vt_detections": vt_malicious,
            "sandbox_score": sandbox_score,
            "signatures": sandbox_report.get("signatures", []) if sandbox_report else []
        }

Step 5: Push Results to SIEM

def push_to_splunk(verdict_result, splunk_url, splunk_token):
    """Send malware analysis verdict to Splunk HEC"""
    import json

    event = {
        "sourcetype": "malware_analysis",
        "source": "malware_pipeline",
        "event": {
            "sha256": verdict_result["sha256"],
            "verdict": verdict_result["verdict"],
            "confidence": verdict_result["confidence"],
            "score": verdict_result["combined_score"],
            "vt_detections": verdict_result["vt_detections"],
            "sandbox_score": verdict_result["sandbox_score"],
            "malware_family": verdict_result.get("threat_label", "Unknown"),
            "iocs": verdict_result["iocs"],
            "signatures": [s["name"] for s in verdict_result["signatures"]]
        }
    }

    response = requests.post(
        f"{splunk_url}/services/collector/event",
        headers={
            "Authorization": f"Splunk {splunk_token}",
            "Content-Type": "application/json"
        },
        json=event,
        verify=not os.environ.get("SKIP_TLS_VERIFY", "").lower() == "true",  # Set SKIP_TLS_VERIFY=true for self-signed certs in lab environments
    )
    return response.status_code == 200

def push_iocs_to_blocklist(iocs, firewall_api):
    """Push extracted IOCs to blocking infrastructure"""
    for ip in iocs.get("ips", []):
        requests.post(
            f"{firewall_api}/block",
            json={"type": "ip", "value": ip, "action": "block", "source": "malware_pipeline"}
        )
    for domain in iocs.get("domains", []):
        requests.post(
            f"{firewall_api}/block",
            json={"type": "domain", "value": domain, "action": "sinkhole", "source": "malware_pipeline"}
        )

Step 6: Orchestrate the Full Pipeline

def run_malware_pipeline(sample_path, config):
    """Execute full malware analysis pipeline"""
    collector = MalwareCollector()
    screener = MalwarePreScreener(config["vt_key"])
    submitter = SandboxSubmitter(config["cuckoo_url"])
    generator = VerdictGenerator()

    # Step 1: Hash and pre-screen
    hashes = collector.compute_hashes(sample_path)
    pre_screen = screener.pre_screen(hashes["sha256"])

    # Step 2: Submit to sandbox if unknown
    sandbox_report = None
    if pre_screen["needs_sandbox"]:
        task_id = submitter.submit_to_cuckoo(sample_path)
        sandbox_report = submitter.wait_for_analysis(task_id)

    # Step 3: Generate verdict
    verdict = generator.generate_verdict(pre_screen, sandbox_report)
    verdict["sha256"] = hashes["sha256"]
    verdict["threat_label"] = pre_screen.get("virustotal", {}).get("threat_label", "Unknown")

    # Step 4: Push to SIEM
    push_to_splunk(verdict, config["splunk_url"], config["splunk_token"])

    # Step 5: Block if malicious
    if verdict["verdict"] == "MALICIOUS":
        push_iocs_to_blocklist(verdict["iocs"], config["firewall_api"])

    screener.close()
    return verdict

Key Concepts

| Term | Definition | |------|-----------| | Dynamic Analysis | Executing malware in a sandbox to observe runtime behavior (process creation, network, file system changes) | | Static Analysis | Examining malware without execution (hash lookup, string analysis, PE header inspection) | | Sandbox Evasion | Techniques malware uses to detect sandbox environments and alter behavior to avoid analysis | | IOC Extraction | Automated process of identifying network indicators, file artifacts, and registry changes from sandbox reports | | Multi-AV Scanning | Submitting samples to multiple antivirus engines (VirusTotal) for consensus-based detection | | Verdict | Final classification of a sample: Malicious, Suspicious, Potentially Unwanted, or Clean |

Tools & Systems

  • Cuckoo Sandbox: Open-source automated malware analysis platform with behavioral analysis and network capture
  • Joe Sandbox: Commercial sandbox with deep behavioral analysis, YARA matching, and MITRE ATT&CK mapping
  • Any.Run: Interactive sandbox service allowing real-time manipulation during analysis for debugging evasive malware
  • VirusTotal: Multi-engine scanning service providing 70+ AV results and behavioral analysis reports
  • CAPE Sandbox: Community-maintained Cuckoo fork with enhanced payload extraction and configuration dumping

Common Scenarios

  • Email Attachment Triage: Auto-submit quarantined email attachments, generate verdict in <5 minutes
  • EDR Quarantine Processing: Batch-process files quarantined by endpoint security for detailed analysis
  • Incident Investigation: Submit suspicious binaries found during IR for malware family identification and IOC extraction
  • Threat Intel Enrichment: Analyze samples from threat feeds to extract C2 infrastructure and update blocking
  • Zero-Day Detection: Sandbox catches novel malware missed by signature-based AV through behavioral analysis

Output Format

MALWARE ANALYSIS REPORT — Pipeline Submission
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Sample:       invoice_march.docx
SHA256:       a1b2c3d4e5f6a7b8...
File Type:    Microsoft Word Document (macro-enabled)

Pre-Screening:
  VirusTotal:    34/72 malicious (Emotet.Downloader)
  MalwareBazaar: Tags: emotet, macro, downloader

Sandbox Analysis (Cuckoo):
  Score:         9.2/10 (MALICIOUS)
  Signatures:
    - Macro executes PowerShell download cradle (severity: 8)
    - Process injection into explorer.exe (severity: 9)
    - Connects to known Emotet C2 server (severity: 9)

Extracted IOCs:
  C2 IPs:       185.234.218[.]50:8080, 45.77.123[.]45:443
  Domains:       update-service[.]evil[.]com
  Dropped Files: payload.dll (SHA256: b2c3d4e5...)
  Registry:      HKCU\Software\Microsoft\Windows\CurrentVersion\Run\Update

VERDICT: MALICIOUS (Emotet Downloader) — Confidence: HIGH
ACTIONS:
  [DONE] IOCs pushed to Splunk threat intel
  [DONE] C2 IPs blocked on firewall
  [DONE] Domain sinkholed on DNS
  [DONE] Hash blocked on endpoint