Kling AI Job Monitoring
Overview
Every Kling AI generation request returns a task_id identifying an asynchronous job whose result must be fetched later. This skill covers polling strategies, batch tracking, timeout handling, and callback-based monitoring for the /v1/videos/text2video, /v1/videos/image2video, and /v1/videos/video-extend endpoints.
Task Lifecycle
| Status | Meaning | Typical Duration |
|--------|---------|-----------------|
| submitted | Queued for processing | 0-30s |
| processing | Generation in progress | 30-120s (standard), 60-300s (professional) |
| succeed | Complete, video URL available | Terminal |
| failed | Generation failed | Terminal |
Polling a Single Task
import jwt, time, os, requests
# Base URL that every endpoint path in this file is appended to.
BASE = "https://api.klingai.com/v1"
def get_headers():
    """Build the HTTP headers for an authenticated JSON request.

    Reads ``KLING_ACCESS_KEY`` and ``KLING_SECRET_KEY`` from the
    environment and signs a fresh HS256 JWT on every call.

    Returns:
        A dict with a ``Bearer`` ``Authorization`` header and a JSON
        ``Content-Type`` header.

    Raises:
        KeyError: If either environment variable is not set.
    """
    access_key = os.environ["KLING_ACCESS_KEY"]
    secret_key = os.environ["KLING_SECRET_KEY"]

    claims = {
        "iss": access_key,
        # Token expires 30 minutes from now.
        "exp": int(time.time()) + 1800,
        # Valid from 5 seconds in the past.
        "nbf": int(time.time()) - 5,
    }
    jwt_header = {"alg": "HS256", "typ": "JWT"}
    token = jwt.encode(claims, secret_key, algorithm="HS256", headers=jwt_header)

    return {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
def poll_task(endpoint: str, task_id: str, interval: int = 10, timeout: int = 600):
    """Poll a task until it reaches a terminal status or the timeout expires.

    Each iteration sleeps, then queries ``{BASE}{endpoint}/{task_id}`` and
    prints the elapsed time, attempt number and status.  After more than
    five polls the interval grows by 20% per poll, capped at 30 seconds.

    Args:
        endpoint: Path appended to ``BASE``, e.g. ``"/videos/text2video"``.
        task_id: Identifier of the task to poll.
        interval: Initial delay in seconds between polls.
        timeout: Overall time budget in seconds.

    Returns:
        The ``task_result`` entry of the response once the status is
        ``"succeed"``.

    Raises:
        RuntimeError: If the task reports the status ``"failed"``.
        TimeoutError: If no terminal status is seen within ``timeout``.
        requests.HTTPError: If the API answers with an HTTP error status.
    """
    start = time.monotonic()
    attempts = 0
    while True:
        remaining = timeout - (time.monotonic() - start)
        if remaining <= 0:
            break

        # Never sleep past the deadline: the caller's timeout is a budget,
        # not a lower bound.
        time.sleep(min(interval, remaining))
        attempts += 1

        r = requests.get(f"{BASE}{endpoint}/{task_id}", headers=get_headers(), timeout=30)
        # Surface HTTP failures directly instead of as a KeyError on "data".
        r.raise_for_status()
        data = r.json()["data"]
        status = data["task_status"]

        elapsed = int(time.monotonic() - start)
        print(f"[{elapsed}s] Poll #{attempts}: {status}")

        if status == "succeed":
            return data["task_result"]
        if status == "failed":
            raise RuntimeError(f"Task failed: {data.get('task_status_msg', 'unknown')}")

        # Back off once the task has clearly not finished quickly.
        if attempts > 5:
            interval = min(interval * 1.2, 30)

    raise TimeoutError(f"Task {task_id} timed out after {timeout}s")
Batch Job Tracker
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
@dataclass
class TrackedTask:
    """Record of one submitted generation task.

    Attributes:
        task_id: Identifier of the task.
        endpoint: Endpoint path the task belongs to.
        prompt: Prompt text stored alongside the task.
        status: Last known task status; starts as ``"submitted"``.
        created_at: ``time.time()`` value taken when the record is created.
        result_url: Video URL, or ``None`` while none has been recorded.
        error_msg: Failure message, or ``None`` while none has been recorded.
    """

    # Required fields, in positional order.
    task_id: str
    endpoint: str
    prompt: str

    # Mutable tracking state.
    status: str = "submitted"
    created_at: float = field(default_factory=time.time)
    result_url: Optional[str] = None
    error_msg: Optional[str] = None
class BatchTracker:
    """Track a batch of generation tasks and refresh their statuses.

    Tasks are kept in ``self.tasks``, a dict keyed by task id.
    """

    def __init__(self):
        self.tasks: dict[str, TrackedTask] = {}

    def add(self, task_id, endpoint, prompt):
        """Register a task under ``task_id``, replacing any existing entry."""
        self.tasks[task_id] = TrackedTask(task_id=task_id, endpoint=endpoint, prompt=prompt)

    def update_all(self):
        """Poll every task whose status is ``submitted`` or ``processing``.

        A failure while polling one task is printed and does not stop the
        remaining tasks from being polled.  A task's status is only updated
        after its response has been fully parsed, so a malformed response
        leaves the task active and it is polled again on the next call.
        """
        active = [t for t in self.tasks.values() if t.status in ("submitted", "processing")]
        for task in active:
            try:
                resp = requests.get(
                    f"{BASE}{task.endpoint}/{task.task_id}",
                    headers=get_headers(), timeout=30
                )
                # Report HTTP failures as such, not as a KeyError on "data".
                resp.raise_for_status()
                data = resp.json()["data"]
                status = data["task_status"]
                if status == "succeed":
                    task.result_url = data["task_result"]["videos"][0]["url"]
                elif status == "failed":
                    task.error_msg = data.get("task_status_msg")
                # Commit the status last: if extracting the result raised
                # above, the task must stay active rather than end up as
                # "succeed" with no URL.
                task.status = status
            except Exception as e:
                # Deliberately broad: polling is best-effort per task.
                print(f"Error polling {task.task_id}: {e}")

    def print_report(self):
        """Print the batch size, the active count and a per-status tally."""
        by_status = {}
        for t in self.tasks.values():
            by_status[t.status] = by_status.get(t.status, 0) + 1
        active = sum(by_status.get(s, 0) for s in ("submitted", "processing"))
        print(f"\n=== Batch: {len(self.tasks)} tasks, {active} active ===")
        for status, count in sorted(by_status.items()):
            print(f" {status}: {count}")
Stuck Task Detection
def detect_stuck(tracker: BatchTracker, threshold_sec: int = 600):
    """Report active tasks that were created more than ``threshold_sec`` ago.

    A task counts as active while its status is ``submitted`` or
    ``processing``; its age is measured from ``created_at`` using
    ``time.time()``.

    Args:
        tracker: Tracker whose ``tasks`` are inspected.
        threshold_sec: Age in seconds above which an active task is flagged.

    Returns:
        A list of ``(task_id, age_in_whole_seconds)`` tuples, in the
        tracker's iteration order.  A warning is printed when it is
        non-empty.
    """
    now = time.time()
    overdue = [
        (task.task_id, int(age))
        for task in tracker.tasks.values()
        if task.status in ("submitted", "processing")
        and (age := now - task.created_at) > threshold_sec
    ]

    if not overdue:
        return overdue

    print(f"WARNING: {len(overdue)} stuck tasks:")
    for task_id, age_sec in overdue:
        print(f" {task_id}: {age_sec}s")
    return overdue
Batch Monitor Loop
tracker = BatchTracker()

# Submit batch
# NOTE(review): `prompts` is not defined in this snippet; presumably an
# iterable of prompt strings supplied by the caller.
for prompt in prompts:
    resp = requests.post(
        f"{BASE}/videos/text2video",
        headers=get_headers(),
        json={"model_name": "kling-v2-master", "prompt": prompt, "duration": "5"},
        # Same timeout as the polling requests, so a stalled connection
        # cannot block submission forever.
        timeout=30,
    )
    # Fail fast on an HTTP error rather than on a KeyError below.
    resp.raise_for_status()
    tracker.add(resp.json()["data"]["task_id"], "/videos/text2video", prompt)

# Monitor until all complete
# NOTE: this loop has no overall deadline; detect_stuck only reports
# long-running tasks and does not stop the loop.
while any(t.status in ("submitted", "processing") for t in tracker.tasks.values()):
    time.sleep(15)
    tracker.update_all()
    tracker.print_report()
    detect_stuck(tracker)