Kling AI Batch Processing
Overview
Generate multiple videos efficiently using controlled parallelism, rate-limit-aware submission, progress tracking, and result collection. All requests go through https://api.klingai.com/v1.
Batch Submission with Rate Limiting
import jwt, time, os, requests
# Root URL of the Kling API; every endpoint path in this file is appended to it.
BASE = "https://api.klingai.com/v1"
def get_headers():
    """Build the HTTP headers for an authenticated Kling API request.

    Reads the access key from ``KLING_ACCESS_KEY`` and the secret key from
    ``KLING_SECRET_KEY``, then signs an HS256 JWT whose issuer is the access
    key. The token becomes valid 5 seconds in the past (presumably to absorb
    clock skew) and expires 1800 seconds from now.

    Returns:
        A dict with a ``Bearer`` ``Authorization`` header and a JSON
        ``Content-Type`` header.

    Raises:
        KeyError: If either environment variable is not set.
    """
    access_key = os.environ["KLING_ACCESS_KEY"]
    secret_key = os.environ["KLING_SECRET_KEY"]

    claims = {
        "iss": access_key,
        "exp": int(time.time()) + 1800,
        "nbf": int(time.time()) - 5,
    }
    jose_header = {"alg": "HS256", "typ": "JWT"}
    signed = jwt.encode(
        claims, secret_key, algorithm="HS256", headers=jose_header
    )

    return {
        "Authorization": f"Bearer {signed}",
        "Content-Type": "application/json",
    }
def submit_batch(prompts, model="kling-v2-master", duration="5",
                 mode="standard", max_concurrent=3, delay=2.0,
                 request_timeout=30.0):
    """Submit one text-to-video task per prompt with bounded concurrency.

    Before each submission the function waits until fewer than
    ``max_concurrent`` previously submitted tasks are still unfinished
    (as reported by ``check_complete``), re-checking every 5 seconds. After
    each submission it sleeps ``delay`` seconds to pace requests.

    Args:
        prompts: Iterable of prompt strings.
        model: Value sent as ``model_name``.
        duration: Value sent as ``duration``.
        mode: Value sent as ``mode``.
        max_concurrent: Maximum number of unfinished tasks allowed before the
            next submission; must be at least 1.
        delay: Seconds to sleep after each submission.
        request_timeout: Timeout in seconds for each submission request, so a
            stalled connection cannot hang the whole batch.

    Returns:
        A list of ``{"task_id", "prompt", "index"}`` dicts in submission order.

    Raises:
        ValueError: If ``max_concurrent`` is less than 1 (the wait loop could
            otherwise never exit).
        requests.HTTPError: If the API answers a submission with an HTTP
            error status.
    """
    if max_concurrent < 1:
        raise ValueError("max_concurrent must be at least 1")

    # Materialize so len() works for generators and other one-shot iterables.
    prompts = list(prompts)
    total = len(prompts)

    tasks = []
    active = []
    for i, prompt in enumerate(prompts):
        # Wait if at concurrency limit
        while len(active) >= max_concurrent:
            active = [t for t in active if not check_complete(t["task_id"])]
            if len(active) >= max_concurrent:
                time.sleep(5)

        response = requests.post(
            f"{BASE}/videos/text2video",
            headers=get_headers(),
            json={
                "model_name": model,
                "prompt": prompt,
                "duration": duration,
                "mode": mode,
            },
            timeout=request_timeout,
        )
        # Fail with the HTTP status rather than a KeyError on the error body.
        response.raise_for_status()
        data = response.json()["data"]

        task = {"task_id": data["task_id"], "prompt": prompt, "index": i}
        tasks.append(task)
        active.append(task)
        print(f"[{i+1}/{total}] Submitted: {data['task_id']}")
        time.sleep(delay)  # pace requests
    return tasks
def check_complete(task_id, request_timeout=30.0):
    """Return True when the task has reached a terminal status.

    Fetches the task from the text2video endpoint and reports whether its
    ``task_status`` is ``"succeed"`` or ``"failed"``.

    Args:
        task_id: Identifier of the task to query.
        request_timeout: Timeout in seconds for the status request, so a
            stalled connection cannot block a polling loop indefinitely.

    Raises:
        requests.HTTPError: If the API answers with an HTTP error status.
    """
    response = requests.get(
        f"{BASE}/videos/text2video/{task_id}",
        headers=get_headers(),
        timeout=request_timeout,
    )
    # Surface HTTP failures directly instead of a KeyError on the error body.
    response.raise_for_status()
    status = response.json()["data"]["task_status"]
    return status in ("succeed", "failed")
Collect Results
def collect_results(tasks, timeout=600, poll_interval=15, request_timeout=30.0):
    """Poll tasks until each one finishes or the time budget runs out.

    Args:
        tasks: Sequence of dicts with at least ``"task_id"`` and ``"prompt"``
            keys.
        timeout: Overall time budget in seconds.
        poll_interval: Seconds to wait between polling passes.
        request_timeout: Timeout in seconds for each status request; without
            it a single stalled request could outlive ``timeout``.

    Returns:
        A dict keyed by task id. Succeeded tasks map to
        ``{"status": "succeed", "url", "prompt"}``; failed tasks map to
        ``{"status": "failed", "error", "prompt"}``. Tasks that did not
        finish within ``timeout`` are absent from the result.

    Raises:
        requests.HTTPError: If the API answers a status request with an HTTP
            error status.
    """
    results = {}
    deadline = time.monotonic() + timeout

    while time.monotonic() < deadline:
        # Track completion by task id so duplicate ids cannot keep the loop
        # running until the timeout.
        pending = [t for t in tasks if t["task_id"] not in results]
        if not pending:
            break

        for task in pending:
            task_id = task["task_id"]
            if task_id in results:
                continue  # same id appeared earlier in this pass
            if time.monotonic() >= deadline:
                break  # honor the budget in the middle of a pass too

            response = requests.get(
                f"{BASE}/videos/text2video/{task_id}",
                headers=get_headers(),
                timeout=request_timeout,
            )
            response.raise_for_status()
            data = response.json()["data"]
            status = data["task_status"]

            if status == "succeed":
                results[task_id] = {
                    "status": "succeed",
                    "url": data["task_result"]["videos"][0]["url"],
                    "prompt": task["prompt"],
                }
            elif status == "failed":
                results[task_id] = {
                    "status": "failed",
                    "error": data.get("task_status_msg", "Unknown"),
                    "prompt": task["prompt"],
                }

        if all(t["task_id"] in results for t in tasks):
            break
        # Never sleep past the deadline.
        remaining = deadline - time.monotonic()
        if remaining > 0:
            time.sleep(min(poll_interval, remaining))

    return results
Async Batch with asyncio
import asyncio
import aiohttp
async def async_batch(prompts, max_concurrent=3, model="kling-v2-master",
                      duration="5", mode="standard", poll_interval=10,
                      timeout=None):
    """Generate videos concurrently, with a semaphore capping parallel tasks.

    Each prompt is submitted and then polled until it finishes. The semaphore
    slot is held for the whole submit-and-poll cycle, so at most
    ``max_concurrent`` tasks are in flight at once. All requests share one
    ``aiohttp.ClientSession``.

    Args:
        prompts: Iterable of prompt strings.
        max_concurrent: Maximum number of tasks in flight; must be at least 1.
        model: Value sent as ``model_name``.
        duration: Value sent as ``duration``.
        mode: Value sent as ``mode``.
        poll_interval: Seconds to wait before each status poll.
        timeout: Optional per-task polling budget in seconds. ``None``
            (the default) polls until the task reaches a terminal status.

    Returns:
        A dict mapping each prompt's index to the video URL on success, or to
        a string starting with ``"FAILED: "`` on failure or polling timeout.

    Raises:
        ValueError: If ``max_concurrent`` is less than 1 (a zero-slot
            semaphore would block every task forever).
        aiohttp.ClientResponseError: If the API answers with an HTTP error
            status.
    """
    if max_concurrent < 1:
        raise ValueError("max_concurrent must be at least 1")

    prompts = list(prompts)
    results = {}
    if not prompts:
        return results

    semaphore = asyncio.Semaphore(max_concurrent)

    async def generate_one(session, prompt, index):
        async with semaphore:
            # Submit
            async with session.post(
                f"{BASE}/videos/text2video",
                headers=get_headers(),
                json={"model_name": model, "prompt": prompt,
                      "duration": duration, "mode": mode},
            ) as resp:
                resp.raise_for_status()
                data = (await resp.json())["data"]
            task_id = data["task_id"]

            deadline = None if timeout is None else time.monotonic() + timeout

            # Poll
            while True:
                await asyncio.sleep(poll_interval)
                async with session.get(
                    f"{BASE}/videos/text2video/{task_id}",
                    headers=get_headers(),  # fresh token on every poll
                ) as resp:
                    resp.raise_for_status()
                    data = (await resp.json())["data"]

                if data["task_status"] == "succeed":
                    results[index] = data["task_result"]["videos"][0]["url"]
                    return
                if data["task_status"] == "failed":
                    results[index] = f"FAILED: {data.get('task_status_msg')}"
                    return
                # Checked after the poll so every task is polled at least once.
                if deadline is not None and time.monotonic() >= deadline:
                    results[index] = f"FAILED: timed out after {timeout}s"
                    return

    # One shared session instead of one per task.
    async with aiohttp.ClientSession() as session:
        await asyncio.gather(
            *(generate_one(session, p, i) for i, p in enumerate(prompts))
        )
    return results
Batch with Callbacks (No Polling)
def submit_batch_with_callbacks(prompts, callback_url, model="kling-v2-master",
                                duration="5", mode="standard", delay=2.0,
                                request_timeout=30.0):
    """Submit one text-to-video task per prompt, passing a callback URL.

    Each request includes ``callback_url`` so results can be delivered by
    webhook instead of polling. The function sleeps ``delay`` seconds after
    each submission to pace requests.

    Args:
        prompts: Iterable of prompt strings.
        callback_url: URL sent as ``callback_url`` with every task.
        model: Value sent as ``model_name``.
        duration: Value sent as ``duration``.
        mode: Value sent as ``mode``.
        delay: Seconds to sleep after each submission.
        request_timeout: Timeout in seconds for each submission request, so a
            stalled connection cannot hang the batch.

    Returns:
        A list of task id strings in submission order.

    Raises:
        requests.HTTPError: If the API answers a submission with an HTTP
            error status.
    """
    task_ids = []
    for prompt in prompts:
        response = requests.post(
            f"{BASE}/videos/text2video",
            headers=get_headers(),
            json={
                "model_name": model,
                "prompt": prompt,
                "duration": duration,
                "mode": mode,
                "callback_url": callback_url,
            },
            timeout=request_timeout,
        )
        # Fail with the HTTP status rather than a KeyError on the error body.
        response.raise_for_status()
        task_ids.append(response.json()["data"]["task_id"])
        time.sleep(delay)  # rate limit pacing
    return task_ids
Cost Estimation Before Batch
def estimate_batch_cost(count, duration=5, mode="standard", audio=False):
    """Print and return the estimated credit cost of a batch.

    The per-video price comes from a table keyed by ``(duration, mode)``.
    Combinations missing from the table fall back to 10 credits. When
    ``audio`` is true the per-video price is multiplied by 5. The dollar
    figure printed uses the 0.14-per-credit rate hard-coded below.

    Args:
        count: Number of videos in the batch.
        duration: Clip duration as an int, or as a numeric string such as
            ``"10"`` (the form the submit functions send to the API).
        mode: ``"standard"`` or ``"professional"``.
        audio: Whether to apply the audio multiplier.

    Returns:
        Total credits for the batch.
    """
    credits_map = {
        (5, "standard"): 10,
        (5, "professional"): 35,
        (10, "standard"): 20,
        (10, "professional"): 70,
    }

    # The table is keyed by int, but durations are passed to the API as
    # strings. Normalize numeric strings so "10" is not silently priced at
    # the 10-credit fallback.
    duration_key = duration
    if isinstance(duration, str):
        try:
            duration_key = int(duration.strip())
        except ValueError:
            duration_key = duration  # unknown value: keep the fallback path

    per_video = credits_map.get((duration_key, mode), 10)
    if audio:
        per_video *= 5
    total = count * per_video
    print(f"Batch: {count} videos x {per_video} credits = {total} credits")
    print(f"Estimated cost: ${total * 0.14:.2f}")
    return total
# Check before submitting: estimate the credits a 50-video batch
# (duration=5, standard mode) will consume. The call also prints the
# per-video breakdown and the estimated dollar cost.
needed = estimate_batch_cost(50, duration=5, mode="standard")