Quick Reference
| Optimization | Technique | Impact |
|--------------|-----------|--------|
| Parallel requests | Promise.all() with batches | 5-10x throughput |
| Avoid polling | Use webhooks | Fewer API calls |
| Cache by seed | Store prompt+seed results | Avoid regeneration |
| Right-size images | Use needed resolution | Lower cost |
| Fewer steps | Reduce inference steps | Faster, cheaper |
| Model Tier | Development | Production |
|------------|-------------|------------|
| Image | FLUX Schnell | FLUX.2 Pro |
| Video | Runway Turbo | Kling 2.6 Pro |
| Serverless Config | Cost-Optimized | Latency-Optimized |
|-------------------|----------------|-------------------|
| min_concurrency | 0 | 1+ |
| keep_alive | 120 | 600+ |
| machine_type | Smallest viable | Higher tier |
When to Use This Skill
Use for performance and cost optimization:
- Reducing generation latency
- Lowering API costs
- Implementing parallel processing
- Choosing between polling and webhooks
- Configuring serverless scaling
Related skills:
- For API patterns: see fal-api-reference
- For model selection: see fal-model-guide
- For serverless config: see fal-serverless-guide
fal.ai Performance and Cost Optimization
Strategies for optimizing performance, reducing costs, and scaling fal.ai integrations.
Performance Optimization
Client-Side Optimizations
1. Use Queue-Based Execution
Always prefer subscribe() over run() for generation tasks:
// Recommended: Queue-based with progress tracking
const result = await fal.subscribe("fal-ai/flux/dev", {
input: { prompt: "test" },
logs: true,
onQueueUpdate: (update) => {
// Show progress to users
if (update.status === "IN_PROGRESS") {
console.log("Generating...");
}
}
});
// Only use run() for fast endpoints (< 30s)
const quickResult = await fal.run("fal-ai/fast-sdxl", {
input: { prompt: "quick test" }
});
2. Parallel Requests
Process multiple requests concurrently:
// JavaScript - Parallel execution
async function generateBatch(prompts: string[]) {
const results = await Promise.all(
prompts.map(prompt =>
fal.subscribe("fal-ai/flux/dev", {
input: { prompt }
})
)
);
return results;
}
// With rate limiting
async function generateBatchWithLimit(prompts: string[], limit = 5) {
const results = [];
for (let i = 0; i < prompts.length; i += limit) {
const batch = prompts.slice(i, i + limit);
const batchResults = await Promise.all(
batch.map(prompt =>
fal.subscribe("fal-ai/flux/dev", { input: { prompt } })
)
);
results.push(...batchResults);
// Small delay between batches
if (i + limit < prompts.length) {
await new Promise(r => setTimeout(r, 100));
}
}
return results;
}
# Python - Async parallel
import asyncio
import fal_client
async def generate_batch(prompts: list[str]) -> list[dict]:
tasks = [
fal_client.run_async("fal-ai/flux/dev", arguments={"prompt": p})
for p in prompts
]
return await asyncio.gather(*tasks)
# With semaphore for rate limiting
async def generate_batch_limited(prompts: list[str], limit: int = 5):
semaphore = asyncio.Semaphore(limit)
async def generate_one(prompt: str):
async with semaphore:
return await fal_client.run_async(
"fal-ai/flux/dev",
arguments={"prompt": prompt}
)
return await asyncio.gather(*[generate_one(p) for p in prompts])
3. Streaming for Real-Time Feedback
Use streaming for progressive output:
// Show incremental progress
const stream = await fal.stream("fal-ai/flux/dev", {
input: { prompt: "A landscape" }
});
for await (const event of stream) {
updateProgressUI(event);
}
const result = await stream.done();
4. WebSockets for Interactive Apps
For real-time applications with continuous input:
const connection = fal.realtime.connect("fal-ai/lcm-sd15-i2i", {
connectionKey: `user-${userId}`,
throttleInterval: 128, // Throttle rapid inputs (ms)
onResult: (result) => {
displayImage(result.images[0].url);
}
});
// Send updates as user types/draws
inputElement.addEventListener('input', (e) => {
connection.send({
prompt: e.target.value,
image_url: currentImage
});
});
Server-Side Optimizations (Serverless)
1. Efficient Model Loading
class OptimizedApp(fal.App):
machine_type = "GPU-A100"
requirements = ["torch", "transformers", "accelerate"]
volumes = {
"/data": fal.Volume("model-cache")
}
def setup(self):
import torch
from transformers import AutoModelForCausalLM
# Use fp16 for faster inference and less memory
self.model = AutoModelForCausalLM.from_pretrained(
"model-name",
torch_dtype=torch.float16,
device_map="auto",
cache_dir="/data/models" # Persistent cache
)
# Enable optimizations
if hasattr(self.model, 'enable_attention_slicing'):
self.model.enable_attention_slicing()
2. Reduce Cold Starts
class WarmApp(fal.App):
machine_type = "GPU-A100"
keep_alive = 600 # 10 minutes warm
min_concurrency = 1 # Always keep one ready
# Use lightweight health check
@fal.endpoint("/health")
def health(self):
return {"status": "ok"}
3. Memory Management
class MemoryEfficientApp(fal.App):
def setup(self):
import torch
# Use mixed precision
self.model = load_model(torch_dtype=torch.float16)
# Enable memory-efficient attention (if using transformers)
self.model.enable_xformers_memory_efficient_attention()
def teardown(self):
# Clean up GPU memory
import torch
if hasattr(self, 'model'):
del self.model
torch.cuda.empty_cache()
@fal.endpoint("/generate")
def generate(self, request):
import torch
with torch.inference_mode(): # Disable gradient tracking
result = self.model(request.input)
return result
Cost Optimization
1. Choose the Right Model
| Need | Cheaper Option | Premium Option |
|------|----------------|----------------|
| Quick iteration | FLUX Schnell ($) | FLUX.1 Dev ($$) |
| Production | FLUX.1 Dev ($$) | FLUX.2 Pro ($$$) |
| Video preview | Runway Turbo ($$) | Kling Pro ($$$) |
// Development: Use fast/cheap models
const preview = await fal.subscribe("fal-ai/flux/schnell", {
input: { prompt: "test", num_inference_steps: 4 }
});
// Production: Use quality models
const final = await fal.subscribe("fal-ai/flux-2-pro", {
input: { prompt: "test" }
});
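A small helper that selects the model from the environment keeps development and production code paths identical. The sketch below reuses the model ids from the table above; the NODE_ENV convention and the @fal-ai/client import are assumptions about your setup.

```typescript
// Sketch: pick the model tier from the environment so calling code stays identical.
// The NODE_ENV convention is an assumption; adjust to your deployment setup.
import { fal } from "@fal-ai/client";

const IMAGE_MODEL =
  process.env.NODE_ENV === "production"
    ? "fal-ai/flux-2-pro"    // quality tier for production
    : "fal-ai/flux/schnell"; // cheap, fast tier for development

export async function generateImage(prompt: string) {
  return fal.subscribe(IMAGE_MODEL, { input: { prompt } });
}
```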
2. Optimize Image Sizes
Generate at the size you need, not larger:
// Don't generate larger than needed
const result = await fal.subscribe("fal-ai/flux/dev", {
input: {
prompt: "test",
// Use a preset size
image_size: "square_hd", // 1024x1024
// Or pass specific dimensions instead of the preset (not both)
// image_size: { width: 800, height: 600 }
}
});
3. Reduce Inference Steps
Find the minimum steps for acceptable quality:
// Quick previews: fewer steps
const preview = await fal.subscribe("fal-ai/flux/dev", {
input: {
prompt: "test",
num_inference_steps: 15 // Faster, slightly lower quality
}
});
// Final render: more steps
const final = await fal.subscribe("fal-ai/flux/dev", {
input: {
prompt: "test",
num_inference_steps: 28 // Default, high quality
}
});
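One way to find that minimum empirically is to render the same prompt with a fixed seed at several step counts and compare the outputs side by side. The sweep below is a sketch; it assumes the @fal-ai/client import used throughout this guide and the flux/dev parameters shown above.

```typescript
// Sketch: sweep num_inference_steps with a fixed seed to find the lowest acceptable value.
import { fal } from "@fal-ai/client";

async function stepSweep(prompt: string, seed = 42, steps = [10, 15, 20, 28]) {
  const runs = await Promise.all(
    steps.map((num_inference_steps) =>
      fal.subscribe("fal-ai/flux/dev", {
        input: { prompt, seed, num_inference_steps }
      })
    )
  );
  // Same prompt and seed across runs, so differences come only from step count;
  // keep the cheapest setting whose output is acceptable.
  return steps.map((n, i) => ({ steps: n, result: runs[i] }));
}
```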
4. Use Webhooks for High Volume
Avoid polling overhead with webhooks:
// Instead of polling
const result = await fal.subscribe("fal-ai/flux/dev", {
input: { prompt: "test" },
pollInterval: 1000 // Polling = more API calls
});
// Use webhooks
const { request_id } = await fal.queue.submit("fal-ai/flux/dev", {
input: { prompt: "test" },
webhookUrl: "https://your-server.com/webhook"
});
// No polling needed - result delivered to webhook
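On the receiving side, you need an HTTP endpoint that accepts fal's POST callback. Below is a minimal sketch using Express; treat the payload fields beyond request_id as assumptions and verify them against the fal webhook documentation.

```typescript
// Sketch: minimal Express receiver for fal webhook callbacks.
// Payload fields other than request_id are assumptions; verify against the fal docs.
import express from "express";

const app = express();
app.use(express.json());

app.post("/webhook", (req, res) => {
  const { request_id, status, payload } = req.body ?? {};
  if (status === "OK") {
    // Persist or forward the generation result, keyed by request_id
    console.log(`Request ${request_id} completed`, payload);
  } else {
    console.error(`Request ${request_id} failed`, req.body);
  }
  // Acknowledge quickly so the callback is not retried
  res.status(200).send("ok");
});

app.listen(3000);
```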
5. Cache Results
Use seeds for reproducible outputs:
// Cache key based on prompt + seed
const cacheKey = `${prompt}-${seed}`;
const cached = await cache.get(cacheKey);
if (cached) {
return cached;
}
const result = await fal.subscribe("fal-ai/flux/dev", {
input: { prompt, seed }
});
await cache.set(cacheKey, result);
return result;
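The cache object above stands in for whatever store you already run (Redis, a database, etc.). Purely for illustration, a minimal in-memory version with a TTL might look like this:

```typescript
// Sketch: minimal in-memory cache with TTL, only to make cache.get/cache.set concrete.
// In production you would back this with Redis or another shared store.
type Entry = { value: unknown; expiresAt: number };

const store = new Map<string, Entry>();

export const cache = {
  async get(key: string) {
    const entry = store.get(key);
    if (!entry || entry.expiresAt < Date.now()) {
      store.delete(key);
      return null;
    }
    return entry.value;
  },
  async set(key: string, value: unknown, ttlMs = 24 * 60 * 60 * 1000) {
    store.set(key, { value, expiresAt: Date.now() + ttlMs });
  }
};
```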
6. Serverless Cost Optimization
class CostOptimizedApp(fal.App):
machine_type = "GPU-A10G" # Cheaper than A100 if sufficient
min_concurrency = 0 # Scale to zero when not used
keep_alive = 120 # Shorter keep-alive
# Use appropriate GPU for model size
# T4: < 16GB VRAM models
# A10G: 16-24GB VRAM models
# A100: 24-80GB VRAM models
Scaling Strategies
1. Horizontal Scaling
class ScalableApp(fal.App):
machine_type = "GPU-A100"
min_concurrency = 2 # Always have 2 instances
max_concurrency = 20 # Scale up to 20
# fal handles auto-scaling based on queue depth
2. Request Batching
class BatchApp(fal.App):
@fal.endpoint("/batch")
def batch_generate(self, prompts: list[str]) -> list[dict]:
# Process multiple prompts in one request
results = []
for prompt in prompts:
result = self.model(prompt)
results.append(result)
return results
3. Priority Queues
Use different endpoints for different priorities:
class PriorityApp(fal.App):
machine_type = "GPU-A100"
@fal.endpoint("/high-priority")
def high_priority(self, request):
# Separate endpoint for important requests
return self.process(request)
@fal.endpoint("/standard")
def standard(self, request):
# Standard processing
return self.process(request)
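Client-side, routing can be as simple as choosing which endpoint path to call. The app id below is a placeholder for wherever PriorityApp is deployed, not a real deployment; adjust it to your own workspace.

```typescript
// Sketch: route requests to the high-priority or standard endpoint of the deployed app.
// "your-workspace/priority-app" is a placeholder app id, not a real deployment.
import { fal } from "@fal-ai/client";

async function generate(request: Record<string, unknown>, priority: "high" | "standard") {
  const path = priority === "high" ? "high-priority" : "standard";
  return fal.run(`your-workspace/priority-app/${path}`, { input: request });
}
```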
Monitoring and Debugging
1. Add Logging
import logging
class MonitoredApp(fal.App):
def setup(self):
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
self.logger.info("App starting up")
# Load model
self.logger.info("Model loaded successfully")
@fal.endpoint("/generate")
def generate(self, request):
import time
start = time.time()
result = self.process(request)
elapsed = time.time() - start
self.logger.info(f"Request processed in {elapsed:.2f}s")
return result
2. Track Metrics
// Client-side timing
const start = Date.now();
const result = await fal.subscribe("fal-ai/flux/dev", {
input: { prompt: "test" },
onQueueUpdate: (update) => {
if (update.status === "IN_QUEUE") {
console.log(`Queue position: ${update.queue_position}`);
}
}
});
const elapsed = Date.now() - start;
console.log(`Total time: ${elapsed}ms`);
// Track in your analytics
analytics.track("fal_generation", {
model: "flux/dev",
elapsed_ms: elapsed,
queue_time_ms: result.timings?.queue,
inference_time_ms: result.timings?.inference
});
3. Error Monitoring
try {
const result = await fal.subscribe("fal-ai/flux/dev", {
input: { prompt: "test" }
});
} catch (error) {
// Log to error tracking service
errorTracker.captureException(error, {
tags: {
model: "flux/dev",
type: error.constructor.name
},
extra: {
status: error.status,
body: error.body
}
});
// Handle gracefully
return fallbackResult();
}
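One way to make fallbackResult() concrete is to retry once on a cheaper, faster model before surfacing the failure. The helper below is only a sketch; fallbackResult is the hypothetical function referenced above, and the retry policy is an assumption.

```typescript
// Sketch: fall back to a cheaper/faster model after a failure instead of returning nothing.
// Mirrors the hypothetical fallbackResult() used above.
import { fal } from "@fal-ai/client";

async function fallbackResult(prompt = "test") {
  try {
    // Retry once on the cheap tier; callers should still handle a second failure.
    return await fal.subscribe("fal-ai/flux/schnell", {
      input: { prompt, num_inference_steps: 4 }
    });
  } catch {
    return null; // Surface a degraded-but-handled state to the caller
  }
}
```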
Checklist
Before Production
- [ ] Using queue-based execution (subscribe)
- [ ] Appropriate model selected for use case
- [ ] Image sizes optimized
- [ ] Error handling implemented
- [ ] Rate limiting in place
- [ ] Caching strategy defined
Serverless Deployment
- [ ] Correct machine type for model size
- [ ] Models loaded in setup(), not per-request
- [ ] Persistent volumes for large models
- [ ] Secrets properly configured
- [ ] Health check endpoint
- [ ] Logging enabled
Cost Management
- [ ] Scale-to-zero enabled (min_concurrency = 0)
- [ ] Appropriate keep_alive setting
- [ ] Using cheaper models for development
- [ ] Batch processing where possible
- [ ] Webhook callbacks instead of polling
Monitoring
- [ ] Latency tracking
- [ ] Error rate monitoring
- [ ] Cost tracking
- [ ] Queue depth alerts