Upstash Workflow Implementation Guide
This guide covers the standard patterns for implementing Upstash Workflow + QStash async workflows in the LobeHub codebase.
🎯 The Three Core Patterns
All workflows in LobeHub follow the same 3-layer architecture with three essential patterns:
- 🔍 Dry-Run Mode - Get statistics without triggering actual execution
- 🌟 Fan-Out Pattern - Split large batches into smaller chunks for parallel processing
- 🎯 Single Task Execution - Each workflow execution processes ONE item only
These patterns ensure scalable, debuggable, and cost-efficient async workflows.
Architecture Overview
Standard 3-Layer Pattern
All workflows follow a standard 3-layer architecture:
Layer 1: Entry Point (process-*)
├─ Validates prerequisites
├─ Calculates total items to process
├─ Filters existing items
├─ Supports dry-run mode (statistics only)
└─ Triggers Layer 2 if work needed
Layer 2: Pagination (paginate-*)
├─ Handles cursor-based pagination
├─ Implements fan-out for large batches
├─ Recursively processes all pages
└─ Triggers Layer 3 for each item
Layer 3: Single Task Execution (execute-*/generate-*)
└─ Performs actual business logic for ONE item
Examples: welcome-placeholder, agent-welcome
Core Patterns
1. Dry-Run Mode
Purpose: Get statistics without triggering actual execution
Pattern:
// Layer 1: Entry Point
if (dryRun) {
console.log('[workflow:process] Dry run mode, returning statistics only');
return {
...result,
dryRun: true,
message: `[DryRun] Would process ${itemsNeedingProcessing.length} items`,
};
}
Use Case: Check how many items will be processed before committing to execution
Response:
{
success: true,
totalEligible: 100,
toProcess: 80,
alreadyProcessed: 20,
dryRun: true,
message: "[DryRun] Would process 80 items"
}
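The statistics object above can be derived from two id lists. The following is a minimal sketch, assuming a hypothetical helper named `buildProcessStats` (the name and signature are illustrative and not part of the codebase); it only mirrors the response shape shown above.

```typescript
// Hypothetical helper: name and signature are illustrative, not from the codebase.
interface ProcessStats {
  success: true;
  totalEligible: number;
  toProcess: number;
  alreadyProcessed: number;
  dryRun?: true;
  message?: string;
}

function buildProcessStats(
  allItemIds: string[],
  itemsNeedingProcessing: string[],
  dryRun: boolean,
): ProcessStats {
  const stats: ProcessStats = {
    success: true,
    totalEligible: allItemIds.length,
    toProcess: itemsNeedingProcessing.length,
    alreadyProcessed: allItemIds.length - itemsNeedingProcessing.length,
  };
  // Outside dry-run mode the caller goes on to trigger Layer 2.
  if (!dryRun) return stats;
  // In dry-run mode only the statistics are returned; nothing is triggered.
  return {
    ...stats,
    dryRun: true,
    message: `[DryRun] Would process ${itemsNeedingProcessing.length} items`,
  };
}
```

Keeping the calculation in one pure function makes it easy to check that `toProcess + alreadyProcessed` always equals `totalEligible`.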
2. Fan-Out Pattern
Purpose: Split large batches into smaller chunks for parallel processing
Pattern:
// Layer 2: Pagination
const CHUNK_SIZE = 20;
if (itemIds.length > CHUNK_SIZE) {
// Fan-out to smaller chunks
const chunks = chunk(itemIds, CHUNK_SIZE);
console.log('[workflow:paginate] Fan-out mode:', {
chunks: chunks.length,
chunkSize: CHUNK_SIZE,
totalItems: itemIds.length,
});
await Promise.all(
chunks.map((ids, idx) =>
context.run(`workflow:fanout:${idx + 1}/${chunks.length}`, () =>
WorkflowClass.triggerPaginateItems({ itemIds: ids }),
),
),
);
}
Use Case: Avoid hitting workflow step limits by splitting large batches
Configuration:
- PAGE_SIZE = 50 - Items per pagination page
- CHUNK_SIZE = 20 - Items per fan-out chunk
- If batch > CHUNK_SIZE, split into chunks and recursively trigger pagination
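To see how the two constants interact, here is a small standalone sketch. The `chunk` function is a simplified stand-in for the helper the guide imports from `es-toolkit/compat`, and `planFanOut` is a hypothetical name used only for illustration.

```typescript
// Simplified stand-in for the chunk() helper imported from es-toolkit/compat in the guide.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new Error('size must be a positive integer');
  }
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

const PAGE_SIZE = 50;
const CHUNK_SIZE = 20;

// Hypothetical planner: describes what Layer 2 would trigger for one page of ids.
type FanOutPlan =
  | { mode: 'direct'; itemIds: string[] }
  | { mode: 'fan-out'; chunks: string[][] };

function planFanOut(itemIds: string[], chunkSize: number = CHUNK_SIZE): FanOutPlan {
  // Only batches strictly larger than the chunk size are split.
  return itemIds.length > chunkSize
    ? { mode: 'fan-out', chunks: chunk(itemIds, chunkSize) }
    : { mode: 'direct', itemIds };
}
```

With these values, a full page of 50 ids is split into chunks of 20, 20, and 10, while a batch of 20 or fewer is processed directly.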
3. Single Task Execution
Purpose: Execute business logic for ONE item at a time
Pattern:
// Layer 3: Single Task Execution
export const { POST } = serve<ExecutePayload>(
async (context) => {
const { itemId } = context.requestPayload ?? {};
if (!itemId) {
return { success: false, error: 'Missing itemId' };
}
// Get item
const item = await context.run('workflow:get-item', async () => {
return getItem(itemId);
});
// Execute business logic for THIS item only
const result = await context.run('workflow:execute', async () => {
return processItem(item);
});
// Save result for THIS item
await context.run('workflow:save', async () => {
return saveResult(itemId, result);
});
return { success: true, itemId, result };
},
{
flowControl: {
key: 'workflow.execute',
parallelism: 10,
ratePerSecond: 5,
},
},
);
Key Principles:
- Each workflow execution handles exactly ONE item
- Parallelism controlled by flowControl config
- Multiple items processed via Layer 2 triggering multiple Layer 3 executions
File Structure
Directory Layout
src/
├── app/(backend)/api/workflows/
│ └── {workflow-name}/
│ ├── process-{entities}/route.ts # Layer 1
│ ├── paginate-{entities}/route.ts # Layer 2
│ └── execute-{entity}/route.ts # Layer 3
│
└── server/workflows/
└── {workflowName}/
└── index.ts # Workflow class
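The route naming convention in this layout can be captured in a small helper. This is a sketch only: `buildWorkflowPaths` and its parameters are hypothetical names, and only the path shape is taken from the layout above.

```typescript
// Hypothetical helper; only the path shape comes from the directory layout.
interface WorkflowRoutePaths {
  process: string; // Layer 1
  paginate: string; // Layer 2
  execute: string; // Layer 3
}

function buildWorkflowPaths(
  workflowName: string, // e.g. 'welcome-placeholder'
  entities: string, // plural form, e.g. 'users'
  entity: string, // singular form, e.g. 'user'
  layer3Prefix: 'execute' | 'generate' = 'execute',
): WorkflowRoutePaths {
  const base = `/api/workflows/${workflowName}`;
  return {
    process: `${base}/process-${entities}`,
    paginate: `${base}/paginate-${entities}`,
    execute: `${base}/${layer3Prefix}-${entity}`,
  };
}
```

For example, `buildWorkflowPaths('welcome-placeholder', 'users', 'user', 'generate')` yields the three route paths used by the welcome-placeholder workflow.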
Cloud Project Configuration
For lobehub-cloud specific configurations (re-exports, cloud-only workflows, deployment patterns), see the Cloud Guide.
Implementation Patterns
1. Workflow Class
Location: src/server/workflows/{workflowName}/index.ts
import { Client } from '@upstash/workflow';
import debug from 'debug';
const log = debug('lobe-server:workflows:{workflow-name}');
// Workflow paths
const WORKFLOW_PATHS = {
processItems: '/api/workflows/{workflow-name}/process-items',
paginateItems: '/api/workflows/{workflow-name}/paginate-items',
executeItem: '/api/workflows/{workflow-name}/execute-item',
} as const;
// Payload types
export interface ProcessItemsPayload {
dryRun?: boolean;
force?: boolean;
}
export interface PaginateItemsPayload {
cursor?: string;
itemIds?: string[]; // For fanout chunks
}
export interface ExecuteItemPayload {
itemId: string;
}
/**
* Get workflow URL using APP_URL
*/
const getWorkflowUrl = (path: string): string => {
const baseUrl = process.env.APP_URL;
if (!baseUrl) throw new Error('APP_URL is required to trigger workflows');
return new URL(path, baseUrl).toString();
};
/**
* Get workflow client
*/
const getWorkflowClient = (): Client => {
const token = process.env.QSTASH_TOKEN;
if (!token) throw new Error('QSTASH_TOKEN is required to trigger workflows');
const config: ConstructorParameters<typeof Client>[0] = { token };
if (process.env.QSTASH_URL) {
(config as Record<string, unknown>).url = process.env.QSTASH_URL;
}
return new Client(config);
};
/**
* {Workflow Name} Workflow
*/
export class {WorkflowName}Workflow {
private static client: Client;
private static getClient(): Client {
if (!this.client) {
this.client = getWorkflowClient();
}
return this.client;
}
/**
* Trigger workflow to process items (entry point)
*/
static triggerProcessItems(payload: ProcessItemsPayload) {
const url = getWorkflowUrl(WORKFLOW_PATHS.processItems);
log('Triggering process-items workflow');
return this.getClient().trigger({ body: payload, url });
}
/**
* Trigger workflow to paginate items
*/
static triggerPaginateItems(payload: PaginateItemsPayload) {
const url = getWorkflowUrl(WORKFLOW_PATHS.paginateItems);
log('Triggering paginate-items workflow');
return this.getClient().trigger({ body: payload, url });
}
/**
* Trigger workflow to execute a single item
*/
static triggerExecuteItem(payload: ExecuteItemPayload) {
const url = getWorkflowUrl(WORKFLOW_PATHS.executeItem);
log('Triggering execute-item workflow: %s', payload.itemId);
return this.getClient().trigger({ body: payload, url });
}
/**
* Filter items that need processing (e.g., check Redis cache, database state)
*/
static async filterItemsNeedingProcessing(itemIds: string[]): Promise<string[]> {
if (itemIds.length === 0) return [];
// Check existing state (Redis, database, etc.)
// Return items that need processing
return itemIds;
}
}
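The `filterItemsNeedingProcessing` stub above returns every id unchanged. One possible way to fill it in is sketched below, assuming a hypothetical `ProcessedStore` interface that hides where the existing state lives (Redis, database, and so on). The in-memory class is a stand-in for experiments, not the project's storage layer.

```typescript
// Hypothetical abstraction over the existing state (Redis, database, ...).
interface ProcessedStore {
  /** Resolves to the subset of the given ids that already have a result. */
  findProcessed(itemIds: string[]): Promise<Set<string>>;
}

// In-memory stand-in, useful for local experiments and tests.
class InMemoryProcessedStore implements ProcessedStore {
  private readonly processed: Set<string>;

  constructor(processedIds: string[]) {
    this.processed = new Set(processedIds);
  }

  async findProcessed(itemIds: string[]): Promise<Set<string>> {
    return new Set(itemIds.filter((id) => this.processed.has(id)));
  }
}

async function filterItemsNeedingProcessing(
  itemIds: string[],
  store: ProcessedStore,
): Promise<string[]> {
  if (itemIds.length === 0) return [];
  // One batched lookup instead of one lookup per id.
  const processed = await store.findProcessed(itemIds);
  // Preserve the input order of the ids that still need work.
  return itemIds.filter((id) => !processed.has(id));
}
```

Looking up the whole batch at once keeps the number of round trips independent of the batch size.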
2. Layer 1: Entry Point (process-*)
Purpose: Validates prerequisites, calculates statistics, supports dryRun mode
import { serve } from '@upstash/workflow/nextjs';
import { getServerDB } from '@/database/server';
import { WorkflowClass, type ProcessItemsPayload } from '@/server/workflows/{workflowName}';
/**
* Entry workflow for {workflow description}
* 1. Get all eligible items
* 2. Filter items that already have results
* 3. If dryRun, return statistics only
* 4. If no items need processing, return early
* 5. Trigger paginate workflow
*/
export const { POST } = serve<ProcessItemsPayload>(
async (context) => {
const { dryRun, force } = context.requestPayload ?? {};
console.log('[{workflow}:process] Starting with payload:', { dryRun, force });
// Get all eligible items
const allItemIds = await context.run('{workflow}:get-all-items', async () => {
const db = await getServerDB();
// Query database for eligible items (replace with the real query)
const items = await db.query(...);
return items.map((item) => item.id);
});
console.log('[{workflow}:process] Total eligible items:', allItemIds.length);
if (allItemIds.length === 0) {
return {
success: true,
totalEligible: 0,
message: 'No eligible items found',
};
}
// Filter items that need processing
const itemsNeedingProcessing = await context.run('{workflow}:filter-existing', () =>
WorkflowClass.filterItemsNeedingProcessing(allItemIds),
);
const result = {
success: true,
totalEligible: allItemIds.length,
toProcess: itemsNeedingProcessing.length,
alreadyProcessed: allItemIds.length - itemsNeedingProcessing.length,
};
console.log('[{workflow}:process] Check result:', result);
// If dryRun mode, return statistics only
if (dryRun) {
console.log('[{workflow}:process] Dry run mode, returning statistics only');
return {
...result,
dryRun: true,
message: `[DryRun] Would process ${itemsNeedingProcessing.length} items`,
};
}
// If no items need processing, return early
if (itemsNeedingProcessing.length === 0) {
console.log('[{workflow}:process] All items already processed');
return {
...result,
message: 'All items already processed',
};
}
// Trigger paginate workflow
console.log('[{workflow}:process] Triggering paginate workflow');
await context.run('{workflow}:trigger-paginate', () => WorkflowClass.triggerPaginateItems({}));
return {
...result,
message: `Triggered pagination for ${itemsNeedingProcessing.length} items`,
};
},
{
flowControl: {
key: '{workflow}.process',
parallelism: 1,
ratePerSecond: 1,
},
},
);
3. Layer 2: Pagination (paginate-*)
Purpose: Handles cursor-based pagination, implements fanout for large batches
import { serve } from '@upstash/workflow/nextjs';
import { chunk } from 'es-toolkit/compat';
import { getServerDB } from '@/database/server';
import { WorkflowClass, type PaginateItemsPayload } from '@/server/workflows/{workflowName}';
const PAGE_SIZE = 50;
const CHUNK_SIZE = 20;
/**
* Paginate items workflow - handles pagination and fanout
* 1. If specific itemIds provided (from fanout), process them directly
* 2. Otherwise, paginate through all items using cursor
* 3. Filter items that need processing
* 4. If batch > CHUNK_SIZE, fanout to smaller chunks
* 5. Trigger execute workflow for each item
* 6. Schedule next page if cursor exists
*/
export const { POST } = serve<PaginateItemsPayload>(
async (context) => {
const { cursor, itemIds: payloadItemIds } = context.requestPayload ?? {};
console.log('[{workflow}:paginate] Starting with payload:', {
cursor,
itemIdsCount: payloadItemIds?.length ?? 0,
});
// If specific itemIds are provided, process them directly (from fanout)
if (payloadItemIds && payloadItemIds.length > 0) {
console.log('[{workflow}:paginate] Processing specific itemIds:', {
count: payloadItemIds.length,
});
await Promise.all(
payloadItemIds.map((itemId) =>
context.run(`{workflow}:execute:${itemId}`, () =>
WorkflowClass.triggerExecuteItem({ itemId }),
),
),
);
return {
success: true,
processedItems: payloadItemIds.length,
};
}
// Paginate through all items
const itemBatch = await context.run('{workflow}:get-batch', async () => {
const db = await getServerDB();
// Query database with cursor and PAGE_SIZE
const items = await db.query(...);
if (!items.length) return { ids: [] };
const last = items.at(-1);
return {
ids: items.map(item => item.id),
cursor: last ? last.id : undefined,
};
});
const batchItemIds = itemBatch.ids;
const nextCursor = 'cursor' in itemBatch ? itemBatch.cursor : undefined;
console.log('[{workflow}:paginate] Got batch:', {
batchSize: batchItemIds.length,
nextCursor,
});
if (batchItemIds.length === 0) {
console.log('[{workflow}:paginate] No more items, pagination complete');
return { success: true, message: 'Pagination complete' };
}
// Filter items that need processing
const itemIds = await context.run('{workflow}:filter-existing', () =>
WorkflowClass.filterItemsNeedingProcessing(batchItemIds),
);
console.log('[{workflow}:paginate] After filtering:', {
needProcessing: itemIds.length,
skipped: batchItemIds.length - itemIds.length,
});
// Process items if any need processing
if (itemIds.length > 0) {
if (itemIds.length > CHUNK_SIZE) {
// Fanout to smaller chunks
const chunks = chunk(itemIds, CHUNK_SIZE);
console.log('[{workflow}:paginate] Fanout mode:', {
chunks: chunks.length,
chunkSize: CHUNK_SIZE,
totalItems: itemIds.length,
});
await Promise.all(
chunks.map((ids, idx) =>
context.run(`{workflow}:fanout:${idx + 1}/${chunks.length}`, () =>
WorkflowClass.triggerPaginateItems({ itemIds: ids }),
),
),
);
} else {
// Process directly
console.log('[{workflow}:paginate] Processing items directly:', {
count: itemIds.length,
});
await Promise.all(
itemIds.map((itemId) =>
context.run(`{workflow}:execute:${itemId}`, () =>
WorkflowClass.triggerExecuteItem({ itemId }),
),
),
);
}
}
// Schedule next page
if (nextCursor) {
console.log('[{workflow}:paginate] Scheduling next page:', { nextCursor });
await context.run('{workflow}:next-page', () =>
WorkflowClass.triggerPaginateItems({ cursor: nextCursor }),
);
} else {
console.log('[{workflow}:paginate] No more pages');
}
return {
success: true,
processedItems: itemIds.length,
skippedItems: batchItemIds.length - itemIds.length,
nextCursor: nextCursor ?? null,
};
},
{
flowControl: {
key: '{workflow}.paginate',
parallelism: 20,
ratePerSecond: 5,
},
},
);
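The cursor handling in Layer 2 can be illustrated without a database. The sketch below assumes ids are unique and sorted ascending; `getBatch` and `collectAllPages` are hypothetical names, and the in-memory array stands in for the real query. As in the route above, the cursor is the id of the last item on the page, so the final run fetches an empty page and stops.

```typescript
interface Page {
  ids: string[];
  // Id of the last item on this page; undefined when the page is empty.
  cursor?: string | undefined;
}

// In-memory stand-in for "query database with cursor and PAGE_SIZE".
// Assumes ids are unique and sorted ascending.
function getBatch(sortedIds: string[], cursor: string | undefined, pageSize: number): Page {
  const start = cursor === undefined ? 0 : sortedIds.findIndex((id) => id > cursor);
  if (start === -1) return { ids: [] };
  const ids = sortedIds.slice(start, start + pageSize);
  return { ids, cursor: ids[ids.length - 1] };
}

// Walks every page the way repeated paginate-* runs would, one page per run.
function collectAllPages(sortedIds: string[], pageSize: number): string[][] {
  const pages: string[][] = [];
  let cursor: string | undefined;
  for (;;) {
    const page = getBatch(sortedIds, cursor, pageSize);
    if (page.ids.length === 0) break; // corresponds to "Pagination complete"
    pages.push(page.ids);
    cursor = page.cursor;
  }
  return pages;
}
```

Because each page is located by comparing against the previous cursor rather than by offset, rows inserted before the cursor between runs do not shift later pages.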
4. Layer 3: Execution (execute-*/generate-*)
Purpose: Performs actual business logic
import { serve } from '@upstash/workflow/nextjs';
import { getServerDB } from '@/database/server';
import { WorkflowClass, type ExecuteItemPayload } from '@/server/workflows/{workflowName}';
/**
* Execute item workflow - performs actual business logic
* 1. Get item data
* 2. Perform business logic (AI generation, data processing, etc.)
* 3. Save results
*/
export const { POST } = serve<ExecuteItemPayload>(
async (context) => {
const { itemId } = context.requestPayload ?? {};
console.log('[{workflow}:execute] Starting:', { itemId });
if (!itemId) {
return { success: false, error: 'Missing itemId' };
}
const db = await getServerDB();
// Get item data
const item = await context.run('{workflow}:get-item', async () => {
// Query database for the item (replace with the real query)
return db.query(...);
});
if (!item) {
return { success: false, error: 'Item not found' };
}
// Perform business logic
const result = await context.run('{workflow}:process-item', async () => {
const workflow = new WorkflowClass(db, itemId);
return workflow.generate(); // or process(), execute(), etc.
});
// Save results
await context.run('{workflow}:save-result', async () => {
const workflow = new WorkflowClass(db, itemId);
return workflow.saveToRedis(result); // or saveToDatabase(), etc.
});
console.log('[{workflow}:execute] Completed:', { itemId });
return {
success: true,
itemId,
result,
};
},
{
flowControl: {
key: '{workflow}.execute',
parallelism: 10,
ratePerSecond: 5,
},
},
);
Best Practices
1. Error Handling
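import { WorkflowAbort } from '@upstash/workflow';
export const { POST } = serve<Payload>(
async (context) => {
const { itemId } = context.requestPayload ?? {};
// Validate required parameters
if (!itemId) {
return { success: false, error: 'Missing itemId in payload' };
}
try {
// Perform work
const result = await context.run('step-name', () => doWork(itemId));
return { success: true, itemId, result };
} catch (error) {
// Assumption: the SDK interrupts a run between steps by throwing WorkflowAbort,
// so it is rethrown here instead of being reported as a failure.
// Verify this against the SDK version in use.
if (error instanceof WorkflowAbort) throw error;
console.error('[workflow:error]', error);
return {
success: false,
error: error instanceof Error ? error.message : 'Unknown error',
};
}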
},
{ flowControl: { ... } },
);
2. Logging
Use consistent log prefixes and structured logging:
console.log('[{workflow}:{layer}] Starting with payload:', payload);
console.log('[{workflow}:{layer}] Processing items:', { count: items.length });
console.log('[{workflow}:{layer}] Completed:', result);
console.error('[{workflow}:{layer}:error]', error);
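One way to keep the prefixes consistent is to build them once per route. This is a sketch under assumptions: `createWorkflowLogger` and `LogSink` are hypothetical names, and the sink is injected so the example does not depend on any particular logging backend.

```typescript
type Layer = 'process' | 'paginate' | 'execute';
type LogSink = (prefix: string, message: string, data?: unknown) => void;

// Hypothetical factory: builds the [{workflow}:{layer}] prefixes once per route.
function createWorkflowLogger(workflow: string, layer: Layer, sink: LogSink) {
  const prefix = `[${workflow}:${layer}]`;
  const errorPrefix = `[${workflow}:${layer}:error]`;
  return {
    info: (message: string, data?: unknown): void => sink(prefix, message, data),
    error: (error: unknown): void =>
      sink(errorPrefix, error instanceof Error ? error.message : String(error)),
  };
}
```

In a route, the sink could simply forward to the console, for example `(prefix, message, data) => console.log(prefix, message, data)`.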
3. Return Values
Return consistent response shapes:
// Success response
return {
success: true,
itemId,
result,
message: 'Optional success message',
};
// Error response
return {
success: false,
error: 'Error description',
itemId, // Include context if available
};
// Statistics response (for entry point)
return {
success: true,
totalEligible: 100,
toProcess: 80,
alreadyProcessed: 20,
dryRun: true, // If applicable
message: 'Summary message',
};
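These shapes can be expressed as a discriminated union on `success`, so callers cannot read `result` from a failed response. The sketch below uses hypothetical names (`ok`, `fail`, `summarize`) and is not taken from the codebase.

```typescript
interface SuccessResponse<TResult> {
  success: true;
  itemId: string;
  result: TResult;
  message?: string;
}

interface ErrorResponse {
  success: false;
  error: string;
  itemId?: string; // included when the failing item is known
}

type ExecuteResponse<TResult> = SuccessResponse<TResult> | ErrorResponse;

function ok<TResult>(itemId: string, result: TResult, message?: string): SuccessResponse<TResult> {
  return message === undefined
    ? { success: true, itemId, result }
    : { success: true, itemId, result, message };
}

function fail(error: unknown, itemId?: string): ErrorResponse {
  const text =
    error instanceof Error ? error.message : typeof error === 'string' ? error : 'Unknown error';
  return itemId === undefined
    ? { success: false, error: text }
    : { success: false, error: text, itemId };
}

// The `success` flag narrows the union, so each branch sees only its own fields.
function summarize<TResult>(response: ExecuteResponse<TResult>): string {
  return response.success ? `done: ${response.itemId}` : `failed: ${response.error}`;
}
```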
4. flowControl Configuration
Purpose: Control concurrency and rate limiting for workflow executions
Tune concurrency based on layer:
// Layer 1: Entry point - single instance only
flowControl: {
key: '{workflow}.process',
parallelism: 1, // Only 1 process workflow at a time
ratePerSecond: 1, // 1 execution per second
}
// Layer 2: Pagination - moderate concurrency
flowControl: {
key: '{workflow}.paginate',
parallelism: 20, // Up to 20 pagination workflows in parallel
ratePerSecond: 5, // 5 new executions per second
}
// Layer 3: Single task execution - high concurrency
flowControl: {
key: '{workflow}.execute',
parallelism: 10, // Up to 10 items processed in parallel
ratePerSecond: 5, // 5 new items per second
}
Guidelines:
- Layer 1: Always use parallelism: 1 to avoid duplicate processing
- Layer 2: Moderate concurrency for pagination (typically 10-20)
- Layer 3: Higher concurrency for parallel item processing (typically 5-10)
- Adjust ratePerSecond based on external API rate limits or resource constraints
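The per-layer values can be centralized so that each route only names its workflow and layer. This is a sketch: `flowControlFor` is a hypothetical helper, the defaults are the values used in this guide, and the guard encodes the Layer 1 guideline.

```typescript
interface FlowControl {
  key: string;
  parallelism: number;
  ratePerSecond: number;
}

type FlowLayer = 'process' | 'paginate' | 'execute';
type FlowLimits = Omit<FlowControl, 'key'>;

// Defaults taken from the values used in this guide.
const LAYER_DEFAULTS: Record<FlowLayer, FlowLimits> = {
  process: { parallelism: 1, ratePerSecond: 1 },
  paginate: { parallelism: 20, ratePerSecond: 5 },
  execute: { parallelism: 10, ratePerSecond: 5 },
};

// Hypothetical helper: builds the flowControl object for one layer of one workflow.
function flowControlFor(
  workflow: string,
  layer: FlowLayer,
  overrides: Partial<FlowLimits> = {},
): FlowControl {
  const limits: FlowLimits = { ...LAYER_DEFAULTS[layer], ...overrides };
  // Guideline: the entry point must never run concurrently with itself.
  if (layer === 'process' && limits.parallelism !== 1) {
    throw new Error('Layer 1 must use parallelism: 1');
  }
  return { key: `${workflow}.${layer}`, ...limits };
}
```

A route that calls a rate-limited external API might pass `{ ratePerSecond: 2 }` as the override and leave the other values at their defaults.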
5. context.run() Best Practices
- Use descriptive step names with prefixes: {workflow}:step-name
- Each step should be idempotent (safe to retry)
- Don't nest context.run() calls - keep them flat
- Use unique step names when processing multiple items:
// Good: Unique step names
await Promise.all(
items.map((item) => context.run(`{workflow}:execute:${item.id}`, () => processItem(item))),
);
// Bad: Same step name for all items
await Promise.all(
items.map((item) =>
context.run(`{workflow}:execute`, () =>
// ❌ Not unique
processItem(item),
),
),
);
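A small check can catch duplicate step names before a batch is dispatched. The sketch below is illustrative only; `stepName` and `findDuplicateStepNames` are hypothetical helpers and not part of the Upstash SDK.

```typescript
// Builds a step name following the {workflow}:step-name convention,
// with an optional id suffix for per-item steps.
function stepName(workflow: string, action: string, id?: string): string {
  return id === undefined ? `${workflow}:${action}` : `${workflow}:${action}:${id}`;
}

// Returns each name that occurs more than once, listed once.
function findDuplicateStepNames(names: string[]): string[] {
  const seen = new Set<string>();
  const duplicates = new Set<string>();
  for (const name of names) {
    if (seen.has(name)) {
      duplicates.add(name);
    } else {
      seen.add(name);
    }
  }
  return Array.from(duplicates);
}
```

Running the check over the names generated for a batch makes a missing id suffix visible immediately, rather than surfacing later as confusing step results.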
6. Payload Validation
Always validate required parameters at the start:
export const { POST } = serve<Payload>(
async (context) => {
const { itemId, configId } = context.requestPayload ?? {};
// Validate at the start
if (!itemId) {
return { success: false, error: 'Missing itemId in payload' };
}
if (!configId) {
return { success: false, error: 'Missing configId in payload' };
}
// Proceed with work...
},
{ flowControl: { ... } },
);
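When a payload has several required fields, the repeated checks can be folded into one helper. This is a minimal sketch with a hypothetical `findMissingField` function; it treats `undefined`, `null`, and the empty string as missing and returns the same error shape as the checks above.

```typescript
interface ValidationError {
  success: false;
  error: string;
}

// Returns an error response for the first missing field, or null when all are present.
function findMissingField(
  payload: object | undefined,
  required: readonly string[],
): ValidationError | null {
  const record = (payload ?? {}) as Record<string, unknown>;
  for (const key of required) {
    const value = record[key];
    if (value === undefined || value === null || value === '') {
      return { success: false, error: `Missing ${key} in payload` };
    }
  }
  return null;
}
```

In a route this could be used as `const invalid = findMissingField(context.requestPayload, ['itemId', 'configId']); if (invalid) return invalid;`.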
7. Database Connection
Get database connection once per workflow:
export const { POST } = serve<Payload>(
async (context) => {
const { itemId } = context.requestPayload ?? {};
const db = await getServerDB(); // Get once
// Use in multiple steps
const item = await context.run('get-item', async () => {
return itemModel.findById(db, itemId);
});
const saved = await context.run('save-result', async () => {
return resultModel.create(db, item);
});
return { success: true, itemId, saved };
},
{ flowControl: { ... } },
);
8. Testing
Create integration tests for workflows:
describe('WorkflowName', () => {
it('should process items successfully', async () => {
// Setup test data
const items = await createTestItems();
// Trigger workflow
await WorkflowClass.triggerProcessItems({ dryRun: false });
// Wait for completion (use polling or webhook)
await waitForCompletion();
// Verify results
const results = await getResults();
expect(results).toHaveLength(items.length);
});
it('should support dryRun mode', async () => {
const result = await WorkflowClass.triggerProcessItems({ dryRun: true });
expect(result).toMatchObject({
success: true,
dryRun: true,
totalEligible: expect.any(Number),
toProcess: expect.any(Number),
});
});
});
Examples
Example 1: Welcome Placeholder
Use Case: Generate AI-powered welcome placeholders for users
Structure:
- Layer 1: process-users - Entry point, checks eligible users
- Layer 2: paginate-users - Paginates through active users
- Layer 3: generate-user - Generates placeholders for ONE user
Core Patterns Demonstrated:
- Dry-Run Mode:
// Layer 1: process-users
if (dryRun) {
return {
...result,
dryRun: true,
message: `[DryRun] Would process ${usersNeedingGeneration.length} users`,
};
}
- Fan-Out Pattern:
// Layer 2: paginate-users
if (userIds.length > CHUNK_SIZE) {
const chunks = chunk(userIds, CHUNK_SIZE);
await Promise.all(
chunks.map((ids, idx) =>
context.run(`welcome-placeholder:fanout:${idx + 1}/${chunks.length}`, () =>
WelcomePlaceholderWorkflow.triggerPaginateUsers({ userIds: ids }),
),
),
);
}
- Single Task Execution:
// Layer 3: generate-user
export const { POST } = serve<GenerateUserPlaceholderPayload>(async (context) => {
const { userId } = context.requestPayload ?? {};
// Execute for ONE user only
const workflow = new WelcomePlaceholderWorkflow(db, userId);
const placeholders = await context.run('generate', () => workflow.generate());
return { success: true, userId, placeholdersCount: placeholders.length };
});
Key Features:
- ✅ Filters users who already have cached placeholders in Redis
- ✅ Supports paidOnly flag to process only subscribed users
- ✅ Supports dryRun mode for statistics
- ✅ Uses fan-out for large user batches (CHUNK_SIZE=20)
- ✅ Each execution processes exactly ONE user
Files:
- /api/workflows/welcome-placeholder/process-users/route.ts
- /api/workflows/welcome-placeholder/paginate-users/route.ts
- /api/workflows/welcome-placeholder/generate-user/route.ts
- /server/workflows/welcomePlaceholder/index.ts
Example 2: Agent Welcome
Use Case: Generate welcome messages and open questions for AI agents
Structure:
- Layer 1: process-agents - Entry point, checks eligible agents
- Layer 2: paginate-agents - Paginates through active agents
- Layer 3: generate-agent - Generates welcome data for ONE agent
Core Patterns Demonstrated:
- Dry-Run Mode:
// Layer 1: process-agents
if (dryRun) {
return {
...result,
dryRun: true,
message: `[DryRun] Would process ${agentsNeedingGeneration.length} agents`,
};
}
- Fan-Out Pattern: Same as welcome-placeholder
- Single Task Execution:
// Layer 3: generate-agent
export const { POST } = serve<GenerateAgentWelcomePayload>(async (context) => {
const { agentId } = context.requestPayload ?? {};
// Execute for ONE agent only
const workflow = new AgentWelcomeWorkflow(db, agentId);
const data = await context.run('generate', () => workflow.generate());
return { success: true, agentId, data };
});
Key Features:
- ✅ Filters agents who already have cached data in Redis
- ✅ Supports paidOnly flag for subscribed users' agents only
- ✅ Supports dryRun mode for statistics
- ✅ Uses fan-out for large agent batches (CHUNK_SIZE=20)
- ✅ Each execution processes exactly ONE agent
Files:
- /api/workflows/agent-welcome/process-agents/route.ts
- /api/workflows/agent-welcome/paginate-agents/route.ts
- /api/workflows/agent-welcome/generate-agent/route.ts
- /server/workflows/agentWelcome/index.ts
Key Takeaways from Examples
Both workflows follow the exact same pattern:
- Layer 1 (Entry Point):
  - Calculate statistics
  - Filter existing items
  - Support dry-run mode
  - Trigger pagination only if needed
- Layer 2 (Pagination):
  - Paginate with cursor (PAGE_SIZE=50)
  - Fan-out large batches (CHUNK_SIZE=20)
  - Trigger Layer 3 for each item
  - Recursively process all pages
- Layer 3 (Execution):
  - Process ONE item per execution
  - Perform business logic
  - Save results
  - Return success/failure
The only differences are:
- Entity type (users vs agents)
- Business logic (placeholder generation vs welcome generation)
- Data source (different database queries)
Common Pitfalls
❌ Don't: Use context.run() without unique names
// Bad: Same step name when processing multiple items
await Promise.all(items.map((item) => context.run('process', () => process(item))));
// Good: Unique step names
await Promise.all(items.map((item) => context.run(`process:${item.id}`, () => process(item))));
❌ Don't: Forget to validate payload parameters
// Bad: No validation
export const { POST } = serve<Payload>(async (context) => {
const { itemId } = context.requestPayload ?? {};
const result = await process(itemId); // May fail with undefined
});
// Good: Validate early
export const { POST } = serve<Payload>(async (context) => {
const { itemId } = context.requestPayload ?? {};
if (!itemId) {
return { success: false, error: 'Missing itemId' };
}
const result = await process(itemId);
});
❌ Don't: Skip filtering existing items
// Bad: No filtering, may duplicate work
const allItems = await getAllItems();
await Promise.all(allItems.map((item) => triggerExecute(item)));
// Good: Filter existing items first
const allItems = await getAllItems();
const itemsNeedingProcessing = await filterExisting(allItems);
await Promise.all(itemsNeedingProcessing.map((item) => triggerExecute(item)));
❌ Don't: Use inconsistent logging
// Bad: Inconsistent prefixes and formats
console.log('Starting workflow');
log.info('Processing item:', itemId);
console.log(`Done with ${itemId}`);
// Good: Consistent structured logging
console.log('[workflow:layer] Starting with payload:', payload);
console.log('[workflow:layer] Processing item:', { itemId });
console.log('[workflow:layer] Completed:', { itemId, result });
Environment Variables Required
# Required for all workflows
APP_URL=https://your-app.com # Base URL for workflow endpoints
QSTASH_TOKEN=qstash_xxx # QStash authentication token
# Optional (for custom QStash URL)
QSTASH_URL=https://custom-qstash.com # Custom QStash endpoint
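Validating these variables once at startup avoids discovering a missing value only when a trigger fails. The sketch below assumes hypothetical helper names (`readWorkflowEnv`, `resolveWorkflowUrl`); the error messages and the `new URL(path, baseUrl)` resolution follow the workflow class shown earlier.

```typescript
interface WorkflowEnv {
  appUrl: string;
  qstashToken: string;
  qstashUrl?: string; // optional custom QStash endpoint
}

// Reads and validates the variables from an env-like record.
function readWorkflowEnv(env: Record<string, string | undefined>): WorkflowEnv {
  const appUrl = env['APP_URL'];
  if (!appUrl) throw new Error('APP_URL is required to trigger workflows');
  const qstashToken = env['QSTASH_TOKEN'];
  if (!qstashToken) throw new Error('QSTASH_TOKEN is required to trigger workflows');
  const qstashUrl = env['QSTASH_URL'];
  return qstashUrl ? { appUrl, qstashToken, qstashUrl } : { appUrl, qstashToken };
}

// Resolves a workflow route path against the configured base URL.
function resolveWorkflowUrl(config: WorkflowEnv, path: string): string {
  return new URL(path, config.appUrl).toString();
}
```

In the application this would typically be called as `readWorkflowEnv(process.env)`.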
Checklist for New Workflows
Planning Phase
- [ ] Identify entity to process (users, agents, items, etc.)
- [ ] Define business logic for single item execution
- [ ] Determine filtering logic (Redis cache, database state, etc.)
Implementation Phase
- [ ] Define payload types with proper TypeScript interfaces
- [ ] Create workflow class with static trigger methods
- [ ] Layer 1: Implement entry point with dry-run support
- [ ] Layer 1: Add filtering logic to avoid duplicate work
- [ ] Layer 2: Implement pagination with fan-out logic
- [ ] Layer 3: Implement single task execution (ONE item per run)
- [ ] Configure appropriate flowControl for each layer
- [ ] Add consistent logging with workflow prefixes
- [ ] Validate all required payload parameters
- [ ] Use unique context.run() step names
Quality & Deployment
- [ ] Return consistent response shapes
- [ ] Configure cloud deployment (see Cloud Guide if using lobehub-cloud)
- [ ] Write integration tests
- [ ] Test with dry-run mode first
- [ ] Test with small batch before full rollout