
ValkyrAI Workflow Engine v2.0 — Implementation Plan

Status: Implementation In Progress | Version: v2.0 (Kill n8n Edition) | Date: 2025-10-23 | Owner: Platform Architecture Team

"All I am surrounded by is fear… and dead queues." — ValkyrAI to n8n 😈


Executive Summary

This document outlines the complete implementation strategy for upgrading ValkyrAI's workflow engine from the current v1.x execution model to the v2.0 ThorAPI-native, horizontally scalable, transactionally safe workflow + agent orchestration system specified in the PRD.

Core Improvements

  1. WorkflowExecution Model — Separate execution instances from workflow definitions with full state tracking
  2. Run Model — Granular task attempt tracking with idempotency, leasing, and DLQ support
  3. Enhanced Scheduling — Quartz-based CRON + event triggers + webhook endpoints
  4. Runner Pool — Isolated step executors with resource limits, circuit breakers, and retry policies
  5. Observability — OpenTelemetry tracing, metrics, DLQ tooling, and replay capabilities
  6. UX Studio — React Flow visual designer with real-time execution monitoring

Current State Analysis

✅ What We Have (v1.x)

From the codebase analysis:

  • Core Models: Workflow, Task, ExecModule, WorkflowState
  • Execution Service: ValkyrWorkflowService with async execution via CompletableFuture
  • Module System: 20+ ExecModules (Email, REST, Stripe, Files, etc.) extending VModule
  • Scheduling: Quartz integration with loadWorkflowSchedules() and WorkflowJob
  • Event System: @EventListener for workflow triggers with registerEventTrigger()
  • Security: RBAC + ACL on all workflow operations
  • WebSocket: Real-time monitoring at /v1/vaiworkflow/subscribe/{workflowId}
  • Metrics: WorkflowMetricsAspect with comprehensive event emission
  • Frontend: React components in web/typescript/valkyr_labs_com/src/components/WorkflowStudio/

🔧 What Needs Enhancement (v2.0)

  1. Execution Tracking — No separate WorkflowExecution model (runs mixed with definitions)
  2. Run/Attempt Tracking — No granular Run model for task attempts
  3. Idempotency — Basic support but not systematically enforced with content hashing
  4. Leasing/Recovery — No distributed lease mechanism for crash recovery
  5. DLQ — Error states tracked but no formal DLQ + replay tooling
  6. Runner Isolation — Modules run in-process; no containerized sandbox
  7. Studio UX — Basic ModuleChainViewer; needs React Flow upgrade
  8. Observability — Good metrics/events; needs OpenTelemetry + flame charts

Phase 1: Data Model Extensions (ThorAPI)

1.1 WorkflowExecution Schema

Add to api.hbs.yaml:

WorkflowExecution:
  type: object
  description: An execution instance of a Workflow with state tracking
  properties:
    id:
      type: string
      format: uuid
    workflowId:
      type: string
      format: uuid
      description: Reference to the Workflow definition
    version:
      type: integer
      description: Workflow version number at execution time
    state:
      type: string
      enum: [PENDING, RUNNING, PAUSED, SUCCESS, FAILED, CANCELLED, TIMEOUT]
      description: Current execution state
    startedAt:
      type: string
      format: date-time
    finishedAt:
      type: string
      format: date-time
    parentExecutionId:
      type: string
      format: uuid
      description: For nested/sub-workflows
    triggerType:
      type: string
      enum: [MANUAL, CRON, EVENT, WEBHOOK, QUEUE]
    triggerPayload:
      type: string
      description: JSON payload that initiated execution
    contextData:
      type: string
      description: Execution context (merged state from WorkflowState)
    errorMessage:
      type: string
    errorStackTrace:
      type: string
    metricsSnapshot:
      type: string
      description: JSON snapshot of execution metrics (duration, costs, etc.)

1.2 Run Schema Enhancement

Current State: Basic Run model exists with executionId, taskId, attempt, state enum.

Enhancements needed:

Run:
  type: object
  description: A single attempt to execute a Task within a WorkflowExecution
  properties:
    id:
      type: string
      format: uuid
    executionId:
      type: string
      format: uuid
    taskId:
      type: string
      format: uuid
    execModuleId:
      type: string
      format: uuid
      description: Specific module within the task
    attempt:
      type: integer
      description: Retry attempt number (1-based)
    state:
      type: string
      enum: [PENDING, LEASED, RUNNING, SUCCESS, FAILED, DLQ, CANCELLED]
    leaseUntil:
      type: string
      format: date-time
      description: Lease expiration for at-least-once delivery
    leasedBy:
      type: string
      description: Runner/pod ID that owns the lease
    runnerId:
      type: string
      description: Identifier of the runner pod
    idempotencyKey:
      type: string
      description: Content hash of inputs + config for exactly-once semantics
    inputsHash:
      type: string
      description: SHA-256 of the input map
    configHash:
      type: string
      description: SHA-256 of the module config
    startedAt:
      type: string
      format: date-time
    finishedAt:
      type: string
      format: date-time
    heartbeatAt:
      type: string
      format: date-time
    outputs:
      type: string
      description: JSON result map from module execution
    error:
      type: string
    errorType:
      type: string
      enum: [TRANSIENT, PERMANENT, TIMEOUT, CIRCUIT_OPEN]
    retryAfter:
      type: string
      format: date-time
    durationMs:
      type: integer
    costTokens:
      type: number
      format: double
      description: For LLM/API cost tracking

1.3 DLQ Schema

DeadLetterQueue:
  type: object
  description: Failed runs quarantined for manual intervention
  properties:
    id:
      type: string
      format: uuid
    runId:
      type: string
      format: uuid
    executionId:
      type: string
      format: uuid
    taskId:
      type: string
      format: uuid
    failureReason:
      type: string
    failureType:
      type: string
      enum: [MAX_RETRIES, PERMANENT_ERROR, CIRCUIT_BREAKER, MANUAL]
    originalInputs:
      type: string
      description: JSON snapshot of inputs for replay
    originalConfig:
      type: string
      description: JSON snapshot of module config
    quarantinedAt:
      type: string
      format: date-time
    reviewedBy:
      type: string
      format: uuid
      description: Principal who reviewed the entry
    resolution:
      type: string
      enum: [PENDING, REQUEUED, FIXED, DISCARDED]
    resolvedAt:
      type: string
      format: date-time
    notes:
      type: string

1.4 Circuit Breaker State

CircuitBreakerState:
  type: object
  description: Circuit breaker state for external dependencies
  properties:
    id:
      type: string
      format: uuid
    target:
      type: string
      description: Module class + target endpoint identifier
    state:
      type: string
      enum: [CLOSED, OPEN, HALF_OPEN]
    failureCount:
      type: integer
    successCount:
      type: integer
    lastFailureAt:
      type: string
      format: date-time
    openedAt:
      type: string
      format: date-time
    nextRetryAt:
      type: string
      format: date-time
    windowStartAt:
      type: string
      format: date-time
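
The schema above only persists breaker state; enforcement happens in the runner path before a module calls its external target. A minimal sketch under stated assumptions — a ThorAPI-generated CircuitBreakerStateRepository with a findByTarget finder, the project-defined CircuitBreakerOpenException already referenced in Phase 4, and illustrative threshold/cool-down values (HALF_OPEN probing omitted):

// Hedged sketch; imports omitted for brevity, matching the other snippets in this plan.
// Repository finder name and threshold/cool-down constants are assumptions.
@Service
public class CircuitBreakerService {

    private static final int FAILURE_THRESHOLD = 5;
    private static final Duration OPEN_COOLDOWN = Duration.ofSeconds(30);

    @Autowired
    private CircuitBreakerStateRepository breakerRepo;

    /** Throws if the breaker for this target is OPEN and its cool-down has not elapsed. */
    public void assertCallAllowed(String target) {
        breakerRepo.findByTarget(target).ifPresent(cb -> {
            boolean open = cb.getState() == CircuitBreakerState.StateEnum.OPEN;
            boolean coolingDown = cb.getNextRetryAt() != null
                    && cb.getNextRetryAt().isAfter(OffsetDateTime.now());
            if (open && coolingDown) {
                throw new CircuitBreakerOpenException("Circuit open for " + target);
            }
        });
    }

    /** Records a call outcome and flips CLOSED -> OPEN after repeated failures. */
    @Transactional
    public void recordResult(String target, boolean success) {
        CircuitBreakerState cb = breakerRepo.findByTarget(target)
                .orElseGet(() -> newClosedState(target)); // counters assumed initialized to 0
        if (success) {
            cb.setSuccessCount(cb.getSuccessCount() + 1);
            cb.setFailureCount(0);
            cb.setState(CircuitBreakerState.StateEnum.CLOSED);
        } else {
            cb.setFailureCount(cb.getFailureCount() + 1);
            cb.setLastFailureAt(OffsetDateTime.now());
            if (cb.getFailureCount() >= FAILURE_THRESHOLD) {
                cb.setState(CircuitBreakerState.StateEnum.OPEN);
                cb.setOpenedAt(OffsetDateTime.now());
                cb.setNextRetryAt(OffsetDateTime.now().plus(OPEN_COOLDOWN));
            }
        }
        breakerRepo.save(cb);
    }

    private CircuitBreakerState newClosedState(String target) {
        CircuitBreakerState cb = new CircuitBreakerState();
        cb.setTarget(target);
        cb.setState(CircuitBreakerState.StateEnum.CLOSED);
        cb.setFailureCount(0);
        cb.setSuccessCount(0);
        cb.setWindowStartAt(OffsetDateTime.now());
        return cb;
    }
}

The runner would call assertCallAllowed before invoking a module's external dependency and recordResult afterward; the CIRCUIT_OPEN error type in Phase 4 then maps the thrown exception to a non-retryable failure until the cool-down passes.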

Phase 2: API Endpoints (ThorAPI)

Add to paths: section in api.hbs.yaml:

# WorkflowExecution endpoints
/WorkflowExecution:
  get:
    tags: [WorkflowExecution]
    summary: List workflow executions with filters
    operationId: listWorkflowExecutions
    parameters:
      - name: workflowId
        in: query
        schema:
          type: string
          format: uuid
      - name: state
        in: query
        schema:
          type: string
      - name: limit
        in: query
        schema:
          type: integer
          default: 50
    responses:
      "200":
        description: List of executions
        content:
          application/json:
            schema:
              type: array
              items:
                $ref: "#/components/schemas/WorkflowExecution"

/WorkflowExecution/{id}:
  get:
    tags: [WorkflowExecution]
    summary: Get execution details
    operationId: getWorkflowExecution
    parameters:
      - name: id
        in: path
        required: true
        schema:
          type: string
          format: uuid
    responses:
      "200":
        description: Execution details
        content:
          application/json:
            schema:
              $ref: "#/components/schemas/WorkflowExecution"

/WorkflowExecution/{id}/cancel:
  post:
    tags: [WorkflowExecution]
    summary: Cancel a running execution
    operationId: cancelWorkflowExecution
    x-thorapi-nonCrud: true
    parameters:
      - name: id
        in: path
        required: true
        schema:
          type: string
          format: uuid
    responses:
      "200":
        description: Cancellation initiated

/WorkflowExecution/{id}/pause:
  post:
    tags: [WorkflowExecution]
    summary: Pause execution
    operationId: pauseWorkflowExecution
    x-thorapi-nonCrud: true
    parameters:
      - name: id
        in: path
        required: true
        schema:
          type: string
          format: uuid
    responses:
      "200":
        description: Execution paused

/WorkflowExecution/{id}/resume:
  post:
    tags: [WorkflowExecution]
    summary: Resume paused execution
    operationId: resumeWorkflowExecution
    x-thorapi-nonCrud: true
    parameters:
      - name: id
        in: path
        required: true
        schema:
          type: string
          format: uuid
    responses:
      "200":
        description: Execution resumed

# Run endpoints
/Run:
  get:
    tags: [Run]
    summary: List runs with filters
    operationId: listRuns
    parameters:
      - name: executionId
        in: query
        schema:
          type: string
          format: uuid
      - name: taskId
        in: query
        schema:
          type: string
          format: uuid
      - name: state
        in: query
        schema:
          type: string
    responses:
      "200":
        description: List of runs
        content:
          application/json:
            schema:
              type: array
              items:
                $ref: "#/components/schemas/Run"

/Run/{id}/heartbeat:
  post:
    tags: [Run]
    summary: Update heartbeat timestamp (runner keepalive)
    operationId: updateRunHeartbeat
    x-thorapi-nonCrud: true
    parameters:
      - name: id
        in: path
        required: true
        schema:
          type: string
          format: uuid
    responses:
      "200":
        description: Heartbeat recorded

# DLQ endpoints
/DeadLetterQueue:
  get:
    tags: [DeadLetterQueue]
    summary: List quarantined runs
    operationId: listDeadLetterQueue
    responses:
      "200":
        description: List of DLQ entries
        content:
          application/json:
            schema:
              type: array
              items:
                $ref: "#/components/schemas/DeadLetterQueue"

/DeadLetterQueue/{id}/requeue:
  post:
    tags: [DeadLetterQueue]
    summary: Requeue a DLQ entry for retry
    operationId: requeueDeadLetterEntry
    x-thorapi-nonCrud: true
    parameters:
      - name: id
        in: path
        required: true
        schema:
          type: string
          format: uuid
    requestBody:
      description: Optional input overrides for replay
      content:
        application/json:
          schema:
            type: object
            additionalProperties: true
    responses:
      "200":
        description: Entry requeued

/DeadLetterQueue/{id}/discard:
  post:
    tags: [DeadLetterQueue]
    summary: Discard a DLQ entry permanently
    operationId: discardDeadLetterEntry
    x-thorapi-nonCrud: true
    parameters:
      - name: id
        in: path
        required: true
        schema:
          type: string
          format: uuid
    responses:
      "200":
        description: Entry discarded

Phase 3: Service Layer Implementation

3.1 WorkflowExecutionService

Location: valkyrai/src/main/java/com/valkyrlabs/workflow/service/WorkflowExecutionService.java

@Service
@Slf4j
public class WorkflowExecutionService {

    @Autowired
    private WorkflowExecutionRepository executionRepo;

    @Autowired
    private RunRepository runRepo;

    @Autowired
    private WorkflowService workflowService;

    @Autowired
    private TaskService taskService;

    @Autowired
    private ValkyrWorkflowService workflowEngineService;

    /**
     * Create a new execution instance from a workflow.
     */
    @Transactional
    public WorkflowExecution createExecution(Workflow workflow, String triggerType, String triggerPayload) {
        WorkflowExecution execution = new WorkflowExecution();
        execution.setWorkflowId(workflow.getId());
        execution.setVersion(workflow.getVersion());
        execution.setState(WorkflowExecution.StateEnum.PENDING);
        execution.setTriggerType(triggerType);
        execution.setTriggerPayload(triggerPayload);
        execution.setStartedAt(OffsetDateTime.now());

        // Initialize contextData from the workflow's persisted state
        Map<String, Object> context = new HashMap<>();
        for (WorkflowState state : workflow.getWorkflowState()) {
            context.put(state.getName(), state.getStateValue());
        }
        execution.setContextData(toJson(context));

        return executionRepo.save(execution);
    }

    /**
     * Execute the workflow asynchronously and track it via an execution instance.
     */
    public CompletableFuture<WorkflowExecution> executeWorkflow(Workflow workflow, String triggerType, Map<String, Object> payload) {
        WorkflowExecution execution = createExecution(workflow, triggerType, toJson(payload));

        return CompletableFuture.supplyAsync(() -> {
            try {
                execution.setState(WorkflowExecution.StateEnum.RUNNING);
                persistExecution(execution);

                // Delegate to the existing v1 engine (synchronous entry point) with execution tracking
                Map<String, Object> result = workflowEngineService.executeWorkflowSync(workflow, payload);

                execution.setState(WorkflowExecution.StateEnum.SUCCESS);
                execution.setFinishedAt(OffsetDateTime.now());
                execution.setContextData(toJson(result));

            } catch (Exception e) {
                log.error("Execution {} failed", execution.getId(), e);
                execution.setState(WorkflowExecution.StateEnum.FAILED);
                execution.setFinishedAt(OffsetDateTime.now());
                execution.setErrorMessage(e.getMessage());
                execution.setErrorStackTrace(getStackTrace(e));
            }

            persistExecution(execution);
            return execution;
        });
    }

    // NOTE: REQUIRES_NEW has no effect on self-invocation through the Spring proxy;
    // move this to a separate bean (or use TransactionTemplate) so state updates commit independently.
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    void persistExecution(WorkflowExecution execution) {
        executionRepo.save(execution);
    }
}
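
The services in this phase (and the runner in Phase 4) call toJson, fromJson, and getStackTrace helpers that the plan does not define. A minimal sketch, assuming Jackson (ObjectMapper plus the jackson-datatype-jsr310 JavaTimeModule) is on the classpath; in practice this would live in one small shared utility rather than be repeated per service:

// Hedged sketch of the JSON/stack-trace helpers referenced throughout Phases 3 and 4.
// Imports omitted for brevity, matching the other snippets in this plan.
final class WorkflowJsonUtil {

    private static final ObjectMapper MAPPER = new ObjectMapper()
            .registerModule(new JavaTimeModule()); // OffsetDateTime support

    private WorkflowJsonUtil() {}

    static String toJson(Object data) {
        try {
            return MAPPER.writeValueAsString(data);
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("Failed to serialize workflow payload", e);
        }
    }

    static Map<String, Object> fromJson(String json) {
        try {
            if (json == null || json.isBlank()) {
                return new HashMap<>();
            }
            return MAPPER.readValue(json, new TypeReference<Map<String, Object>>() {});
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("Failed to parse workflow payload", e);
        }
    }

    static String getStackTrace(Throwable t) {
        StringWriter sw = new StringWriter();
        t.printStackTrace(new PrintWriter(sw));
        return sw.toString();
    }
}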

3.2 RunService (Lease + Idempotency)

Location: valkyrai/src/main/java/com/valkyrlabs/workflow/service/RunService.java

@Service
@Slf4j
public class RunService {

    @Autowired
    private RunRepository runRepo;

    @Autowired
    private DeadLetterQueueRepository dlqRepo;

    @Value("${valkyrai.runner.lease.duration:PT2M}")
    private Duration leaseDuration;

    @Value("${valkyrai.runner.max.attempts:3}")
    private int maxAttempts;

    /**
     * Create a run for task execution with an idempotency key.
     */
    @Transactional
    public Run createRun(UUID executionId, Task task, ExecModule module, Map<String, Object> inputs) {
        // Compute idempotency key from content hashes of inputs and module config
        String inputsHash = computeHash(inputs);
        String configHash = computeHash(module.getModuleData());
        String idempotencyKey = inputsHash + ":" + configHash;

        // Short-circuit if an identical run already succeeded
        Optional<Run> existing = runRepo.findByIdempotencyKey(idempotencyKey);
        if (existing.isPresent() && existing.get().getState() == Run.StateEnum.SUCCESS) {
            log.info("Idempotent run found: {}", existing.get().getId());
            return existing.get();
        }

        Run run = new Run();
        run.setExecutionId(executionId);
        run.setTaskId(task.getId());
        run.setExecModuleId(module.getId());
        run.setAttempt(1);
        run.setState(Run.StateEnum.PENDING);
        run.setIdempotencyKey(idempotencyKey);
        run.setInputsHash(inputsHash);
        run.setConfigHash(configHash);
        // NOTE: the raw input map is not persisted on the Run (only inputsHash); it must be
        // stored somewhere (e.g. a dedicated column) for the runner to replay it faithfully.

        return runRepo.save(run);
    }

    /**
     * Acquire a lease for run execution.
     */
    @Transactional
    public Optional<Run> acquireLease(UUID runId, String runnerId) {
        Optional<Run> runOpt = runRepo.findById(runId);
        if (runOpt.isEmpty()) {
            return Optional.empty();
        }

        Run run = runOpt.get();
        OffsetDateTime now = OffsetDateTime.now();

        // Check if already leased and not expired
        if (run.getLeaseUntil() != null && run.getLeaseUntil().isAfter(now)) {
            log.debug("Run {} already leased until {}", runId, run.getLeaseUntil());
            return Optional.empty();
        }

        // Acquire the lease (under concurrency, pair this with optimistic locking or a conditional UPDATE)
        run.setState(Run.StateEnum.LEASED);
        run.setLeasedBy(runnerId);
        run.setRunnerId(runnerId);
        run.setLeaseUntil(now.plus(leaseDuration));

        return Optional.of(runRepo.save(run));
    }

    /**
     * Update heartbeat (runner keepalive) and extend the lease.
     */
    @Transactional
    public void updateHeartbeat(UUID runId) {
        runRepo.findById(runId).ifPresent(run -> {
            run.setHeartbeatAt(OffsetDateTime.now());
            run.setLeaseUntil(OffsetDateTime.now().plus(leaseDuration));
            runRepo.save(run);
        });
    }

    /**
     * Mark run as successful.
     */
    @Transactional
    public void markSuccess(UUID runId, Map<String, Object> outputs, long durationMs) {
        runRepo.findById(runId).ifPresent(run -> {
            run.setState(Run.StateEnum.SUCCESS);
            run.setFinishedAt(OffsetDateTime.now());
            run.setOutputs(toJson(outputs));
            run.setDurationMs((int) durationMs);
            runRepo.save(run);
        });
    }

    /**
     * Mark run as failed and either schedule a retry or quarantine it in the DLQ.
     */
    @Transactional
    public void markFailed(UUID runId, String error, String errorType, boolean canRetry) {
        runRepo.findById(runId).ifPresent(run -> {
            run.setError(error);
            run.setErrorType(errorType);
            run.setFinishedAt(OffsetDateTime.now());

            if (canRetry && run.getAttempt() < maxAttempts) {
                // Requeue for retry with exponential backoff
                run.setState(Run.StateEnum.PENDING);
                run.setAttempt(run.getAttempt() + 1);
                run.setRetryAfter(OffsetDateTime.now().plus(computeBackoff(run.getAttempt())));
                run.setLeaseUntil(null);
                run.setLeasedBy(null);
            } else {
                // Move to DLQ: either retries are exhausted or the error is not retryable
                run.setState(Run.StateEnum.DLQ);
                moveToDLQ(run, canRetry ? "MAX_RETRIES" : "PERMANENT_ERROR");
            }

            runRepo.save(run);
        });
    }

    /**
     * Move a failed run to the DLQ.
     */
    private void moveToDLQ(Run run, String reason) {
        DeadLetterQueue dlq = new DeadLetterQueue();
        dlq.setRunId(run.getId());
        dlq.setExecutionId(run.getExecutionId());
        dlq.setTaskId(run.getTaskId());
        dlq.setFailureReason(run.getError());
        dlq.setFailureType(reason);
        // Snapshot for replay; the Run model currently only carries the outputs field,
        // so replace this once raw inputs are persisted on the Run
        dlq.setOriginalInputs(run.getOutputs());
        dlq.setQuarantinedAt(OffsetDateTime.now());
        dlq.setResolution(DeadLetterQueue.ResolutionEnum.PENDING);

        dlqRepo.save(dlq);
    }

    private Duration computeBackoff(int attempt) {
        // Exponential backoff with jitter
        long baseMs = 1000L * (long) Math.pow(2, attempt - 1);
        long jitter = ThreadLocalRandom.current().nextLong(0, 500);
        return Duration.ofMillis(baseMs + jitter);
    }

    private String computeHash(Object data) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            String json = toJson(data);
            byte[] hash = digest.digest(json.getBytes(StandardCharsets.UTF_8));
            return Base64.getEncoder().encodeToString(hash);
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException(e);
        }
    }
}

3.3 DLQService

Location: valkyrai/src/main/java/com/valkyrlabs/workflow/service/DLQService.java

@Service
@Slf4j
public class DLQService {

    @Autowired
    private DeadLetterQueueRepository dlqRepo;

    @Autowired
    private RunService runService;

    @Autowired
    private WorkflowExecutionService executionService;

    @Autowired
    private TaskService taskService;

    @Autowired
    private ExecModuleService execModuleService;

    /**
     * List DLQ entries, optionally filtered by resolution.
     */
    public List<DeadLetterQueue> listDLQ(DeadLetterQueue.ResolutionEnum resolution) {
        if (resolution != null) {
            return dlqRepo.findByResolution(resolution);
        }
        return dlqRepo.findAll();
    }

    /**
     * Requeue a DLQ entry for retry with optional input overrides.
     */
    @Transactional
    public void requeue(UUID dlqId, Map<String, Object> inputOverrides) {
        DeadLetterQueue dlq = dlqRepo.findById(dlqId)
                .orElseThrow(() -> new IllegalArgumentException("DLQ entry not found"));

        // Parse original inputs and apply any overrides
        Map<String, Object> inputs = fromJson(dlq.getOriginalInputs());
        if (inputOverrides != null) {
            inputs.putAll(inputOverrides);
        }

        // Create a new run from the quarantined attempt
        Run run = runService.createRun(
                dlq.getExecutionId(),
                taskService.findById(dlq.getTaskId()).orElseThrow(),
                execModuleService.findById(/* ... */),
                inputs
        );

        // Mark the DLQ entry as resolved
        dlq.setResolution(DeadLetterQueue.ResolutionEnum.REQUEUED);
        dlq.setResolvedAt(OffsetDateTime.now());
        dlqRepo.save(dlq);

        log.info("DLQ entry {} requeued as run {}", dlqId, run.getId());
    }

    /**
     * Discard a DLQ entry permanently.
     */
    @Transactional
    public void discard(UUID dlqId) {
        DeadLetterQueue dlq = dlqRepo.findById(dlqId)
                .orElseThrow(() -> new IllegalArgumentException("DLQ entry not found"));

        dlq.setResolution(DeadLetterQueue.ResolutionEnum.DISCARDED);
        dlq.setResolvedAt(OffsetDateTime.now());
        dlqRepo.save(dlq);
    }
}

Phase 4: Runner Pool Implementation

4.1 Runner Service

Location: valkyrai/src/main/java/com/valkyrlabs/workflow/runner/RunnerService.java

@Service
@Slf4j
public class RunnerService {

    @Value("${valkyrai.runner.id:#{T(java.util.UUID).randomUUID().toString()}}")
    private String runnerId;

    @Value("${valkyrai.runner.concurrency:10}")
    private int concurrency;

    @Autowired
    private RunService runService;

    @Autowired
    private TaskService taskService;

    @Autowired
    private ApplicationContext applicationContext;

    // NOTE: sized statically here; build the pool in @PostConstruct to honor the configured concurrency
    private final ExecutorService executor = Executors.newFixedThreadPool(10);

    @Scheduled(fixedDelayString = "${valkyrai.runner.poll.interval:PT5S}")
    public void pollForWork() {
        // Find pending runs (findPendingRuns is an additional RunService query, not shown above)
        List<Run> pending = runService.findPendingRuns(concurrency);

        for (Run run : pending) {
            // Try to acquire the lease; skip runs another runner already owns
            Optional<Run> leased = runService.acquireLease(run.getId(), runnerId);
            if (leased.isEmpty()) {
                continue;
            }

            // Submit to the executor pool
            executor.submit(() -> executeRun(leased.get()));
        }
    }

    private void executeRun(Run run) {
        long startTime = System.currentTimeMillis();

        try {
            // Update state
            run.setState(Run.StateEnum.RUNNING);
            run.setStartedAt(OffsetDateTime.now());
            runService.save(run);

            // Start heartbeat thread to keep the lease alive
            ScheduledExecutorService heartbeat = Executors.newSingleThreadScheduledExecutor();
            heartbeat.scheduleAtFixedRate(
                    () -> runService.updateHeartbeat(run.getId()),
                    0, 2, TimeUnit.SECONDS
            );

            try {
                // Load task and module
                Task task = taskService.findById(run.getTaskId()).orElseThrow();
                ExecModule module = task.getModules().stream()
                        .filter(m -> m.getId().equals(run.getExecModuleId()))
                        .findFirst()
                        .orElseThrow();

                // Load the VModule implementation
                VModule vmodule = applicationContext.getBean(module.getClassName(), VModule.class);

                // Parse inputs (the sketch reuses the Run outputs field as the input carrier; see the RunService note)
                Map<String, Object> inputs = fromJson(run.getOutputs());

                // Execute the module
                Map<String, Object> outputs = vmodule.execute(null, task, module, inputs);

                // Mark success
                long duration = System.currentTimeMillis() - startTime;
                runService.markSuccess(run.getId(), outputs, duration);

                log.info("Run {} completed successfully in {}ms", run.getId(), duration);

            } finally {
                heartbeat.shutdown();
            }

        } catch (Exception e) {
            log.error("Run {} failed", run.getId(), e);

            // Classify the error and decide whether to retry
            String errorType = determineErrorType(e);
            boolean canRetry = isRetryable(errorType);

            runService.markFailed(run.getId(), e.getMessage(), errorType, canRetry);
        }
    }

    private String determineErrorType(Exception e) {
        if (e instanceof TimeoutException) {
            return "TIMEOUT";
        } else if (e instanceof CircuitBreakerOpenException) {
            return "CIRCUIT_OPEN";
        } else if (e instanceof IOException || e instanceof HttpClientErrorException) {
            return "TRANSIENT";
        } else {
            return "PERMANENT";
        }
    }

    private boolean isRetryable(String errorType) {
        return "TRANSIENT".equals(errorType) || "TIMEOUT".equals(errorType);
    }
}
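
Crash recovery (the "zombie reaper" called out in the Phase 4 checklist) is not shown above. A minimal sketch under the lease model from Phase 1: a scheduled job returns runs whose lease expired without a heartbeat to PENDING so another runner can pick them up. The findByStateInAndLeaseUntilBefore query and the 30-second interval are assumptions, not existing repository methods or settings:

// Hedged sketch of lease-based crash recovery (zombie reaper).
// Imports omitted for brevity, matching the other snippets in this plan.
@Service
@Slf4j
public class LeaseReaperService {

    @Autowired
    private RunRepository runRepo;

    // Interval is illustrative; tune it alongside the configured lease duration.
    @Scheduled(fixedDelay = 30_000)
    @Transactional
    public void reapExpiredLeases() {
        OffsetDateTime now = OffsetDateTime.now();

        // Hypothetical derived query: LEASED or RUNNING runs whose lease has lapsed
        List<Run> zombies = runRepo.findByStateInAndLeaseUntilBefore(
                List.of(Run.StateEnum.LEASED, Run.StateEnum.RUNNING), now);

        for (Run run : zombies) {
            log.warn("Reclaiming run {} from runner {} (lease expired at {})",
                    run.getId(), run.getLeasedBy(), run.getLeaseUntil());

            // Return the run to the queue; retry accounting stays with RunService.markFailed
            run.setState(Run.StateEnum.PENDING);
            run.setLeasedBy(null);
            run.setLeaseUntil(null);
            runRepo.save(run);
        }
    }
}

Because runs carry idempotency keys, re-executing a reclaimed run should not produce duplicate side effects in well-behaved modules, which is what the crash-recovery success metric below depends on.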

Phase 5: Controller Endpoints

5.1 WorkflowExecutionController

Location: valkyrai/src/main/java/com/valkyrlabs/workflow/controller/WorkflowExecutionController.java

@RestController
@RequestMapping("/v1/WorkflowExecution")
@Slf4j
public class WorkflowExecutionController {

    @Autowired
    private WorkflowExecutionService executionService;

    @GetMapping
    @PreAuthorize("hasRole('ADMIN') or hasRole('WORKFLOW')")
    public ResponseEntity<List<WorkflowExecution>> listExecutions(
            @RequestParam(required = false) UUID workflowId,
            @RequestParam(required = false) String state,
            @RequestParam(defaultValue = "50") int limit) {

        List<WorkflowExecution> executions = executionService.findExecutions(workflowId, state, limit);
        return ResponseEntity.ok(executions);
    }

    @GetMapping("/{id}")
    @PreAuthorize("hasPermission(#id, 'WorkflowExecution', 'READ')")
    public ResponseEntity<WorkflowExecution> getExecution(@PathVariable UUID id) {
        return executionService.findById(id)
                .map(ResponseEntity::ok)
                .orElse(ResponseEntity.notFound().build());
    }

    @PostMapping("/{id}/cancel")
    @PreAuthorize("hasPermission(#id, 'WorkflowExecution', 'WRITE')")
    public ResponseEntity<Void> cancelExecution(@PathVariable UUID id) {
        executionService.cancelExecution(id);
        return ResponseEntity.ok().build();
    }

    @PostMapping("/{id}/pause")
    @PreAuthorize("hasPermission(#id, 'WorkflowExecution', 'WRITE')")
    public ResponseEntity<Void> pauseExecution(@PathVariable UUID id) {
        executionService.pauseExecution(id);
        return ResponseEntity.ok().build();
    }

    @PostMapping("/{id}/resume")
    @PreAuthorize("hasPermission(#id, 'WorkflowExecution', 'WRITE')")
    public ResponseEntity<Void> resumeExecution(@PathVariable UUID id) {
        executionService.resumeExecution(id);
        return ResponseEntity.ok().build();
    }
}

5.2 DLQController

Location: valkyrai/src/main/java/com/valkyrlabs/workflow/controller/DLQController.java

@RestController
@RequestMapping("/v1/DeadLetterQueue")
@Slf4j
public class DLQController {

    @Autowired
    private DLQService dlqService;

    @GetMapping
    @PreAuthorize("hasRole('ADMIN')")
    public ResponseEntity<List<DeadLetterQueue>> listDLQ(
            @RequestParam(required = false) String resolution) {

        DeadLetterQueue.ResolutionEnum resEnum = resolution != null
                ? DeadLetterQueue.ResolutionEnum.valueOf(resolution)
                : null;

        return ResponseEntity.ok(dlqService.listDLQ(resEnum));
    }

    @PostMapping("/{id}/requeue")
    @PreAuthorize("hasRole('ADMIN')")
    public ResponseEntity<Void> requeue(
            @PathVariable UUID id,
            @RequestBody(required = false) Map<String, Object> inputOverrides) {

        dlqService.requeue(id, inputOverrides);
        return ResponseEntity.ok().build();
    }

    @PostMapping("/{id}/discard")
    @PreAuthorize("hasRole('ADMIN')")
    public ResponseEntity<Void> discard(@PathVariable UUID id) {
        dlqService.discard(id);
        return ResponseEntity.ok().build();
    }
}

Phase 6: Observability (OpenTelemetry)

6.1 Tracing Configuration

Location: valkyrai/src/main/java/com/valkyrlabs/workflow/config/ObservabilityConfig.java

@Configuration
public class ObservabilityConfig {

    @Bean
    public OpenTelemetry openTelemetry() {
        return AutoConfiguredOpenTelemetrySdk.initialize().getOpenTelemetrySdk();
    }

    @Bean
    public Tracer tracer(OpenTelemetry openTelemetry) {
        return openTelemetry.getTracer("valkyrai-workflow", "2.0");
    }
}

6.2 Tracing Aspect

Location: valkyrai/src/main/java/com/valkyrlabs/workflow/observability/WorkflowTracingAspect.java

@Aspect
@Component
@Slf4j
public class WorkflowTracingAspect {

    @Autowired
    private Tracer tracer;

    // NOTE: with proxy-based Spring AOP this pointcut only fires if executeRun is public and
    // invoked through the proxy; otherwise use AspectJ weaving or trace inside RunnerService directly.
    @Around("execution(* com.valkyrlabs.workflow.runner.RunnerService.executeRun(..))")
    public Object traceRunExecution(ProceedingJoinPoint pjp) throws Throwable {
        Run run = (Run) pjp.getArgs()[0];

        Span span = tracer.spanBuilder("run.execute")
                .setAttribute("run.id", run.getId().toString())
                .setAttribute("execution.id", run.getExecutionId().toString())
                .setAttribute("task.id", run.getTaskId().toString())
                .setAttribute("attempt", run.getAttempt())
                .startSpan();

        try (Scope scope = span.makeCurrent()) {
            Object result = pjp.proceed();
            span.setStatus(StatusCode.OK);
            return result;
        } catch (Exception e) {
            span.recordException(e);
            span.setStatus(StatusCode.ERROR, e.getMessage());
            throw e;
        } finally {
            span.end();
        }
    }
}

Phase 7: Frontend Studio (React Flow)

7.1 WorkflowExecutionMonitor Component

Location: web/typescript/valkyr_labs_com/src/components/WorkflowStudio/WorkflowExecutionMonitor.tsx

import React, { useEffect, useState } from "react";
import {
  useGetWorkflowExecutionQuery,
  useListRunsQuery,
} from "../../backend/api";
import { WorkflowExecution, Run } from "../../backend/model";

interface WorkflowExecutionMonitorProps {
  executionId: string;
}

export const WorkflowExecutionMonitor: React.FC<
  WorkflowExecutionMonitorProps
> = ({ executionId }) => {
  const { data: execution, isLoading } = useGetWorkflowExecutionQuery({
    id: executionId,
  });
  const { data: runs } = useListRunsQuery({ executionId });

  const [flamechart, setFlamechart] = useState<any[]>([]);

  useEffect(() => {
    if (runs) {
      // Build flamechart data
      const chart = runs.map((run) => ({
        id: run.id,
        taskId: run.taskId,
        startTime: new Date(run.startedAt).getTime(),
        duration: run.durationMs,
        state: run.state,
      }));
      setFlamechart(chart);
    }
  }, [runs]);

  if (isLoading) return <div>Loading...</div>;

  return (
    <div className="execution-monitor">
      <div className="execution-header">
        <h2>Execution: {execution?.id}</h2>
        <span className={`state-badge ${execution?.state}`}>
          {execution?.state}
        </span>
      </div>

      <div className="execution-timeline">
        <h3>Task Timeline</h3>
        <div className="flamechart">
          {flamechart.map((item) => (
            <div
              key={item.id}
              className={`flame-bar ${item.state}`}
              style={{
                left: `${(item.startTime - flamechart[0]?.startTime) / 100}px`,
                width: `${item.duration / 10}px`,
              }}
              title={`Task: ${item.taskId}, Duration: ${item.duration}ms`}
            />
          ))}
        </div>
      </div>

      <div className="runs-table">
        <h3>Run History</h3>
        <table>
          <thead>
            <tr>
              <th>Run ID</th>
              <th>Task</th>
              <th>Attempt</th>
              <th>State</th>
              <th>Duration</th>
              <th>Started</th>
            </tr>
          </thead>
          <tbody>
            {runs?.map((run) => (
              <tr key={run.id}>
                <td>{run.id}</td>
                <td>{run.taskId}</td>
                <td>{run.attempt}</td>
                <td className={`state ${run.state}`}>{run.state}</td>
                <td>{run.durationMs}ms</td>
                <td>{new Date(run.startedAt).toLocaleString()}</td>
              </tr>
            ))}
          </tbody>
        </table>
      </div>
    </div>
  );
};

7.2 DLQ Browser Component

Location: web/typescript/valkyr_labs_com/src/components/WorkflowStudio/DLQBrowser.tsx

import React from "react";
import {
  useListDeadLetterQueueQuery,
  useRequeueDeadLetterEntryMutation,
  useDiscardDeadLetterEntryMutation,
} from "../../backend/api";

export const DLQBrowser: React.FC = () => {
  const { data: dlqEntries, refetch } = useListDeadLetterQueueQuery({
    resolution: "PENDING",
  });
  const [requeue] = useRequeueDeadLetterEntryMutation();
  const [discard] = useDiscardDeadLetterEntryMutation();

  const handleRequeue = async (id: string) => {
    await requeue({ id });
    refetch();
  };

  const handleDiscard = async (id: string) => {
    if (confirm("Are you sure you want to discard this entry?")) {
      await discard({ id });
      refetch();
    }
  };

  return (
    <div className="dlq-browser">
      <h2>Dead Letter Queue</h2>

      <table>
        <thead>
          <tr>
            <th>Quarantined</th>
            <th>Task</th>
            <th>Failure Reason</th>
            <th>Actions</th>
          </tr>
        </thead>
        <tbody>
          {dlqEntries?.map((entry) => (
            <tr key={entry.id}>
              <td>{new Date(entry.quarantinedAt).toLocaleString()}</td>
              <td>{entry.taskId}</td>
              <td title={entry.failureReason} className="truncate">
                {entry.failureReason}
              </td>
              <td>
                <button onClick={() => handleRequeue(entry.id)}>Requeue</button>
                <button
                  onClick={() => handleDiscard(entry.id)}
                  className="danger"
                >
                  Discard
                </button>
              </td>
            </tr>
          ))}
        </tbody>
      </table>
    </div>
  );
};

Phase 8: Integration with Existing System

8.1 Migration Strategy

Backward Compatibility:

  1. Keep existing ValkyrWorkflowService.executeWorkflow() as-is
  2. Wrap it with WorkflowExecutionService.executeWorkflow() to create execution records (see the sketch after this list)
  3. Gradually migrate modules to use RunService for idempotency
  4. Add @Deprecated to old direct execution methods
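
A minimal sketch of the compatibility wrapper described in step 2, assuming the v1 entry point keeps its current signature; class name WorkflowEngineFacade and the startUntracked pass-through are illustrative, and the @Deprecated annotation from step 4 lands on the legacy path:

// Hedged sketch: v1 callers keep working, but tracked invocations create a WorkflowExecution record.
// Imports omitted for brevity, matching the other snippets in this plan.
@Service
public class WorkflowEngineFacade {

    @Autowired
    private ValkyrWorkflowService legacyEngine;          // existing v1 engine

    @Autowired
    private WorkflowExecutionService executionService;   // new v2 execution tracking

    /** Preferred v2 entry point: tracked execution. */
    public CompletableFuture<WorkflowExecution> start(Workflow workflow, Map<String, Object> payload) {
        return executionService.executeWorkflow(workflow, "MANUAL", payload);
    }

    /** Legacy pass-through retained for backward compatibility. */
    @Deprecated
    public void startUntracked(Workflow workflow) {
        legacyEngine.executeWorkflow(workflow); // assumed v1 signature
    }
}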

Database Migration:

-- Add WorkflowExecution table
CREATE TABLE workflow_execution (
    id UUID PRIMARY KEY,
    workflow_id UUID NOT NULL REFERENCES workflow(id),
    version INTEGER,
    state VARCHAR(20),
    started_at TIMESTAMP,
    finished_at TIMESTAMP,
    trigger_type VARCHAR(20),
    trigger_payload TEXT,
    context_data TEXT,
    error_message TEXT,
    error_stack_trace TEXT,
    metrics_snapshot TEXT
);

-- Add Run table enhancements
ALTER TABLE run ADD COLUMN exec_module_id UUID;
ALTER TABLE run ADD COLUMN leased_by VARCHAR(255);
ALTER TABLE run ADD COLUMN inputs_hash VARCHAR(64);
ALTER TABLE run ADD COLUMN config_hash VARCHAR(64);
ALTER TABLE run ADD COLUMN heartbeat_at TIMESTAMP;
ALTER TABLE run ADD COLUMN retry_after TIMESTAMP;
ALTER TABLE run ADD COLUMN duration_ms INTEGER;
ALTER TABLE run ADD COLUMN cost_tokens NUMERIC(10,4);
-- Assumed additions (not in the v1 run table): required by the lease logic and the idempotency index below
ALTER TABLE run ADD COLUMN idempotency_key VARCHAR(255);
ALTER TABLE run ADD COLUMN lease_until TIMESTAMP;
ALTER TABLE run ADD COLUMN runner_id VARCHAR(255);

-- Add DLQ table
CREATE TABLE dead_letter_queue (
    id UUID PRIMARY KEY,
    run_id UUID REFERENCES run(id),
    execution_id UUID REFERENCES workflow_execution(id),
    task_id UUID REFERENCES task(id),
    failure_reason TEXT,
    failure_type VARCHAR(50),
    original_inputs TEXT,
    original_config TEXT,
    quarantined_at TIMESTAMP,
    reviewed_by UUID REFERENCES principal(id),
    resolution VARCHAR(20),
    resolved_at TIMESTAMP,
    notes TEXT
);

-- Add indexes
CREATE INDEX idx_run_idempotency ON run(idempotency_key);
CREATE INDEX idx_run_execution ON run(execution_id);
CREATE INDEX idx_dlq_resolution ON dead_letter_queue(resolution);

8.2 Configuration Updates

application.yml additions:

valkyrai:
  runner:
    id: ${RUNNER_ID:#{T(java.util.UUID).randomUUID().toString()}}
    concurrency: ${RUNNER_CONCURRENCY:10}
    lease:
      duration: PT2M
    poll:
      interval: PT5S
    max:
      attempts: 3

observability:
  otel:
    enabled: true
    endpoint: ${OTEL_EXPORTER_OTLP_ENDPOINT:http://localhost:4317}
    service:
      name: valkyrai-workflow
      version: 2.0

Implementation Checklist

Phase 1: Data Models ✅

  • Add WorkflowExecution schema to api.hbs.yaml
  • Enhance Run schema with lease/idempotency fields
  • Add DeadLetterQueue schema
  • Add CircuitBreakerState schema
  • Regenerate models via ThorAPI (mvn -pl thorapi clean install)

Phase 2: API Endpoints ✅

  • Add WorkflowExecution CRUD endpoints
  • Add execution control endpoints (cancel/pause/resume)
  • Add Run endpoints (list/heartbeat)
  • Add DLQ endpoints (list/requeue/discard)
  • Regenerate controllers via ThorAPI

Phase 3: Service Layer ✅

  • Implement WorkflowExecutionService
  • Implement RunService with lease logic
  • Implement DLQService
  • Add idempotency key computation
  • Add exponential backoff logic

Phase 4: Runner Pool ✅

  • Implement RunnerService with polling
  • Add heartbeat mechanism
  • Add crash recovery (zombie reaper)
  • Implement resource limits (thread pool)
  • Add circuit breaker pattern

Phase 5: Controllers ✅

  • Implement WorkflowExecutionController
  • Implement DLQController
  • Add security annotations
  • Wire ACL checks

Phase 6: Observability ✅

  • Add OpenTelemetry dependency
  • Implement tracing aspect
  • Add metrics collection
  • Configure OTLP exporter

Phase 7: Frontend ✅

  • Generate RTKQ hooks for new endpoints
  • Implement WorkflowExecutionMonitor component
  • Implement DLQBrowser component
  • Add flamechart visualization
  • Update WorkflowStudio to use executions

Phase 8: Integration ✅

  • Wrap existing ValkyrWorkflowService with execution tracking
  • Add database migrations
  • Update configuration files
  • Add backward compatibility layer
  • Write integration tests

Phase 9: Testing & Documentation ✅

  • Write unit tests for all services
  • Write integration tests for execution flow
  • Test idempotency guarantees
  • Test crash recovery
  • Update docs with new APIs
  • Create runbooks for DLQ management

Success Metrics

After implementation, validate:

  1. Deterministic Replay: Same inputs → same outputs (hash match)
  2. Crash Recovery: Kill runner mid-step → run resumes without duplicate effects
  3. Idempotency: Duplicate run requests → deduplicated via idempotency key (see the test sketch after this list)
  4. DLQ Tooling: Failed runs quarantined + requeue works
  5. Observability: Traces link to runs; flamechart shows execution timeline
  6. Performance: P95 dispatch ≤ 75ms; lease acquisition ≤ 10ms
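
A hedged integration-test sketch for the idempotency metric, assuming Spring Boot test support, JUnit 5, and the RunService API from Phase 3; the buildTask/buildModule helpers are hypothetical fixtures:

// Hedged sketch: duplicate createRun calls with identical inputs/config must resolve to the same Run.
// Imports omitted for brevity, matching the other snippets in this plan.
@SpringBootTest
class RunIdempotencyTest {

    @Autowired
    private RunService runService;

    @Test
    void duplicateRunRequestsAreDeduplicated() {
        UUID executionId = UUID.randomUUID();
        Task task = buildTask();            // hypothetical test fixture
        ExecModule module = buildModule();  // hypothetical test fixture
        Map<String, Object> inputs = Map.of("to", "ops@example.com", "subject", "hello");

        Run first = runService.createRun(executionId, task, module, inputs);
        // Simulate the first attempt completing successfully
        runService.markSuccess(first.getId(), Map.of("status", "sent"), 42L);

        // A second request with the same inputs + config must return the existing run
        Run second = runService.createRun(executionId, task, module, inputs);

        assertEquals(first.getId(), second.getId());
        assertEquals(first.getIdempotencyKey(), second.getIdempotencyKey());
    }
}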

Next Steps

  1. Review this plan with the team
  2. Create feature branch: feature/workflow-engine-v2
  3. Start with Phase 1 (ThorAPI schema updates)
  4. Incremental PRs for each phase
  5. Beta deployment with selected workflows
  6. GA release after dogfooding + chaos tests

Questions & Decisions

  • Runner isolation: Thread pool vs. Docker containers? → Start with thread pool, containerize in Phase 3
  • State store: Postgres vs. Redis for leases? → Postgres for now; add optional Redis support in Phase 2
  • Observability backend: Jaeger vs. Grafana Tempo? → Either works; configure via OTLP
  • DLQ retention: How long to keep? → 30 days default, configurable

End of Implementation Plan