Skip to main content

πŸ” FLOW CONTROL MODULES IMPLEMENTATION

Overview​

Added LOOPER and MULTI-THREADER flow control modules to ValkyrAI Workflow Studio! These powerful nodes enable advanced workflow orchestration with loop control and parallel execution.

πŸ” Looper Module​

Purpose​

Control flow iteration with multiple loop types, break/continue conditions, and progress tracking.

Loop Types​

1. FOR Loop​

{
loopType: 'FOR',
iterations: 10,
startIndex: 0,
breakCondition: 'result.error !== null',
continueCondition: 'result.status === "skip"'
}
  • Use case: Execute a task N times
  • Example: Process batch of 100 records, 10 at a time

2. WHILE Loop​

{
loopType: 'WHILE',
condition: 'count < 100',
maxIterations: 1000 // Safety limit
}
  • Use case: Continue until condition becomes false
  • Example: Poll API until data is ready

3. FOR-EACH Loop​

{
loopType: 'FOR_EACH',
collection: 'orderItems',
itemVariable: 'item',
breakCondition: 'item.price > 1000'
}
  • Use case: Iterate over arrays/collections
  • Example: Process each order item

4. DO-WHILE Loop​

{
loopType: 'DO_WHILE',
condition: 'response.hasMore === true',
maxIterations: 50
}
  • Use case: Execute at least once, then check condition
  • Example: Fetch paginated data

Features​

βœ… Progress Tracking - Real-time iteration counter
βœ… Break Conditions - Exit loop early based on state
βœ… Continue Conditions - Skip to next iteration
βœ… Safety Limits - maxIterations prevents infinite loops
βœ… Nested Loops - Loop nodes can contain other loops
βœ… Visual Feedback - Animated progress bar during execution

Connection Points​

  • Input (Left): Entry point to loop
  • Loop Body (Bottom): Tasks to execute each iteration
  • Exit (Right): Continue after loop completes

Configuration UI​

interface LooperNodeData {
label: string;
loopType: "FOR" | "WHILE" | "FOR_EACH" | "DO_WHILE";
iterations?: number;
startIndex?: number;
condition?: string;
collection?: string;
itemVariable?: string;
breakCondition?: string;
continueCondition?: string;
maxIterations?: number;
currentIteration?: number;
totalIterations?: number;
isRunning?: boolean;
}

⚑ Multi-Threader Module​

Purpose​

Parallel execution control for fan-out/fan-in patterns, racing, and thread pool management.

Modes​

1. FAN_OUT Mode​

{
mode: 'FAN_OUT',
maxThreads: 10,
timeout: 30000 // 30 seconds
}
  • Use case: Split work into parallel branches
  • Example: Send email, SMS, and push notification simultaneously
  • Handles: Single input β†’ Multiple outputs (3 default)

2. FAN_IN Mode​

{
mode: 'FAN_IN',
joinStrategy: 'WAIT_ALL',
timeout: 60000
}
  • Use case: Wait for parallel branches to complete
  • Example: Collect results from multiple API calls
  • Handles: Multiple inputs (3 default) β†’ Single output

3. RACE Mode​

{
mode: 'RACE',
timeout: 5000,
retryOnFailure: true,
maxRetries: 3
}
  • Use case: First thread to complete wins
  • Example: Try multiple mirrors, use fastest response
  • Handles: Single input β†’ Single output

4. ALL Mode​

{
mode: 'ALL',
timeout: 120000,
retryOnFailure: true
}
  • Use case: All threads must complete successfully
  • Example: Parallel database writes that must all succeed
  • Handles: Single input β†’ Single output

Join Strategies (FAN_IN)​

  • WAIT_ALL: All inputs must complete
  • WAIT_ANY: First input to complete
  • WAIT_N: Specific number must complete (set requiredCount)

Features​

βœ… Thread Pool Management - maxThreads limits concurrent execution
βœ… Timeout Handling - Kill threads exceeding timeout
βœ… Retry Logic - Auto-retry failed threads
βœ… Status Visualization - Real-time thread status indicators
βœ… Error Aggregation - Collect errors from all threads
βœ… Thread Tracking - Individual thread status (pending/running/completed/failed/timeout)

Thread Status​

interface ThreadStatus {
id: string;
label: string;
status: "pending" | "running" | "completed" | "failed" | "timeout";
startTime?: number;
endTime?: number;
result?: any;
error?: string;
}

Configuration UI​

interface MultiThreaderNodeData {
label: string;
mode: "FAN_OUT" | "FAN_IN" | "RACE" | "ALL";
maxThreads?: number;
queueSize?: number;
timeout?: number;
retryOnFailure?: boolean;
maxRetries?: number;
joinStrategy?: "WAIT_ALL" | "WAIT_ANY" | "WAIT_N";
requiredCount?: number;
threads?: ThreadStatus[];
isRunning?: boolean;
completedCount?: number;
failedCount?: number;
}

🎨 Visual Design​

Looper Node​

  • Color: Amber (#f59e0b)
  • Icon: πŸ” (rotating arrows)
  • Style: Orange gradient with pulse animation when running
  • Progress Bar: Shows currentIteration / totalIterations
  • Badges: Break/Continue/Limit conditions

Multi-Threader Node​

  • Color: Purple (#8b5cf6)
  • Icon: ⚑ (lightning bolt)
  • Style: Purple gradient with glow animation when running
  • Thread Indicators: Color-coded status bars for each thread
    • 🟦 Blue = Running (blinking)
    • 🟩 Green = Completed
    • πŸŸ₯ Red = Failed
    • 🟧 Orange = Timeout
    • ⚫ Gray = Pending
  • Stats: βœ“ Completed / ⚑ Running / βœ— Failed

πŸ“¦ Files Created​

TypeScript Components​

/components/WorkflowStudio/nodes/
β”œβ”€β”€ LooperNode.tsx (281 lines) - Loop control component
β”œβ”€β”€ LooperNode.css (387 lines) - Loop styling with animations
β”œβ”€β”€ MultiThreaderNode.tsx (320 lines) - Multi-thread control component
└── MultiThreaderNode.css (336 lines) - Multi-thread styling with indicators

Integration Points​

/components/WorkflowStudio/
β”œβ”€β”€ index.tsx - Added looper/multithreader to palette
└── WorkflowCanvas.tsx - Registered node types, added node components

πŸš€ Usage Examples​

Example 1: Batch Processing with Loop​

workflow:
- node: Looper
type: FOR_EACH
collection: orders
itemVariable: order
maxIterations: 1000
body:
- node: Process Order Task
input: "${order}"
- node: Send Confirmation
input: "${order.email}"

Example 2: Parallel API Calls​

workflow:
- node: Multi-Thread Fan-Out
type: FAN_OUT
maxThreads: 5
outputs:
- branch: Fetch User Data
- branch: Fetch Order History
- branch: Fetch Preferences
- node: Multi-Thread Fan-In
type: FAN_IN
joinStrategy: WAIT_ALL
- node: Aggregate Results

Example 3: Retry with Race Condition​

workflow:
- node: Multi-Thread Race
type: RACE
timeout: 5000
retryOnFailure: true
maxRetries: 3
threads:
- Try API Mirror 1
- Try API Mirror 2
- Try API Mirror 3

Example 4: Polling Loop​

workflow:
- node: Poll Until Ready
type: WHILE
condition: 'status !== "ready"'
maxIterations: 100
body:
- node: Check Status API
- node: Wait 5 Seconds

πŸ”§ Backend Integration​

Required Backend Methods​

Looper Execution​

// ValkyrWorkflowService.java
public Map<String, Object> executeLoop(
Task loopTask,
Map<String, Object> loopConfig,
Map<String, Object> state
) {
String loopType = (String) loopConfig.get("loopType");
switch (loopType) {
case "FOR":
return executeForLoop(loopTask, loopConfig, state);
case "WHILE":
return executeWhileLoop(loopTask, loopConfig, state);
case "FOR_EACH":
return executeForEachLoop(loopTask, loopConfig, state);
case "DO_WHILE":
return executeDoWhileLoop(loopTask, loopConfig, state);
}
}

Multi-Thread Execution​

// ValkyrWorkflowService.java
public Map<String, Object> executeMultiThread(
Task threadTask,
Map<String, Object> threadConfig,
Map<String, Object> state
) {
String mode = (String) threadConfig.get("mode");
Integer maxThreads = (Integer) threadConfig.getOrDefault("maxThreads", 10);

ExecutorService executor = Executors.newFixedThreadPool(maxThreads);

switch (mode) {
case "FAN_OUT":
return executeFanOut(executor, threadTask, threadConfig, state);
case "FAN_IN":
return executeFanIn(executor, threadTask, threadConfig, state);
case "RACE":
return executeRace(executor, threadTask, threadConfig, state);
case "ALL":
return executeAll(executor, threadTask, threadConfig, state);
}
}

🎯 Next Steps​

Phase 1: Basic Implementation (COMPLETE βœ…)​

  • Create LooperNode component
  • Create MultiThreaderNode component
  • Add to WorkflowStudio palette
  • Register with ReactFlow
  • Style with LCARS theme

Phase 2: Backend Integration (TODO)​

  • Implement executeLoop() in ValkyrWorkflowService
  • Implement executeMultiThread() in ValkyrWorkflowService
  • Add loop state tracking in WorkflowState
  • Add thread status tracking
  • Implement break/continue logic
  • Add safety limits (max iterations, timeouts)

Phase 3: Advanced Features (TODO)​

  • Nested loop support
  • Thread priority queuing
  • Dynamic thread pool sizing
  • Rate limiting for loops
  • Dead-lock detection
  • Performance metrics (avg iteration time, thread utilization)

Phase 4: UI Enhancements (TODO)​

  • Loop/thread configuration modals
  • Real-time status updates via WebSocket
  • Historical execution metrics
  • Error detail popover
  • Export loop/thread logs

πŸ› Known Issues​

  1. Linting Warnings: Missing braces after if statements (style warnings, non-breaking)
  2. Backend Not Implemented: Nodes are UI-only until backend methods added
  3. WebSocket Integration: Real-time status updates need WebSocket connection

πŸ“š Documentation​

See also:

  • WORKFLOW_STUDIO_ARCHITECTURE.md - Overall architecture
  • WORKFLOW_STUDIO_UX_OVERHAUL.md - UX implementation details
  • Backend: valkyrai/src/main/java/com/valkyrlabs/workflow/ValkyrWorkflowService.java

πŸŽ‰ Summary​

LOOPER and MULTI-THREADER modules are now available in the Workflow Studio palette!

These flow control nodes enable:

  • βœ… Complex iteration patterns (FOR, WHILE, FOR-EACH, DO-WHILE)
  • βœ… Parallel execution strategies (FAN-OUT, FAN-IN, RACE, ALL)
  • βœ… Thread pool management with safety limits
  • βœ… Real-time status visualization
  • βœ… Break/continue/timeout control

Ready to test: Drag from palette β†’ Drop on canvas β†’ Configure β†’ Connect! πŸš€

Backend integration required for execution - nodes currently provide UI framework and state tracking.