
Workflows & Execution

Comprehensive guide for debugging ValkyrAI workflows, ExecModules, and execution issues.

Common Workflow Execution Issues

Symptom: Workflow never completes (stuck in RUNNING state)

Cause 1: Async task not being awaited

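// ❌ WRONG: Future is dropped, so completion and failures are never observed
CompletableFuture<?> future = workflowService.executeWorkflow(workflow);
// ... handler returns and nothing reacts when the future completes or fails

// ✅ CORRECT: Observe completion, then return 202 Accepted without blocking
CompletableFuture<?> future = workflowService.executeWorkflow(workflow);
future.whenComplete((result, ex) -> {
    if (ex != null) {
        logger.error("Workflow {} failed", workflow.getId(), ex);
    }
});
return ResponseEntity.accepted()
    .body(Map.of("workflowId", workflow.getId()));
// Client polls for status or uses WebSocket for updates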

Cause 2: Blocking operation without timeout

// ❌ WRONG: No timeout, thread hangs indefinitely
externalService.blockingCall();

// ✅ CORRECT: Add timeout
try {
    externalService.blockingCall(Duration.ofSeconds(30));
} catch (TimeoutException e) {
    logger.error("External service timeout", e);
    throw new WorkflowExecutionException("Timeout after 30s", e);
}
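If the external client has no timeout-aware overload, the same effect can be approximated by running the call on an executor and bounding the wait. The following is a minimal sketch using only the JDK; `TimeoutCalls` and `callWithTimeout` are hypothetical names, not part of ValkyrAI.

```java
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: bounds how long the caller waits for a blocking call.
final class TimeoutCalls {
    private TimeoutCalls() {}

    static <T> T callWithTimeout(Callable<T> blockingCall, Duration timeout, ExecutorService executor)
            throws TimeoutException {
        Future<T> future = executor.submit(blockingCall);
        try {
            return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the worker so it does not hold the thread
            throw e;
        } catch (InterruptedException e) {
            future.cancel(true);
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Blocking call failed", e.getCause());
        }
    }
}
```

Cancellation only helps if the blocking call responds to interruption; a call that ignores interrupts will keep its worker thread busy even after the caller has given up.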

Cause 3: Thread pool exhaustion

// Check executor service stats
ThreadPoolExecutor executor = (ThreadPoolExecutor) workflowExecutor;
logger.info("Active threads: {}", executor.getActiveCount());
logger.info("Queue size: {}", executor.getQueue().size());

// If the queue is full: increase pool size
// Requires org.springframework.scheduling.concurrent.CustomizableThreadFactory
@Bean
public Executor workflowExecutor() {
    return new ThreadPoolExecutor(
        10, // core threads (increased)
        20, // max threads (increased); only used once the queue is full
        60, // keep-alive time for idle threads above the core size
        TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(1000), // queue capacity
        new CustomizableThreadFactory("workflow-") // thread name prefix
    );
}
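A `ThreadPoolExecutor` with a bounded queue starts threads beyond the core size only after the queue has filled, so a pool can look idle on thread count while hundreds of tasks wait. The sketch below, using only the JDK, reports both numbers together; `PoolStats` is a hypothetical helper name.

```java
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical helper for spotting an exhausted workflow executor.
final class PoolStats {
    private PoolStats() {}

    /** True when every thread is busy and the queue cannot accept more work. */
    static boolean isSaturated(ThreadPoolExecutor pool) {
        return pool.getActiveCount() >= pool.getMaximumPoolSize()
                && pool.getQueue().remainingCapacity() == 0;
    }

    /** One-line summary suitable for a log statement. */
    static String describe(ThreadPoolExecutor pool) {
        return String.format("active=%d/%d queued=%d remainingCapacity=%d",
                pool.getActiveCount(),
                pool.getMaximumPoolSize(),
                pool.getQueue().size(),
                pool.getQueue().remainingCapacity());
    }
}
```

`getActiveCount()` is documented as an approximation, so treat a single reading as a hint and look at the trend over several samples.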

Debug Steps:

  1. Check workflow status: GET /v1/workflow/{id}
  2. Review task status: GET /v1/workflow/{id}/tasks
  3. Check logs for errors: grep "WORKFLOW\|ERROR" app.log
  4. Monitor thread pool: Check metrics dashboard

Symptom: Workflow fails with LazyInitializationException

Cause: Accessing lazy-loaded collections outside transactional boundary

Solution:

// ❌ WRONG: Lazy-loaded tasks accessed outside transaction
Workflow w = repo.findById(id).orElseThrow();
// Thread switches, session closes
CompletableFuture.supplyAsync(() -> {
    w.getTasks().forEach(...); // LazyInitializationException!
    return null;
});

// ✅ CORRECT: Eagerly load before async
@Transactional(readOnly = true)
public Workflow getWithTasks(UUID id) {
    Workflow w = repo.findById(id).orElseThrow();
    Hibernate.initialize(w.getTasks()); // Force eager load
    return w;
}

// Or use @EntityGraph
@EntityGraph(attributePaths = {"tasks", "tasks.execModules"})
Optional<Workflow> findById(UUID id);

Symptom: Task state not persisting between steps

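Cause: The async lambda runs on another thread, outside the caller's @Transactional scope, so task saves are not reliably committed. Persist each step in its own transaction with @Transactional(propagation = Propagation.REQUIRES_NEW).

Solution:

// ❌ WRONG: Relying on outer transaction
@Transactional
public CompletableFuture<Workflow> executeWorkflow(Workflow w) {
    return CompletableFuture.supplyAsync(() -> {
        Task task = performWork();
        taskService.save(task); // May not commit!
        return w;
    });
}

// ✅ CORRECT: Save with new transaction
private void persistTask(Task task) {
    // Call through the injected taskService bean; a call on `this`
    // would bypass the transactional proxy
    taskService.saveWithNewTransaction(task);
}

// Declared on the taskService bean
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void saveWithNewTransaction(Task task) {
    taskRepo.save(task);
}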

ExecModule Issues

Symptom: Module output doesn't appear in workflow state

Cause 1: Output mapping not configured

// ❌ WRONG: No output mapping
Task task = new Task();
task.setExecModule(emailModule);
// Output goes nowhere

// ✅ CORRECT: Configure output mapping
task.setOutputMapping(Map.of(
    "messageId", "workflow.state.emailMessageId",
    "status", "workflow.state.emailStatus"
));

Cause 2: Output map structure doesn't match mapping

// Module returns:
Map<String, Object> output = Map.of(
    "status", "sent",
    "messageId", "msg-123"
);

// Each mapping key must match an output key:
"messageId", "workflow.state.emailMessageId" // ✅ matches output key

// Common mistake:
"email_id", "workflow.state.messageId" // ❌ "email_id" is not a key in the output

Debug:

// Log module output
@Override
public Map<String, Object> execute(...) {
    Map<String, Object> result = doWork();
    logger.info("Module output: {}", result); // Inspect structure
    return result;
}
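A mismatch between mapping keys and output keys can also be caught mechanically before the workflow runs. The sketch below assumes the mapping shape shown above (output key to state path) and uses hypothetical names; it is not a ValkyrAI API.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: finds mapping keys the module never produces.
final class MappingCheck {
    private MappingCheck() {}

    /** Returns the mapping keys that are absent from the module output. */
    static Set<String> unmappedKeys(Map<String, String> outputMapping, Map<String, Object> moduleOutput) {
        Set<String> missing = new TreeSet<>(outputMapping.keySet());
        missing.removeAll(moduleOutput.keySet());
        return missing;
    }
}
```

Logging a non-empty result next to the module output makes a renamed or misspelled key such as `email_id` visible immediately.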

Symptom: ExecModule throws exception and stops workflow

Cause: Unhandled exception in module

Solution:

// ❌ WRONG: Exception bubbles and halts workflow
@Override
public Map<String, Object> execute(...) {
    String url = (String) input.get("url");
    HttpResponse response = http.get(url); // May throw!
    return Map.of("status", "ok", "data", response.body());
}

// ✅ CORRECT: Catch and return error map
@Override
public Map<String, Object> execute(...) {
    try {
        String url = (String) input.get("url");
        HttpResponse response = http.get(url);
        return Map.of("status", "success", "data", response.body());
    } catch (HttpClientException e) {
        logger.error("HTTP request failed", e);
        return Map.of(
            "status", "failed",
            // Map.of rejects null values and getMessage() may return null
            "error", String.valueOf(e.getMessage()),
            "code", e.getStatusCode()
        );
    } catch (Exception e) {
        logger.error("Unexpected error", e);
        return Map.of(
            "status", "error",
            "error", String.valueOf(e.getMessage())
        );
    }
}

Key Pattern: Always return a Map<String, Object>, never throw from execute().


Symptom: Input mapping doesn't resolve values

Cause 1: Input path doesn't exist in workflow state

// Workflow state:
{
  "user": {
    "id": "123",
    "email": "john@example.com"
  }
}

// Input mapping:
"email", "workflow.state.user.email" // ✅ Valid path

// Common mistake:
"email", "workflow.state.profile.email" // ❌ "profile" doesn't exist

Cause 2: Array indexing syntax wrong

// State with array:
{
  "items": [
    { "id": "1", "name": "Widget" },
    { "id": "2", "name": "Gadget" }
  ]
}

// ✅ Correct syntax:
"firstItemId", "workflow.state.items[0].id"

// ❌ Wrong syntax:
"firstItemId", "workflow.state.items.0.id" // Not valid

Debug Input Resolution:

// Log resolved input before module execution
Map<String, Object> input = resolveInputMapping(task);
logger.info("Resolved input: {}", input); // See what module actually receives
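To see why `items[0].id` resolves while `items.0.id` and `profile.email` do not, it can help to walk a path by hand. The resolver below is an illustrative sketch over nested maps and lists, written with the JDK only; it is not ValkyrAI's actual resolver, and `StatePaths` is a hypothetical name.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical resolver for paths such as "workflow.state.items[0].id".
final class StatePaths {
    private static final String PREFIX = "workflow.state.";
    // A segment is a name followed by zero or more [index] suffixes.
    private static final Pattern SEGMENT =
            Pattern.compile("([A-Za-z_][A-Za-z0-9_]*)((?:\\[\\d{1,9}\\])*)");
    private static final Pattern INDEX = Pattern.compile("\\[(\\d{1,9})\\]");

    private StatePaths() {}

    static Optional<Object> resolve(Map<String, Object> state, String path) {
        if (!path.startsWith(PREFIX)) {
            return Optional.empty();
        }
        Object current = state;
        for (String segment : path.substring(PREFIX.length()).split("\\.")) {
            Matcher m = SEGMENT.matcher(segment);
            if (!m.matches() || !(current instanceof Map)) {
                return Optional.empty(); // e.g. the bare "0" in items.0.id
            }
            current = ((Map<?, ?>) current).get(m.group(1));
            Matcher idx = INDEX.matcher(m.group(2));
            while (idx.find()) {
                int i = Integer.parseInt(idx.group(1));
                if (!(current instanceof List) || i >= ((List<?>) current).size()) {
                    return Optional.empty();
                }
                current = ((List<?>) current).get(i);
            }
            if (current == null) {
                return Optional.empty(); // key missing, e.g. "profile"
            }
        }
        return Optional.of(current);
    }
}
```

Returning an empty `Optional` instead of `null` lets the caller log exactly which mapping failed to resolve before the module runs.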

Scheduled Workflow Issues

Symptom: Scheduled workflow doesn't execute at expected time

Cause 1: Cron expression syntax error

// ❌ WRONG cron expressions:
"*/30 * * * * ?" // Valid syntax, but fires every 30 seconds, not every 30 minutes
"0 0 0 * * * *" // Too many fields (7 instead of 6)
"0 30 2" // Too few fields (3 instead of 6)

// ✅ CORRECT cron format (seconds minutes hours dayOfMonth month dayOfWeek):
"0 */30 * * * ?" // Every 30 minutes
"0 0 9 * * MON-FRI" // 9 AM weekdays
"0 0 0 1 * ?" // Midnight on the 1st of each month

Validate Cron:

// CronSequenceGenerator is deprecated since Spring 5.3 in favor of CronExpression
import org.springframework.scheduling.support.CronExpression;

String cron = "0 */30 * * * ?";
try {
    CronExpression.parse(cron);
    logger.info("Valid cron expression");
} catch (IllegalArgumentException e) {
    logger.error("Invalid cron: {}", e.getMessage());
}
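Before handing an expression to the scheduler, a quick field count catches the "too many" and "too few" cases listed above. This is a minimal sketch with a hypothetical `CronFields` helper; it checks only the number of fields, not whether each field is valid.

```java
// Hypothetical pre-check: Spring-style cron expressions have exactly 6 fields.
final class CronFields {
    private CronFields() {}

    /** Counts whitespace-separated fields in the expression. */
    static int fieldCount(String cron) {
        String trimmed = cron.trim();
        return trimmed.isEmpty() ? 0 : trimmed.split("\\s+").length;
    }

    static boolean hasSixFields(String cron) {
        return fieldCount(cron) == 6;
    }
}
```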

Cause 2: Timezone mismatch

// ❌ WRONG: Assuming server timezone
workflow.setSchedule("0 0 9 * * ?"); // Intended as 9 AM Eastern, but the server runs in UTC!

// ✅ CORRECT: Use timezone-aware configuration
workflow.setSchedule("0 0 9 * * ?"); // 9 AM every day
workflow.setTimezone("America/New_York");
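The size of the mismatch depends on the date, because daylight saving time shifts the offset. The sketch below uses `java.time` to show when a 9 AM New York schedule fires in UTC; `ScheduleZones` is a hypothetical helper name.

```java
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

// Hypothetical helper: converts a wall-clock time in a zone to the UTC time of day.
final class ScheduleZones {
    private ScheduleZones() {}

    static LocalTime asUtcTime(LocalDate date, LocalTime localTime, ZoneId zone) {
        return ZonedDateTime.of(date, localTime, zone)
                .withZoneSameInstant(ZoneOffset.UTC)
                .toLocalTime();
    }
}
```

For 13 November 2025 this yields 14:00 UTC, and for 1 July 2025 it yields 13:00 UTC, so a schedule hard-coded against UTC drifts by an hour twice a year.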

Symptom: Scheduled workflow runs multiple times

Cause: Multiple instances or no distributed lock

Solution:

// Ensure only one instance processes
@Scheduled(cron = "0 */30 * * * ?")
@DistributedLock(lockName = "workflow-scheduler")
public void triggerScheduledWorkflows() {
    // Only one instance executes this at a time
    workflowService.executeScheduledWorkflows();
}

Data Flow & State Issues

Symptom: Parallel tasks interfere with each other

Cause: Shared mutable state in parallel execution

Solution:

# ❌ WRONG: Tasks share state
tasks:
  - id: task1
    module: transformation
    parallel: true
    parallelGroup: A
  - id: task2
    module: transformation
    parallel: true
    parallelGroup: A
# Both tasks read/write same state field!

# ✅ CORRECT: Each task has isolated output
tasks:
  - id: task1
    module: transformation
    outputMapping:
      result: "workflow.state.result1" # Isolated!
    parallel: true
    parallelGroup: A
  - id: task2
    module: transformation
    outputMapping:
      result: "workflow.state.result2" # Isolated!
    parallel: true
    parallelGroup: A
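Collisions like this can be detected from the definitions alone by checking whether two tasks in the same parallel group write to the same state path. The sketch below assumes each task's output mapping is available as a map from output key to state path; `ParallelOutputs` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical check: which state paths are written by more than one task?
final class ParallelOutputs {
    private ParallelOutputs() {}

    /**
     * @param outputMappingsByTask task id -> (output key -> state path), for one parallel group
     * @return state path -> sorted ids of the tasks that write to it, only where there are 2 or more
     */
    static Map<String, List<String>> sharedTargets(Map<String, Map<String, String>> outputMappingsByTask) {
        Map<String, List<String>> writers = new TreeMap<>();
        outputMappingsByTask.forEach((taskId, mapping) ->
                mapping.values().forEach(target ->
                        writers.computeIfAbsent(target, k -> new ArrayList<>()).add(taskId)));
        writers.values().removeIf(ids -> ids.size() < 2);
        writers.values().forEach(Collections::sort);
        return writers;
    }
}
```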

Symptom: Conditional branching always takes same path

Cause: Condition evaluation wrong

Solution:

// ❌ WRONG: String comparison with ==
if (status == "active") { } // Compares references, so it is usually false even when the text matches

// ✅ CORRECT: Proper comparison
if ("active".equals(status)) { }
if (status != null && status.equalsIgnoreCase("active")) { }

# In YAML:
condition: "workflow.state.status == 'active'" # Quotes matter!

Debug Condition:

// Log condition evaluation
boolean result = evaluateCondition(task.getCondition(), workflowState);
logger.info("Condition '{}' evaluated to: {}",
task.getCondition(), result);

Performance & Resource Issues

Symptom: Workflow executes slowly

Cause 1: Synchronous external calls

// ❌ SLOW: Blocking HTTP calls
for (String id : ids) {
    User user = userService.getUser(id); // Blocks!
    process(user);
}

// ✅ FAST: Batch or parallel
List<User> users = userService.getUsersBatch(ids); // Single call
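When the remote service has no batch endpoint, the parallel option can be sketched with `CompletableFuture`. This is a minimal JDK-only example; `ParallelFetch` and `fetchAll` are hypothetical names, and the executor should be a bounded pool sized for the remote service's limits.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical helper: issues one lookup per id concurrently and keeps input order.
final class ParallelFetch {
    private ParallelFetch() {}

    static <I, O> List<O> fetchAll(List<I> ids, Function<I, O> fetchOne, Executor executor) {
        // Start every call first so they overlap...
        List<CompletableFuture<O>> futures = ids.stream()
                .map(id -> CompletableFuture.supplyAsync(() -> fetchOne.apply(id), executor))
                .collect(Collectors.toList());
        // ...then wait for each result in the original order.
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }
}
```

`join()` rethrows a failed lookup as a `CompletionException`, so one bad id fails the whole batch; handle that per future if partial results are acceptable.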

Cause 2: N+1 query problem

// ❌ WRONG: N+1 queries
List<Workflow> workflows = workflowRepo.findAll();
for (Workflow w : workflows) {
    List<Task> tasks = taskRepo.findByWorkflow(w.getId()); // Query per workflow!
}

// ✅ CORRECT: Join or EntityGraph
@EntityGraph(attributePaths = "tasks")
List<Workflow> findAll();

Symptom: Out of memory during workflow execution

Cause: Accumulating large objects in state

Solution:

// ❌ WRONG: State accumulates all items
workflowState.put("allItems", millionItemsList);

// ✅ CORRECT: Process in batches, update incrementally
List<Item> batch = loadBatch(offset, batchSize);
processBatch(batch);
// Only keep current batch in state
workflowState.put("currentBatch", batch);
workflowState.put("processedCount", offset + batch.size());
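The snippet above handles one batch; the surrounding loop can be sketched as follows. `BatchRunner` is a hypothetical name, and the `loadBatch` function stands in for whatever paged query the workflow uses, taking an offset and a batch size.

```java
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Consumer;

// Hypothetical driver: pulls one batch at a time so only that batch is held in memory.
final class BatchRunner {
    private BatchRunner() {}

    /** Processes batches until the loader returns an empty list; returns the item count. */
    static <T> int processAll(BiFunction<Integer, Integer, List<T>> loadBatch,
                              int batchSize,
                              Consumer<List<T>> processBatch) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        int offset = 0;
        while (true) {
            List<T> batch = loadBatch.apply(offset, batchSize);
            if (batch.isEmpty()) {
                return offset; // everything before this offset was processed
            }
            processBatch.accept(batch);
            offset += batch.size();
        }
    }
}
```

Storing only the running offset in workflow state, rather than the batch itself, keeps the state small and lets a restarted workflow resume from the last completed batch.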

SecurityContext & Auth Issues

Symptom: Workflow loses authentication in async thread

Cause: SecurityContext not propagated to async thread

Solution:

// ❌ WRONG: No auth context in async
CompletableFuture.supplyAsync(() -> {
    return workflowService.execute(workflow); // Authentication is null on this thread!
});

// ✅ CORRECT: Propagate auth
Authentication auth = SecurityContextHolder.getContext().getAuthentication();
CompletableFuture.supplyAsync(() -> {
    SecurityContext ctx = SecurityContextHolder.createEmptyContext();
    ctx.setAuthentication(auth);
    SecurityContextHolder.setContext(ctx);
    try {
        return workflowService.execute(workflow);
    } finally {
        SecurityContextHolder.clearContext(); // Pooled threads must not keep the caller's identity
    }
});
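The capture, set, and clear steps above are a general pattern for any thread-local context, and wrapping them once avoids repeating the try/finally in every async call. The sketch below shows the pattern with a plain `ThreadLocal` so it runs without Spring; `ContextPropagation` is a hypothetical name. Spring Security also ships `DelegatingSecurityContextExecutor`, which wraps an executor to the same effect.

```java
import java.util.function.Supplier;

// Hypothetical wrapper: carries a thread-local value from the caller into the worker thread.
final class ContextPropagation {
    private ContextPropagation() {}

    static <C, T> Supplier<T> withContext(ThreadLocal<C> holder, Supplier<T> task) {
        C captured = holder.get(); // read on the calling thread, when the task is created
        return () -> {
            C previous = holder.get(); // whatever the worker thread had before
            holder.set(captured);
            try {
                return task.get();
            } finally {
                // Restore the worker's prior state so pooled threads do not leak context.
                if (previous == null) {
                    holder.remove();
                } else {
                    holder.set(previous);
                }
            }
        };
    }
}
```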

Debugging Best Practices

Enable Detailed Logging

# application.properties
logging.level.com.valkyrlabs.workflow=DEBUG
logging.level.com.valkyrlabs.workflow.modules=TRACE
logging.pattern.console=%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n

Use Workflow Status Endpoint

# Check workflow status
curl http://localhost:8080/v1/workflow/wf-123 \
  -H "Authorization: Bearer $TOKEN"

# Response includes:
{
  "id": "wf-123",
  "status": "RUNNING",
  "progress": 45,
  "currentTask": "task-2",
  "executionLogs": [...]
}

Monitor Task Execution

# Get task details and execution trace
curl http://localhost:8080/v1/workflow/wf-123/tasks/task-2 \
  -H "Authorization: Bearer $TOKEN"

# Response includes:
{
  "id": "task-2",
  "status": "COMPLETED",
  "startedAt": "2025-11-13T14:00:00Z",
  "completedAt": "2025-11-13T14:00:05Z",
  "input": {...},
  "output": {...},
  "error": null
}

Check WebSocket Status Updates

// Frontend: Monitor real-time updates
// Use ws:// against a plain-HTTP local server; use wss:// when the server is behind TLS
const ws = new WebSocket("ws://localhost:8080/v1/workflow/subscribe/wf-123");

ws.onmessage = (event) => {
  const update = JSON.parse(event.data);
  console.log("Workflow progress:", update.progress);
  console.log("Current task:", update.currentTask);
};

Quick Troubleshooting Checklist

  • Check workflow status is not FAILED
  • Verify all task inputs resolve (no undefined values)
  • Confirm output mappings match module output keys
  • Check module logs for exceptions
  • Verify scheduled cron expression syntax
  • Monitor thread pool and queue sizes
  • Check SecurityContext propagation in async tasks
  • Validate all external service endpoints are reachable
  • Review input/output examples in module documentation
  • Test module in isolation before adding to workflow
