π FLOW CONTROL MODULES IMPLEMENTATION
Overviewβ
Added LOOPER and MULTI-THREADER flow control modules to ValkyrAI Workflow Studio! These powerful nodes enable advanced workflow orchestration with loop control and parallel execution.
π Looper Moduleβ
Purposeβ
Control flow iteration with multiple loop types, break/continue conditions, and progress tracking.
Loop Typesβ
1. FOR Loopβ
{
loopType: 'FOR',
iterations: 10,
startIndex: 0,
breakCondition: 'result.error !== null',
continueCondition: 'result.status === "skip"'
}
- Use case: Execute a task N times
- Example: Process batch of 100 records, 10 at a time
2. WHILE Loopβ
{
loopType: 'WHILE',
condition: 'count < 100',
maxIterations: 1000 // Safety limit
}
- Use case: Continue until condition becomes false
- Example: Poll API until data is ready
3. FOR-EACH Loopβ
{
loopType: 'FOR_EACH',
collection: 'orderItems',
itemVariable: 'item',
breakCondition: 'item.price > 1000'
}
- Use case: Iterate over arrays/collections
- Example: Process each order item
4. DO-WHILE Loopβ
{
loopType: 'DO_WHILE',
condition: 'response.hasMore === true',
maxIterations: 50
}
- Use case: Execute at least once, then check condition
- Example: Fetch paginated data
Featuresβ
β
Progress Tracking - Real-time iteration counter
β
Break Conditions - Exit loop early based on state
β
Continue Conditions - Skip to next iteration
β
Safety Limits - maxIterations prevents infinite loops
β
Nested Loops - Loop nodes can contain other loops
β
Visual Feedback - Animated progress bar during execution
Connection Pointsβ
- Input (Left): Entry point to loop
- Loop Body (Bottom): Tasks to execute each iteration
- Exit (Right): Continue after loop completes
Configuration UIβ
interface LooperNodeData {
label: string;
loopType: "FOR" | "WHILE" | "FOR_EACH" | "DO_WHILE";
iterations?: number;
startIndex?: number;
condition?: string;
collection?: string;
itemVariable?: string;
breakCondition?: string;
continueCondition?: string;
maxIterations?: number;
currentIteration?: number;
totalIterations?: number;
isRunning?: boolean;
}
β‘ Multi-Threader Moduleβ
Purposeβ
Parallel execution control for fan-out/fan-in patterns, racing, and thread pool management.
Modesβ
1. FAN_OUT Modeβ
{
mode: 'FAN_OUT',
maxThreads: 10,
timeout: 30000 // 30 seconds
}
- Use case: Split work into parallel branches
- Example: Send email, SMS, and push notification simultaneously
- Handles: Single input β Multiple outputs (3 default)
2. FAN_IN Modeβ
{
mode: 'FAN_IN',
joinStrategy: 'WAIT_ALL',
timeout: 60000
}
- Use case: Wait for parallel branches to complete
- Example: Collect results from multiple API calls
- Handles: Multiple inputs (3 default) β Single output
3. RACE Modeβ
{
mode: 'RACE',
timeout: 5000,
retryOnFailure: true,
maxRetries: 3
}
- Use case: First thread to complete wins
- Example: Try multiple mirrors, use fastest response
- Handles: Single input β Single output
4. ALL Modeβ
{
mode: 'ALL',
timeout: 120000,
retryOnFailure: true
}
- Use case: All threads must complete successfully
- Example: Parallel database writes that must all succeed
- Handles: Single input β Single output
Join Strategies (FAN_IN)β
- WAIT_ALL: All inputs must complete
- WAIT_ANY: First input to complete
- WAIT_N: Specific number must complete (set
requiredCount)
Featuresβ
β
Thread Pool Management - maxThreads limits concurrent execution
β
Timeout Handling - Kill threads exceeding timeout
β
Retry Logic - Auto-retry failed threads
β
Status Visualization - Real-time thread status indicators
β
Error Aggregation - Collect errors from all threads
β
Thread Tracking - Individual thread status (pending/running/completed/failed/timeout)
Thread Statusβ
interface ThreadStatus {
id: string;
label: string;
status: "pending" | "running" | "completed" | "failed" | "timeout";
startTime?: number;
endTime?: number;
result?: any;
error?: string;
}
Configuration UIβ
interface MultiThreaderNodeData {
label: string;
mode: "FAN_OUT" | "FAN_IN" | "RACE" | "ALL";
maxThreads?: number;
queueSize?: number;
timeout?: number;
retryOnFailure?: boolean;
maxRetries?: number;
joinStrategy?: "WAIT_ALL" | "WAIT_ANY" | "WAIT_N";
requiredCount?: number;
threads?: ThreadStatus[];
isRunning?: boolean;
completedCount?: number;
failedCount?: number;
}
π¨ Visual Designβ
Looper Nodeβ
- Color: Amber (
#f59e0b) - Icon: π (rotating arrows)
- Style: Orange gradient with pulse animation when running
- Progress Bar: Shows
currentIteration / totalIterations - Badges: Break/Continue/Limit conditions
Multi-Threader Nodeβ
- Color: Purple (
#8b5cf6) - Icon: β‘ (lightning bolt)
- Style: Purple gradient with glow animation when running
- Thread Indicators: Color-coded status bars for each thread
- π¦ Blue = Running (blinking)
- π© Green = Completed
- π₯ Red = Failed
- π§ Orange = Timeout
- β« Gray = Pending
- Stats: β Completed / β‘ Running / β Failed
π¦ Files Createdβ
TypeScript Componentsβ
/components/WorkflowStudio/nodes/
βββ LooperNode.tsx (281 lines) - Loop control component
βββ LooperNode.css (387 lines) - Loop styling with animations
βββ MultiThreaderNode.tsx (320 lines) - Multi-thread control component
βββ MultiThreaderNode.css (336 lines) - Multi-thread styling with indicators
Integration Pointsβ
/components/WorkflowStudio/
βββ index.tsx - Added looper/multithreader to palette
βββ WorkflowCanvas.tsx - Registered node types, added node components
π Usage Examplesβ
Example 1: Batch Processing with Loopβ
workflow:
- node: Looper
type: FOR_EACH
collection: orders
itemVariable: order
maxIterations: 1000
body:
- node: Process Order Task
input: "${order}"
- node: Send Confirmation
input: "${order.email}"
Example 2: Parallel API Callsβ
workflow:
- node: Multi-Thread Fan-Out
type: FAN_OUT
maxThreads: 5
outputs:
- branch: Fetch User Data
- branch: Fetch Order History
- branch: Fetch Preferences
- node: Multi-Thread Fan-In
type: FAN_IN
joinStrategy: WAIT_ALL
- node: Aggregate Results
Example 3: Retry with Race Conditionβ
workflow:
- node: Multi-Thread Race
type: RACE
timeout: 5000
retryOnFailure: true
maxRetries: 3
threads:
- Try API Mirror 1
- Try API Mirror 2
- Try API Mirror 3
Example 4: Polling Loopβ
workflow:
- node: Poll Until Ready
type: WHILE
condition: 'status !== "ready"'
maxIterations: 100
body:
- node: Check Status API
- node: Wait 5 Seconds
π§ Backend Integrationβ
Required Backend Methodsβ
Looper Executionβ
// ValkyrWorkflowService.java
public Map<String, Object> executeLoop(
Task loopTask,
Map<String, Object> loopConfig,
Map<String, Object> state
) {
String loopType = (String) loopConfig.get("loopType");
switch (loopType) {
case "FOR":
return executeForLoop(loopTask, loopConfig, state);
case "WHILE":
return executeWhileLoop(loopTask, loopConfig, state);
case "FOR_EACH":
return executeForEachLoop(loopTask, loopConfig, state);
case "DO_WHILE":
return executeDoWhileLoop(loopTask, loopConfig, state);
}
}
Multi-Thread Executionβ
// ValkyrWorkflowService.java
public Map<String, Object> executeMultiThread(
Task threadTask,
Map<String, Object> threadConfig,
Map<String, Object> state
) {
String mode = (String) threadConfig.get("mode");
Integer maxThreads = (Integer) threadConfig.getOrDefault("maxThreads", 10);
ExecutorService executor = Executors.newFixedThreadPool(maxThreads);
switch (mode) {
case "FAN_OUT":
return executeFanOut(executor, threadTask, threadConfig, state);
case "FAN_IN":
return executeFanIn(executor, threadTask, threadConfig, state);
case "RACE":
return executeRace(executor, threadTask, threadConfig, state);
case "ALL":
return executeAll(executor, threadTask, threadConfig, state);
}
}
π― Next Stepsβ
Phase 1: Basic Implementation (COMPLETE β )β
- Create LooperNode component
- Create MultiThreaderNode component
- Add to WorkflowStudio palette
- Register with ReactFlow
- Style with LCARS theme
Phase 2: Backend Integration (TODO)β
- Implement
executeLoop()in ValkyrWorkflowService - Implement
executeMultiThread()in ValkyrWorkflowService - Add loop state tracking in WorkflowState
- Add thread status tracking
- Implement break/continue logic
- Add safety limits (max iterations, timeouts)
Phase 3: Advanced Features (TODO)β
- Nested loop support
- Thread priority queuing
- Dynamic thread pool sizing
- Rate limiting for loops
- Dead-lock detection
- Performance metrics (avg iteration time, thread utilization)
Phase 4: UI Enhancements (TODO)β
- Loop/thread configuration modals
- Real-time status updates via WebSocket
- Historical execution metrics
- Error detail popover
- Export loop/thread logs
π Known Issuesβ
- Linting Warnings: Missing braces after if statements (style warnings, non-breaking)
- Backend Not Implemented: Nodes are UI-only until backend methods added
- WebSocket Integration: Real-time status updates need WebSocket connection
π Documentationβ
See also:
WORKFLOW_STUDIO_ARCHITECTURE.md- Overall architectureWORKFLOW_STUDIO_UX_OVERHAUL.md- UX implementation details- Backend:
valkyrai/src/main/java/com/valkyrlabs/workflow/ValkyrWorkflowService.java
π Summaryβ
LOOPER and MULTI-THREADER modules are now available in the Workflow Studio palette!
These flow control nodes enable:
- β Complex iteration patterns (FOR, WHILE, FOR-EACH, DO-WHILE)
- β Parallel execution strategies (FAN-OUT, FAN-IN, RACE, ALL)
- β Thread pool management with safety limits
- β Real-time status visualization
- β Break/continue/timeout control
Ready to test: Drag from palette β Drop on canvas β Configure β Connect! π
Backend integration required for execution - nodes currently provide UI framework and state tracking.