Agent Skills: Kling AI Audit Logging

|

Uncategorized · ID: jeremylongshore/claude-code-plugins-plus-skills/klingai-audit-logging

Install this agent skill in your local environment:

pnpm dlx add-skill https://github.com/jeremylongshore/claude-code-plugins-plus-skills/tree/HEAD/plugins/saas-packs/klingai-pack/skills/klingai-audit-logging

Skill Files

Browse the full folder contents for klingai-audit-logging.

Download Skill

Loading file tree…

plugins/saas-packs/klingai-pack/skills/klingai-audit-logging/SKILL.md

Skill Metadata

Name
klingai-audit-logging
Description
|

Kling AI Audit Logging

Overview

Compliance-grade audit logging for Kling AI API operations. Every task submission, status change, and credential usage is captured in tamper-evident structured logs.

Audit Event Schema

import json
import hashlib
import time
from datetime import datetime
from pathlib import Path

class AuditLogger:
    """Append-only JSONL audit log whose entries form a hash chain.

    Entries are written to one file per UTC day
    (``<log_dir>/audit-YYYY-MM-DD.jsonl``). Each entry stores the hash of
    the previous entry in the same file; the first entry of a file uses
    the literal ``"genesis"``. The chain is therefore per-file, so every
    daily file can be verified on its own starting from ``"genesis"``.
    """

    def __init__(self, log_dir: str = "audit"):
        """Create the log directory (including parents) if needed.

        Args:
            log_dir: Directory that holds the daily ``audit-*.jsonl`` files.
        """
        self.log_dir = Path(log_dir)
        # parents=True so a nested path such as "var/log/audit" also works.
        self.log_dir.mkdir(parents=True, exist_ok=True)
        self._prev_hash = "genesis"
        # File that ``_prev_hash`` currently belongs to; None until the
        # first write, which forces the chain head to be loaded from disk.
        self._chain_file = None

    def _compute_hash(self, entry: dict) -> str:
        """Return the 16-hex-char SHA-256 digest of *entry* plus the chain head."""
        raw = json.dumps(entry, sort_keys=True) + self._prev_hash
        return hashlib.sha256(raw.encode()).hexdigest()[:16]

    def _last_hash_in(self, filepath) -> str:
        """Return the hash of the last entry in *filepath*, or ``"genesis"``.

        ``"genesis"`` is returned when the file does not exist, is empty,
        or its last non-blank line cannot be read as an entry. In the last
        case the file already fails verification; logging must still not
        take the calling application down.
        """
        if not filepath.exists():
            return "genesis"
        last_line = ""
        with open(filepath, encoding="utf-8") as f:
            for line in f:
                if line.strip():
                    last_line = line
        if not last_line:
            return "genesis"
        try:
            return json.loads(last_line)["hash"]
        except (ValueError, KeyError, TypeError):
            return "genesis"

    def log(self, event_type: str, actor: str, details: dict):
        """Write a tamper-evident audit entry and return its hash.

        Args:
            event_type: Category of the event (e.g. ``"task_submitted"``).
            actor: Identifier of who triggered the event.
            details: JSON-serializable payload stored with the entry.

        Returns:
            The hash of the entry that was written.
        """
        from datetime import timezone

        # Read the clock once so the entry timestamp and the file it is
        # written to can never disagree around midnight.
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        day = now.strftime("%Y-%m-%d")
        filepath = self.log_dir / f"audit-{day}.jsonl"

        # On the first write, and whenever the day rolls over, continue the
        # chain from what is already on disk instead of from stale state.
        if filepath != self._chain_file:
            self._prev_hash = self._last_hash_in(filepath)
            self._chain_file = filepath

        entry = {
            "timestamp": now.isoformat() + "Z",
            "event_type": event_type,
            "actor": actor,
            "details": details,
            "prev_hash": self._prev_hash,
        }
        entry["hash"] = self._compute_hash(entry)

        with open(filepath, "a", encoding="utf-8") as f:
            f.write(json.dumps(entry) + "\n")

        # Advance the chain head only after the entry is actually on disk.
        self._prev_hash = entry["hash"]
        return entry["hash"]

Audit Events for Kling AI

class KlingAuditClient:
    """Wrapper around a Kling client that records an audit trail.

    Every ``text_to_video`` call produces a ``task_submitted`` entry
    followed by either ``task_completed`` or ``task_failed``.
    """

    def __init__(self, base_client, audit: "AuditLogger", actor: str = "system"):
        """Store the wrapped client, the audit logger and the acting identity.

        Args:
            base_client: Object exposing ``text_to_video(prompt, **kwargs)``;
                ``log_auth_event`` additionally reads ``config.access_key``.
            audit: Logger exposing ``log(event_type, actor, details)``.
            actor: Identifier recorded on every audit entry.
        """
        self.client = base_client
        self.audit = audit
        self.actor = actor

    def text_to_video(self, prompt: str, **kwargs):
        """Submit a text-to-video task and audit the submission and outcome.

        The prompt itself is never logged; only a truncated SHA-256 digest
        and its length are recorded.

        Args:
            prompt: Prompt text forwarded to the wrapped client.
            **kwargs: Forwarded unchanged to the wrapped client.

        Returns:
            Whatever the wrapped client's ``text_to_video`` returns.

        Raises:
            Exception: Any error raised by the wrapped client is re-raised
                unchanged after a ``task_failed`` entry has been written.
        """
        # Log submission
        self.audit.log("task_submitted", self.actor, {
            "action": "text_to_video",
            "model": kwargs.get("model", "kling-v2-master"),
            "duration": kwargs.get("duration", 5),
            "mode": kwargs.get("mode", "standard"),
            "prompt_hash": hashlib.sha256(prompt.encode()).hexdigest()[:16],
            "prompt_length": len(prompt),
        })

        try:
            result = self.client.text_to_video(prompt, **kwargs)
        except Exception as exc:
            # Without this entry a failed call would leave a dangling
            # "task_submitted" with no recorded outcome. Only the exception
            # type is stored: the message could echo the plaintext prompt.
            self.audit.log("task_failed", self.actor, {
                "action": "text_to_video",
                "status": "failed",
                "error_type": type(exc).__name__,
            })
            raise

        # Log completion
        self.audit.log("task_completed", self.actor, {
            "action": "text_to_video",
            "status": "succeed",
            "video_count": len(result.get("videos", [])),
        })

        return result

    def log_auth_event(self, event: str, success: bool):
        """Record an authentication event with a truncated access-key prefix.

        Args:
            event: Short name of the authentication event.
            success: Whether the authentication step succeeded.
        """
        self.audit.log("auth_event", self.actor, {
            "event": event,
            "success": success,
            # Only the first 8 characters are kept so the full key never
            # reaches the log.
            "access_key_prefix": self.client.config.access_key[:8] + "...",
        })

Audit Log Verification

def verify_audit_chain(log_file: str) -> bool:
    """Verify the hash chain of a single JSONL audit log file.

    The first entry must have ``prev_hash == "genesis"``; every later
    entry must reference the hash of the entry before it, and each stored
    hash must match the hash recomputed from the entry's other fields.

    Args:
        log_file: Path to the ``.jsonl`` audit file to check.

    Returns:
        True when every entry verifies (an empty file counts as intact),
        False at the first broken link, hash mismatch or unreadable entry.
        A diagnostic line is printed in either case.
    """
    prev_hash = "genesis"
    verified = 0

    with open(log_file, encoding="utf-8") as f:
        for line_num, line in enumerate(f, 1):
            # Blank lines carry no data (e.g. a trailing newline); skip them.
            if not line.strip():
                continue

            # A line that is not a complete entry is evidence of damage or
            # tampering, so report it as a failure rather than raising.
            try:
                entry = json.loads(line)
                stored_prev = entry["prev_hash"]
                stored_hash = entry["hash"]
            except (ValueError, KeyError, TypeError):
                print(f"Malformed entry at line {line_num}")
                return False

            if stored_prev != prev_hash:
                print(f"Chain broken at line {line_num}: "
                      f"expected prev_hash={prev_hash}, got {stored_prev}")
                return False

            # Recompute hash over everything except the stored hash itself.
            check_entry = {k: v for k, v in entry.items() if k != "hash"}
            raw = json.dumps(check_entry, sort_keys=True) + prev_hash
            expected_hash = hashlib.sha256(raw.encode()).hexdigest()[:16]

            if stored_hash != expected_hash:
                print(f"Hash mismatch at line {line_num}")
                return False

            prev_hash = stored_hash
            verified += 1

    print(f"Verified {verified} entries -- chain intact")
    return True

Audit Report Generator

def _parse_audit_timestamp(value):
    """Parse an ISO-8601 audit timestamp into a naive UTC datetime, or None."""
    from datetime import timezone

    if not isinstance(value, str):
        return None
    # A trailing "Z" is stripped because older ``fromisoformat``
    # implementations reject it.
    text = value[:-1] if value.endswith("Z") else value
    try:
        parsed = datetime.fromisoformat(text)
    except ValueError:
        return None
    if parsed.tzinfo is not None:
        parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
    return parsed


def generate_audit_report(log_dir: str = "audit", days: int = 30) -> dict:
    """Generate a summary report over recent audit entries.

    Reads every ``audit-*.jsonl`` file in *log_dir* in sorted filename
    order and keeps the entries whose timestamp is within the last *days*
    days (UTC). Entries whose timestamp cannot be parsed are kept rather
    than silently dropped from the report.

    Args:
        log_dir: Directory containing the ``audit-*.jsonl`` files.
        days: Size of the reporting window, counted back from now.

    Returns:
        Dict with ``period_days``, ``total_events``, ``event_types``,
        ``unique_actors``, ``actors``, ``first_event`` and ``last_event``
        (the last two are None when no entry falls inside the window).
        A human-readable summary is also printed.
    """
    from collections import Counter
    from datetime import timedelta, timezone

    log_path = Path(log_dir)
    events = []

    # Compare real datetimes: comparing ISO strings lexically misorders a
    # timestamp without fractional seconds ("...:00Z") against a cutoff
    # with them ("...:00.500000"), because "Z" sorts after ".".
    cutoff = datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(days=days)
    for filepath in sorted(log_path.glob("audit-*.jsonl")):
        with open(filepath, encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue  # tolerate blank lines such as a trailing newline
                entry = json.loads(line)
                stamp = _parse_audit_timestamp(entry["timestamp"])
                if stamp is None or stamp >= cutoff:
                    events.append(entry)

    event_types = Counter(e["event_type"] for e in events)
    actors = Counter(e["actor"] for e in events)

    report = {
        "period_days": days,
        "total_events": len(events),
        "event_types": dict(event_types),
        "unique_actors": len(actors),
        "actors": dict(actors),
        "first_event": events[0]["timestamp"] if events else None,
        "last_event": events[-1]["timestamp"] if events else None,
    }

    print(f"\n=== Audit Report ({days} days) ===")
    print(f"Total events: {report['total_events']}")
    for event_type, count in event_types.most_common():
        print(f"  {event_type}: {count}")
    # str() so a non-string actor identifier cannot break the summary line.
    print(f"Actors: {', '.join(str(a) for a in actors)}")

    return report

Compliance Checklist

  • [ ] All API calls logged with timestamp, actor, action
  • [ ] Prompts stored as hashes (not plaintext) for privacy
  • [ ] Audit chain integrity verifiable
  • [ ] Logs retained for required period (typically 1-7 years)
  • [ ] Log access restricted to authorized personnel
  • [ ] Regular verification of chain integrity

Resources