ValkyrAI Workflow Engine v2.0 — Implementation Plan
Status: Implementation In Progress
Version: v2.0 (Kill n8n Edition)
Date: 2025-10-23
Owner: Platform Architecture Team
"All I am surrounded by is fear… and dead queues." — ValkyrAI to n8n 😈
Executive Summary
This document outlines the complete implementation strategy for upgrading ValkyrAI's workflow engine from the current v1.x execution model to the v2.0 ThorAPI-native, horizontally scalable, transactionally safe workflow + agent orchestration system specified in the PRD.
Core Improvements
- WorkflowExecution Model — Separate execution instances from workflow definitions with full state tracking
- Run Model — Granular task attempt tracking with idempotency, leasing, and DLQ support
- Enhanced Scheduling — Quartz-based CRON + event triggers + webhook endpoints
- Runner Pool — Isolated step executors with resource limits, circuit breakers, and retry policies
- Observability — OpenTelemetry tracing, metrics, DLQ tooling, and replay capabilities
- UX Studio — React Flow visual designer with real-time execution monitoring
Current State Analysis
✅ What We Have (v1.x)
From the codebase analysis:
- Core Models: Workflow, Task, ExecModule, WorkflowState
- Execution Service: ValkyrWorkflowService with async execution via CompletableFuture
- Module System: 20+ ExecModules (Email, REST, Stripe, Files, etc.) extending VModule
- Scheduling: Quartz integration with loadWorkflowSchedules() and WorkflowJob
- Event System: @EventListener for workflow triggers with registerEventTrigger()
- Security: RBAC + ACL on all workflow operations
- WebSocket: Real-time monitoring at /v1/vaiworkflow/subscribe/{workflowId}
- Metrics: WorkflowMetricsAspect with comprehensive event emission
- Frontend: React components in web/typescript/valkyr_labs_com/src/components/WorkflowStudio/
🔧 What Needs Enhancement (v2.0)
- Execution Tracking — No separate WorkflowExecution model (runs mixed with definitions)
- Run/Attempt Tracking — The existing Run model lacks per-attempt lease, heartbeat, and hashing fields
- Idempotency — Basic support but not systematically enforced with content hashing
- Leasing/Recovery — No distributed lease mechanism for crash recovery
- DLQ — Error states tracked but no formal DLQ + replay tooling
- Runner Isolation — Modules run in-process; no containerized sandbox
- Studio UX — Basic ModuleChainViewer; needs React Flow upgrade
- Observability — Good metrics/events; needs OpenTelemetry + flame charts
Phase 1: Data Model Extensions (ThorAPI)
1.1 WorkflowExecution Schema
Add under components.schemas in api.hbs.yaml:
WorkflowExecution:
  type: object
  description: An execution instance of a Workflow with state tracking
  properties:
    id:
      type: string
      format: uuid
    workflowId:
      type: string
      format: uuid
      description: Reference to the Workflow definition
    version:
      type: integer
      description: Workflow version number at execution time
    state:
      type: string
      enum: [PENDING, RUNNING, PAUSED, SUCCESS, FAILED, CANCELLED, TIMEOUT]
      description: Current execution state
    startedAt:
      type: string
      format: date-time
    finishedAt:
      type: string
      format: date-time
    parentExecutionId:
      type: string
      format: uuid
      description: For nested/sub-workflows
    triggerType:
      type: string
      enum: [MANUAL, CRON, EVENT, WEBHOOK, QUEUE]
    triggerPayload:
      type: string
      description: JSON payload that initiated execution
    contextData:
      type: string
      description: Execution context (merged state from WorkflowState)
    errorMessage:
      type: string
    errorStackTrace:
      type: string
    metricsSnapshot:
      type: string
      description: JSON snapshot of execution metrics (duration, costs, etc.)
1.2 Run Schema Enhancement
Current State: Basic Run model exists with executionId, taskId, attempt, state enum.
Enhancements needed:
Run:
  type: object
  description: A single attempt to execute a Task within a WorkflowExecution
  properties:
    id:
      type: string
      format: uuid
    executionId:
      type: string
      format: uuid
    taskId:
      type: string
      format: uuid
    execModuleId:
      type: string
      format: uuid
      description: Specific module within task
    attempt:
      type: integer
      description: Retry attempt number (1-based)
    state:
      type: string
      enum: [PENDING, LEASED, RUNNING, SUCCESS, FAILED, DLQ, CANCELLED]
    leaseUntil:
      type: string
      format: date-time
      description: Lease expiration for at-least-once delivery
    leasedBy:
      type: string
      description: Runner/pod ID that owns the lease
    runnerId:
      type: string
      description: Identifier of the runner pod
    idempotencyKey:
      type: string
      description: Content hash of inputs + config for effectively-once semantics
    inputsHash:
      type: string
      description: SHA-256 of input map
    configHash:
      type: string
      description: SHA-256 of module config
    inputs:
      type: string
      description: JSON snapshot of the input map, kept for retries and DLQ replay
    startedAt:
      type: string
      format: date-time
    finishedAt:
      type: string
      format: date-time
    heartbeatAt:
      type: string
      format: date-time
    outputs:
      type: string
      description: JSON result map from module execution
    error:
      type: string
    errorType:
      type: string
      enum: [TRANSIENT, PERMANENT, TIMEOUT, CIRCUIT_OPEN]
    retryAfter:
      type: string
      format: date-time
    durationMs:
      type: integer
    costTokens:
      type: number
      format: double
      description: For LLM/API cost tracking
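Hashing only supports deduplication if equal input maps always produce the same bytes. JSON serialized from an ordinary HashMap does not guarantee key order. The sketch below shows one way to get stable inputsHash, configHash and idempotencyKey values: a tiny canonical serializer with sorted keys, followed by SHA-256 rendered as hex.

It assumes inputs are nested Map/List/String/Number/Boolean values and Java 17+. The class name IdempotencyKeys and the decision to scope the key by execution and module are illustrative, not part of the generated model. In production code, Jackson with SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS gives the same key ordering.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch (hypothetical helper, not ThorAPI-generated): canonical JSON + SHA-256
// for Run.inputsHash, Run.configHash and Run.idempotencyKey. Requires Java 17+ (HexFormat).
final class IdempotencyKeys {

    private IdempotencyKeys() {
    }

    /** Serializes nested Map/List/scalar values as JSON with map keys in sorted order. */
    static String canonicalJson(Object value) {
        StringBuilder sb = new StringBuilder();
        write(value, sb);
        return sb.toString();
    }

    private static void write(Object value, StringBuilder sb) {
        if (value == null) {
            sb.append("null");
        } else if (value instanceof Map<?, ?> map) {
            // TreeMap fixes the key order no matter which Map implementation was passed in
            TreeMap<String, Object> sorted = new TreeMap<>();
            map.forEach((k, v) -> sorted.put(String.valueOf(k), v));
            sb.append('{');
            boolean first = true;
            for (Map.Entry<String, Object> e : sorted.entrySet()) {
                if (!first) {
                    sb.append(',');
                }
                first = false;
                writeString(e.getKey(), sb);
                sb.append(':');
                write(e.getValue(), sb);
            }
            sb.append('}');
        } else if (value instanceof List<?> list) {
            sb.append('[');
            for (int i = 0; i < list.size(); i++) {
                if (i > 0) {
                    sb.append(',');
                }
                write(list.get(i), sb);
            }
            sb.append(']');
        } else if (value instanceof Number || value instanceof Boolean) {
            // 1 and 1.0 serialize differently, so callers must keep numeric types consistent
            sb.append(value);
        } else {
            writeString(value.toString(), sb);
        }
    }

    private static void writeString(String s, StringBuilder sb) {
        sb.append('"');
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c)); // control characters
            } else {
                sb.append(c);
            }
        }
        sb.append('"');
    }

    /** SHA-256 over the canonical JSON, as 64 lowercase hex characters (fits VARCHAR(64)). */
    static String sha256Hex(Object value) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(canonicalJson(value).getBytes(StandardCharsets.UTF_8));
            return HexFormat.of().formatHex(hash);
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to provide SHA-256
            throw new IllegalStateException(e);
        }
    }

    /** Hashes inputs and config, scoped to one module within one execution. */
    static String idempotencyKey(String executionId, String execModuleId,
                                 Map<String, Object> inputs, Map<String, Object> config) {
        return sha256Hex(List.of(executionId, execModuleId, sha256Hex(inputs), sha256Hex(config)));
    }
}
```

Scoping by execution prevents a daily CRON run from being deduplicated against yesterday's identical run. Retries and duplicate dispatches inside one execution still collapse to the same key.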
1.3 DLQ Schema
DeadLetterQueue:
  type: object
  description: Failed runs quarantined for manual intervention
  properties:
    id:
      type: string
      format: uuid
    runId:
      type: string
      format: uuid
    executionId:
      type: string
      format: uuid
    taskId:
      type: string
      format: uuid
    failureReason:
      type: string
    failureType:
      type: string
      enum: [MAX_RETRIES, PERMANENT_ERROR, CIRCUIT_BREAKER, MANUAL]
    originalInputs:
      type: string
      description: JSON snapshot of inputs for replay
    originalConfig:
      type: string
      description: JSON snapshot of module config
    quarantinedAt:
      type: string
      format: date-time
    reviewedBy:
      type: string
      format: uuid
      description: Principal who reviewed
    resolution:
      type: string
      enum: [PENDING, REQUEUED, FIXED, DISCARDED]
    resolvedAt:
      type: string
      format: date-time
    notes:
      type: string
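The errorType values on Run and the failureType values above imply a small decision table. The sketch below encodes it under these assumptions:

- TRANSIENT and TIMEOUT failures retry with exponential backoff until maxAttempts.
- Every other failure is quarantined immediately, and the quarantine reason records why.

RetryPolicy, Decision, and the 1s base with up to 500ms jitter (borrowed from RunService's computeBackoff) are illustrative.

```java
import java.time.Duration;
import java.util.Random;

// Illustrative sketch (hypothetical class): maps a failed attempt to "retry after X" or
// "quarantine with failureType Y", using the enum values from the Run and DLQ schemas.
final class RetryPolicy {

    enum ErrorType { TRANSIENT, PERMANENT, TIMEOUT, CIRCUIT_OPEN }

    enum FailureType { MAX_RETRIES, PERMANENT_ERROR, CIRCUIT_BREAKER, MANUAL }

    /** Exactly one of retryAfter / dlqReason is non-null. */
    record Decision(Duration retryAfter, FailureType dlqReason) {
        boolean retry() {
            return retryAfter != null;
        }
    }

    private final int maxAttempts;
    private final Duration base;
    private final int maxJitterMs;
    private final Random random;

    RetryPolicy(int maxAttempts, Duration base, int maxJitterMs, Random random) {
        this.maxAttempts = maxAttempts;
        this.base = base;
        this.maxJitterMs = maxJitterMs;
        this.random = random;
    }

    /** @param failedAttempt 1-based number of the attempt that just failed */
    Decision decide(int failedAttempt, ErrorType type) {
        boolean retryable = type == ErrorType.TRANSIENT || type == ErrorType.TIMEOUT;
        if (retryable && failedAttempt < maxAttempts) {
            return new Decision(backoff(failedAttempt), null);
        }
        FailureType reason = switch (type) {
            case TRANSIENT, TIMEOUT -> FailureType.MAX_RETRIES;
            case CIRCUIT_OPEN -> FailureType.CIRCUIT_BREAKER;
            case PERMANENT -> FailureType.PERMANENT_ERROR;
        };
        return new Decision(null, reason);
    }

    /** base * 2^(failedAttempt - 1), plus uniform jitter in [0, maxJitterMs). */
    Duration backoff(int failedAttempt) {
        int shift = Math.min(Math.max(failedAttempt - 1, 0), 20); // cap the shift to avoid overflow
        long exponentialMs = base.toMillis() << shift;
        long jitterMs = maxJitterMs > 0 ? random.nextInt(maxJitterMs) : 0;
        return Duration.ofMillis(exponentialMs + jitterMs);
    }
}
```

The jitter spreads out retries from many runs that failed at the same moment, for example when a shared dependency briefly went down.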
1.4 Circuit Breaker State
CircuitBreakerState:
  type: object
  description: Circuit breaker state for external dependencies
  properties:
    id:
      type: string
      format: uuid
    target:
      type: string
      description: Module class + target endpoint identifier
    state:
      type: string
      enum: [CLOSED, OPEN, HALF_OPEN]
    failureCount:
      type: integer
    successCount:
      type: integer
    lastFailureAt:
      type: string
      format: date-time
    openedAt:
      type: string
      format: date-time
    nextRetryAt:
      type: string
      format: date-time
    windowStartAt:
      type: string
      format: date-time
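The fields above describe a standard three-state circuit breaker. The sketch below shows its transitions in memory, under two simplifications:

- It trips on a consecutive-failure threshold rather than the rolling window that windowStartAt suggests.
- It uses a fixed cool-down (nextRetryAt) before allowing a trial call.

The class name and parameters are hypothetical. A library such as Resilience4j would normally be used instead of hand-rolling this.

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative in-memory sketch (hypothetical class) of the CLOSED -> OPEN -> HALF_OPEN cycle
// tracked by CircuitBreakerState. Uses a consecutive-failure threshold, not a rolling window.
final class CircuitBreaker {

    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold;
    private final Duration openInterval;
    private State state = State.CLOSED;
    private int failureCount;
    private Instant nextRetryAt;

    CircuitBreaker(int failureThreshold, Duration openInterval) {
        this.failureThreshold = failureThreshold;
        this.openInterval = openInterval;
    }

    /** True if a call to the target may proceed at {@code now}. */
    synchronized boolean allowRequest(Instant now) {
        if (state == State.OPEN && !now.isBefore(nextRetryAt)) {
            state = State.HALF_OPEN; // cool-down elapsed: let exactly one trial call through
            return true;
        }
        // OPEN rejects; HALF_OPEN rejects further calls while the trial is in flight
        return state == State.CLOSED;
    }

    synchronized void recordSuccess() {
        state = State.CLOSED;
        failureCount = 0;
        nextRetryAt = null;
    }

    synchronized void recordFailure(Instant now) {
        failureCount++;
        if (state == State.HALF_OPEN || failureCount >= failureThreshold) {
            state = State.OPEN; // CircuitBreakerState.openedAt would be `now`
            nextRetryAt = now.plus(openInterval);
        }
    }

    synchronized State state() {
        return state;
    }
}
```

Passing the current time in explicitly keeps the transitions deterministic and easy to test. It also maps directly onto the persisted openedAt and nextRetryAt columns.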
Phase 2: API Endpoints (ThorAPI)
Add to paths: section in api.hbs.yaml:
# WorkflowExecution endpoints
/WorkflowExecution:
  get:
    tags: [WorkflowExecution]
    summary: List workflow executions with filters
    operationId: listWorkflowExecutions
    parameters:
      - name: workflowId
        in: query
        schema:
          type: string
          format: uuid
      - name: state
        in: query
        schema:
          type: string
      - name: limit
        in: query
        schema:
          type: integer
          default: 50
    responses:
      "200":
        description: List of executions
        content:
          application/json:
            schema:
              type: array
              items:
                $ref: "#/components/schemas/WorkflowExecution"
/WorkflowExecution/{id}:
  get:
    tags: [WorkflowExecution]
    summary: Get execution details
    operationId: getWorkflowExecution
    parameters:
      - name: id
        in: path
        required: true
        schema:
          type: string
          format: uuid
    responses:
      "200":
        description: Execution details
        content:
          application/json:
            schema:
              $ref: "#/components/schemas/WorkflowExecution"
/WorkflowExecution/{id}/cancel:
  post:
    tags: [WorkflowExecution]
    summary: Cancel a running execution
    operationId: cancelWorkflowExecution
    x-thorapi-nonCrud: true
    parameters:
      - name: id
        in: path
        required: true
        schema:
          type: string
          format: uuid
    responses:
      "200":
        description: Cancellation initiated
/WorkflowExecution/{id}/pause:
  post:
    tags: [WorkflowExecution]
    summary: Pause execution
    operationId: pauseWorkflowExecution
    x-thorapi-nonCrud: true
    parameters:
      - name: id
        in: path
        required: true
        schema:
          type: string
          format: uuid
    responses:
      "200":
        description: Execution paused
/WorkflowExecution/{id}/resume:
  post:
    tags: [WorkflowExecution]
    summary: Resume paused execution
    operationId: resumeWorkflowExecution
    x-thorapi-nonCrud: true
    parameters:
      - name: id
        in: path
        required: true
        schema:
          type: string
          format: uuid
    responses:
      "200":
        description: Execution resumed
# Run endpoints
/Run:
  get:
    tags: [Run]
    summary: List runs with filters
    operationId: listRuns
    parameters:
      - name: executionId
        in: query
        schema:
          type: string
          format: uuid
      - name: taskId
        in: query
        schema:
          type: string
          format: uuid
      - name: state
        in: query
        schema:
          type: string
    responses:
      "200":
        description: List of runs
        content:
          application/json:
            schema:
              type: array
              items:
                $ref: "#/components/schemas/Run"
/Run/{id}/heartbeat:
  post:
    tags: [Run]
    summary: Update heartbeat timestamp (runner keepalive)
    operationId: updateRunHeartbeat
    x-thorapi-nonCrud: true
    parameters:
      - name: id
        in: path
        required: true
        schema:
          type: string
          format: uuid
    responses:
      "200":
        description: Heartbeat recorded
# DLQ endpoints
/DeadLetterQueue:
  get:
    tags: [DeadLetterQueue]
    summary: List quarantined runs, optionally filtered by resolution
    operationId: listDeadLetterQueue
    parameters:
      # Used by DLQController and by the DLQ browser (resolution=PENDING)
      - name: resolution
        in: query
        schema:
          type: string
          enum: [PENDING, REQUEUED, FIXED, DISCARDED]
    responses:
      "200":
        description: List of DLQ entries
        content:
          application/json:
            schema:
              type: array
              items:
                $ref: "#/components/schemas/DeadLetterQueue"
/DeadLetterQueue/{id}/requeue:
  post:
    tags: [DeadLetterQueue]
    summary: Requeue a DLQ entry for retry
    operationId: requeueDeadLetterEntry
    x-thorapi-nonCrud: true
    parameters:
      - name: id
        in: path
        required: true
        schema:
          type: string
          format: uuid
    requestBody:
      description: Optional input overrides for replay
      content:
        application/json:
          schema:
            type: object
            additionalProperties: true
    responses:
      "200":
        description: Entry requeued
/DeadLetterQueue/{id}/discard:
  post:
    tags: [DeadLetterQueue]
    summary: Discard a DLQ entry permanently
    operationId: discardDeadLetterEntry
    x-thorapi-nonCrud: true
    parameters:
      - name: id
        in: path
        required: true
        schema:
          type: string
          format: uuid
    responses:
      "200":
        description: Entry discarded
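For runbooks or scripted DLQ triage, the requeue endpoint can be called with the JDK's built-in java.net.http client. The sketch only builds the request, so it runs without a server.

The following are assumptions about deployment rather than anything ThorAPI generates:

- the base URL (http://localhost:8080/v1/)
- bearer-token authentication
- the DlqClient name

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;
import java.util.UUID;

// Illustrative sketch (hypothetical client): builds POST {base}/DeadLetterQueue/{id}/requeue.
// The base URI must end with "/" so that resolve() appends instead of replacing "v1".
final class DlqClient {

    private final URI baseUri;
    private final String bearerToken;

    DlqClient(URI baseUri, String bearerToken) {
        this.baseUri = baseUri;
        this.bearerToken = bearerToken;
    }

    /** overridesJson is the optional requestBody (a JSON object of input overrides), or null. */
    HttpRequest requeueRequest(UUID dlqId, String overridesJson) {
        URI uri = baseUri.resolve("DeadLetterQueue/" + dlqId + "/requeue");
        HttpRequest.Builder builder = HttpRequest.newBuilder(uri)
                .timeout(Duration.ofSeconds(10))
                .header("Authorization", "Bearer " + bearerToken);
        if (overridesJson == null) {
            builder.POST(HttpRequest.BodyPublishers.noBody());
        } else {
            builder.header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString(overridesJson));
        }
        return builder.build();
    }
}
```

Send the request with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) and treat anything other than a 200 status as a failed requeue.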
Phase 3: Service Layer Implementation
3.1 WorkflowExecutionService
Location: valkyrai/src/main/java/com/valkyrlabs/workflow/service/WorkflowExecutionService.java
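import java.io.PrintWriter;
import java.io.StringWriter;
import java.time.OffsetDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
// plus the ThorAPI-generated models and repositories (Workflow, WorkflowExecution, WorkflowState, ...)

@Service
@Slf4j
public class WorkflowExecutionService {

    @Autowired
    private WorkflowExecutionRepository executionRepo;

    @Autowired
    private RunRepository runRepo;

    @Autowired
    private WorkflowService workflowService;

    @Autowired
    private TaskService taskService;

    @Autowired
    private ValkyrWorkflowService workflowEngineService;

    /**
     * Create a new execution instance from a workflow.
     */
    @Transactional
    public WorkflowExecution createExecution(Workflow workflow, String triggerType, String triggerPayload) {
        WorkflowExecution execution = new WorkflowExecution();
        execution.setWorkflowId(workflow.getId());
        execution.setVersion(workflow.getVersion());
        execution.setState(WorkflowExecution.StateEnum.PENDING);
        execution.setTriggerType(triggerType);
        execution.setTriggerPayload(triggerPayload);
        execution.setStartedAt(OffsetDateTime.now());

        // Initialize contextData from workflow state
        Map<String, Object> context = new HashMap<>();
        for (WorkflowState state : workflow.getWorkflowState()) {
            context.put(state.getName(), state.getStateValue());
        }
        execution.setContextData(toJson(context));
        return executionRepo.save(execution);
    }

    /**
     * Execute a workflow asynchronously and track it as a WorkflowExecution.
     * Calls made through "this" bypass Spring's @Transactional proxy (the original
     * REQUIRES_NEW persistExecution helper never took effect for that reason), so each
     * executionRepo.save(...) runs in the repository's own transaction instead.
     */
    public CompletableFuture<WorkflowExecution> executeWorkflow(Workflow workflow, String triggerType,
                                                                Map<String, Object> payload) {
        WorkflowExecution created = createExecution(workflow, triggerType, toJson(payload));

        // Consider passing a dedicated Executor; the default is ForkJoinPool.commonPool()
        return CompletableFuture.supplyAsync(() -> {
            WorkflowExecution execution = created;
            try {
                execution.setState(WorkflowExecution.StateEnum.RUNNING);
                // Keep the instance returned by save() so later saves carry the current entity state
                execution = executionRepo.save(execution);

                // Delegate to the existing engine; executeWorkflowSync is assumed to be a
                // blocking variant of ValkyrWorkflowService.executeWorkflow()
                Map<String, Object> result = workflowEngineService.executeWorkflowSync(workflow, payload);
                execution.setState(WorkflowExecution.StateEnum.SUCCESS);
                execution.setContextData(toJson(result));
            } catch (Exception e) {
                log.error("Execution {} failed", execution.getId(), e);
                execution.setState(WorkflowExecution.StateEnum.FAILED);
                execution.setErrorMessage(e.getMessage());
                execution.setErrorStackTrace(getStackTrace(e));
            }
            execution.setFinishedAt(OffsetDateTime.now());
            return executionRepo.save(execution);
        });
    }

    private static String getStackTrace(Throwable t) {
        StringWriter sw = new StringWriter();
        t.printStackTrace(new PrintWriter(sw));
        return sw.toString();
    }

    // toJson(...) is a shared JSON helper (e.g. a Jackson ObjectMapper), omitted here.
    // findExecutions, findById, cancelExecution, pauseExecution and resumeExecution,
    // used by WorkflowExecutionController, are also omitted.
}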
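WorkflowExecutionController (Phase 5) calls cancelExecution, pauseExecution and resumeExecution, which the service above does not show. Whatever their persistence details, they need a guard so that, say, a FAILED execution cannot be resumed.

The sketch below is one such guard. The allowed transitions are inferred from the state enum in 1.1, not taken from the PRD, and ExecutionTransitions is a hypothetical name.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Illustrative sketch (hypothetical class): allowed WorkflowExecution state transitions,
// inferred from the state enum rather than taken from the PRD.
final class ExecutionTransitions {

    enum State { PENDING, RUNNING, PAUSED, SUCCESS, FAILED, CANCELLED, TIMEOUT }

    // Allowed target states per source state; terminal states map to an empty set
    private static final Map<State, Set<State>> ALLOWED = Map.of(
            State.PENDING, EnumSet.of(State.RUNNING, State.CANCELLED),
            State.RUNNING, EnumSet.of(State.PAUSED, State.SUCCESS, State.FAILED,
                    State.CANCELLED, State.TIMEOUT),
            State.PAUSED, EnumSet.of(State.RUNNING, State.CANCELLED),
            State.SUCCESS, EnumSet.noneOf(State.class),
            State.FAILED, EnumSet.noneOf(State.class),
            State.CANCELLED, EnumSet.noneOf(State.class),
            State.TIMEOUT, EnumSet.noneOf(State.class));

    private ExecutionTransitions() {
    }

    static boolean canTransition(State from, State to) {
        return ALLOWED.get(from).contains(to);
    }

    static boolean isTerminal(State state) {
        return ALLOWED.get(state).isEmpty();
    }

    /** Returns the new state, or throws if the move is not allowed. */
    static State transition(State from, State to) {
        if (!canTransition(from, to)) {
            throw new IllegalStateException("Cannot move execution from " + from + " to " + to);
        }
        return to;
    }
}
```

For example, pauseExecution would call transition(current, PAUSED) before saving. A controller could translate the IllegalStateException into 409 Conflict.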
3.2 RunService (Lease + Idempotency)
Location: valkyrai/src/main/java/com/valkyrlabs/workflow/service/RunService.java
@Service
@Slf4j
public class RunService {
@Autowired
private RunRepository runRepo;
@Autowired
private DeadLetterQueueRepository dlqRepo;
@Value("${valkyrai.runner.lease.duration:PT2M}")
private Duration leaseDuration;
@Value("${valkyrai.runner.max.attempts:3}")
private int maxAttempts;
/**
* Create run for task execution with idempotency key
*/
@Transactional
public Run createRun(UUID executionId, Task task, ExecModule module, Map<String, Object> inputs) {
// Compute idempotency key
String inputsHash = computeHash(inputs);
String configHash = computeHash(module.getModuleData());
String idempotencyKey = inputsHash + ":" + configHash;
// Check for existing run with same idempotency key
Optional<Run> existing = runRepo.findByIdempotencyKey(idempotencyKey);
if (existing.isPresent() && existing.get().getState() == Run.StateEnum.SUCCESS) {
log.info("Idempotent run found: {}", existing.get().getId());
return existing.get();
}
Run run = new Run();
run.setExecutionId(executionId);
run.setTaskId(task.getId());
run.setExecModuleId(module.getId());
run.setAttempt(1);
run.setState(Run.StateEnum.PENDING);
run.setIdempotencyKey(idempotencyKey);
run.setInputsHash(inputsHash);
run.setConfigHash(configHash);
return runRepo.save(run);
}
/**
* Acquire lease for run execution
*/
@Transactional
public Optional<Run> acquireLease(UUID runId, String runnerId) {
Optional<Run> runOpt = runRepo.findById(runId);
if (runOpt.isEmpty()) {
return Optional.empty();
}
Run run = runOpt.get();
OffsetDateTime now = OffsetDateTime.now();
// Check if already leased and not expired
if (run.getLeaseUntil() != null && run.getLeaseUntil().isAfter(now)) {
log.debug("Run {} already leased until {}", runId, run.getLeaseUntil());
return Optional.empty();
}
// Acquire lease
run.setState(Run.StateEnum.LEASED);
run.setLeasedBy(runnerId);
run.setRunnerId(runnerId);
run.setLeaseUntil(now.plus(leaseDuration));
return Optional.of(runRepo.save(run));
}
/**
* Update heartbeat (runner keepalive)
*/
@Transactional
public void updateHeartbeat(UUID runId) {
runRepo.findById(runId).ifPresent(run -> {
run.setHeartbeatAt(OffsetDateTime.now());
// Extend lease
run.setLeaseUntil(OffsetDateTime.now().plus(leaseDuration));
runRepo.save(run);
});
}
/**
* Mark run as success
*/
@Transactional
public void markSuccess(UUID runId, Map<String, Object> outputs, long durationMs) {
runRepo.findById(runId).ifPresent(run -> {
run.setState(Run.StateEnum.SUCCESS);
run.setFinishedAt(OffsetDateTime.now());
run.setOutputs(toJson(outputs));
run.setDurationMs((int) durationMs);
runRepo.save(run);
});
}
/**
* Mark run as failed and handle retry or DLQ
*/
@Transactional
public void markFailed(UUID runId, String error, String errorType, boolean canRetry) {
runRepo.findById(runId).ifPresent(run -> {
run.setError(error);
run.setErrorType(errorType);
run.setFinishedAt(OffsetDateTime.now());
if (canRetry && run.getAttempt() < maxAttempts) {
// Requeue for retry
run.setState(Run.StateEnum.PENDING);
run.setAttempt(run.getAttempt() + 1);
run.setRetryAfter(OffsetDateTime.now().plus(computeBackoff(run.getAttempt())));
run.setLeaseUntil(null);
run.setLeasedBy(null);
} else {
// Move to DLQ
run.setState(Run.StateEnum.DLQ);
moveToDLQ(run, "MAX_RETRIES");
}
runRepo.save(run);
});
}
/**
* Move failed run to DLQ
*/
private void moveToDLQ(Run run, String reason) {
DeadLetterQueue dlq = new DeadLetterQueue();
dlq.setRunId(run.getId());
dlq.setExecutionId(run.getExecutionId());
dlq.setTaskId(run.getTaskId());
dlq.setFailureReason(run.getError());
dlq.setFailureType(reason);
dlq.setOriginalInputs(run.getOutputs()); // Store for replay
dlq.setQuarantinedAt(OffsetDateTime.now());
dlq.setResolution(DeadLetterQueue.ResolutionEnum.PENDING);
dlqRepo.save(dlq);
}
private Duration computeBackoff(int attempt) {
// Exponential backoff with jitter
long baseMs = 1000L * (long) Math.pow(2, attempt - 1);
long jitter = ThreadLocalRandom.current().nextLong(0, 500);
return Duration.ofMillis(baseMs + jitter);
}
private String computeHash(Object data) {
try {
MessageDigest digest = MessageDigest.getInstance("SHA-256");
String json = toJson(data);
byte[] hash = digest.digest(json.getBytes(StandardCharsets.UTF_8));
return Base64.getEncoder().encodeToString(hash);
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException(e);
}
}
}
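acquireLease gives single ownership only if the check and the claim happen atomically. With a plain read-then-write, two runners can both see an expired lease and both claim it. In SQL that usually means a row lock, or a conditional UPDATE ... WHERE lease_until IS NULL OR lease_until < now().

The in-memory sketch below models the same compare-and-set behaviour with ConcurrentHashMap.compute. It adds a heartbeat that only extends a lease its caller still owns. LeaseTable and its methods are hypothetical, and the clock is injected so expiry can be tested.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Illustrative in-memory sketch (hypothetical class): the compare-and-set semantics that a
// conditional UPDATE on run.lease_until should provide. The clock is injected for testing.
final class LeaseTable {

    record Lease(String owner, Instant until) {
    }

    private final ConcurrentHashMap<String, Lease> leases = new ConcurrentHashMap<>();
    private final Supplier<Instant> clock;
    private final Duration duration;

    LeaseTable(Supplier<Instant> clock, Duration duration) {
        this.clock = clock;
        this.duration = duration;
    }

    /** Atomically claims runId if it is unleased, expired, or already held by runnerId. */
    boolean tryAcquire(String runId, String runnerId) {
        Instant now = clock.get();
        Lease result = leases.compute(runId, (id, current) -> {
            boolean free = current == null || !current.until().isAfter(now);
            if (free || current.owner().equals(runnerId)) {
                return new Lease(runnerId, now.plus(duration));
            }
            return current; // another runner holds a live lease: leave it untouched
        });
        return result.owner().equals(runnerId);
    }

    /** Extends the lease only if runnerId still holds a live one; false means it was lost. */
    boolean heartbeat(String runId, String runnerId) {
        Instant now = clock.get();
        Lease result = leases.computeIfPresent(runId, (id, current) ->
                current.owner().equals(runnerId) && current.until().isAfter(now)
                        ? new Lease(runnerId, now.plus(duration))
                        : current);
        return result != null && result.owner().equals(runnerId) && result.until().isAfter(now);
    }

    /** Drops the lease if runnerId still owns it (e.g. after markSuccess). */
    void release(String runId, String runnerId) {
        leases.computeIfPresent(runId, (id, current) -> current.owner().equals(runnerId) ? null : current);
    }
}
```

A runner whose heartbeat returns false has lost its lease and should stop work on that run. At-least-once delivery means another runner may already be executing it.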
3.3 DLQService
Location: valkyrai/src/main/java/com/valkyrlabs/workflow/service/DLQService.java
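import java.time.OffsetDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
@Slf4j
public class DLQService {

    @Autowired
    private DeadLetterQueueRepository dlqRepo;

    @Autowired
    private RunService runService;

    @Autowired
    private WorkflowExecutionService executionService;

    // Used by requeue() but not injected in the first draft; service types are assumed
    @Autowired
    private TaskService taskService;

    @Autowired
    private ExecModuleService execModuleService;

    /**
     * List DLQ entries, optionally filtered by resolution.
     */
    public List<DeadLetterQueue> listDLQ(DeadLetterQueue.ResolutionEnum resolution) {
        if (resolution != null) {
            return dlqRepo.findByResolution(resolution);
        }
        return dlqRepo.findAll();
    }

    /**
     * Requeue DLQ entry for retry with optional input overrides.
     */
    @Transactional
    public void requeue(UUID dlqId, Map<String, Object> inputOverrides) {
        DeadLetterQueue dlq = dlqRepo.findById(dlqId)
                .orElseThrow(() -> new IllegalArgumentException("DLQ entry not found: " + dlqId));
        // Guard against requeueing the same entry twice
        if (dlq.getResolution() != DeadLetterQueue.ResolutionEnum.PENDING) {
            throw new IllegalStateException("DLQ entry " + dlqId + " already resolved as " + dlq.getResolution());
        }

        // Copy into a mutable map: fromJson may return null or an immutable map
        Map<String, Object> inputs = new HashMap<>();
        if (dlq.getOriginalInputs() != null) {
            inputs.putAll(fromJson(dlq.getOriginalInputs()));
        }
        if (inputOverrides != null) {
            inputs.putAll(inputOverrides);
        }

        // Create new run
        Run run = runService.createRun(
                dlq.getExecutionId(),
                taskService.findById(dlq.getTaskId()).orElseThrow(),
                // DeadLetterQueue has no execModuleId; it must come from the original Run (dlq.getRunId())
                execModuleService.findById(/* ... */).orElseThrow(),
                inputs);

        // Mark DLQ as resolved
        dlq.setResolution(DeadLetterQueue.ResolutionEnum.REQUEUED);
        dlq.setResolvedAt(OffsetDateTime.now());
        dlqRepo.save(dlq);
        log.info("DLQ entry {} requeued as run {}", dlqId, run.getId());
    }

    /**
     * Discard DLQ entry.
     */
    @Transactional
    public void discard(UUID dlqId) {
        DeadLetterQueue dlq = dlqRepo.findById(dlqId)
                .orElseThrow(() -> new IllegalArgumentException("DLQ entry not found: " + dlqId));
        dlq.setResolution(DeadLetterQueue.ResolutionEnum.DISCARDED);
        dlq.setResolvedAt(OffsetDateTime.now());
        dlqRepo.save(dlq);
    }

    // fromJson(...) helper omitted
}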
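The override handling in requeue is small but easy to get subtly wrong. The sketch below pins down one reasonable set of semantics, assuming a shallow merge is what operators expect:

- Overrides replace top-level keys.
- The stored snapshot is never mutated.
- Only PENDING entries can be resolved, so a double click cannot requeue twice.

DlqReplay and its methods are hypothetical. The Resolution values mirror the schema.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch (hypothetical class): DLQ replay input merging and resolution guard.
final class DlqReplay {

    enum Resolution { PENDING, REQUEUED, FIXED, DISCARDED }

    private DlqReplay() {
    }

    /** Returns a new map: the original inputs with top-level keys replaced by overrides. */
    static Map<String, Object> mergeInputs(Map<String, Object> original, Map<String, Object> overrides) {
        Map<String, Object> merged = new HashMap<>();
        if (original != null) {
            merged.putAll(original);
        }
        if (overrides != null) {
            // Shallow: a nested map in overrides replaces the whole nested value
            merged.putAll(overrides);
        }
        return merged;
    }

    /** Moves an entry out of PENDING exactly once. */
    static Resolution resolve(Resolution current, Resolution target) {
        if (current != Resolution.PENDING) {
            throw new IllegalStateException("DLQ entry already resolved as " + current);
        }
        if (target == Resolution.PENDING) {
            throw new IllegalArgumentException("target must be a final resolution");
        }
        return target;
    }
}
```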
Phase 4: Runner Pool Implementation
4.1 Runner Service
Location: valkyrai/src/main/java/com/valkyrlabs/workflow/runner/RunnerService.java
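import jakarta.annotation.PostConstruct; // javax.annotation.* on Spring Boot 2
import jakarta.annotation.PreDestroy;
import java.io.IOException;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.HttpServerErrorException;
import org.springframework.web.client.ResourceAccessException;

// Requires @EnableScheduling on a configuration class
@Service
@Slf4j
public class RunnerService {

    @Value("${valkyrai.runner.id:#{T(java.util.UUID).randomUUID().toString()}}")
    private String runnerId;

    @Value("${valkyrai.runner.concurrency:10}")
    private int concurrency;

    @Value("${valkyrai.runner.lease.duration:PT2M}")
    private Duration leaseDuration;

    @Autowired
    private RunService runService;

    @Autowired
    private TaskService taskService;

    @Autowired
    private ApplicationContext applicationContext;

    // Calls go through the Spring proxy so aspects on executeRun (e.g. tracing) apply
    @Autowired
    @Lazy
    private RunnerService self;

    private ExecutorService executor;
    private ScheduledExecutorService heartbeats;
    private Semaphore slots;

    @PostConstruct
    void start() {
        // Built after injection so the pool size honours valkyrai.runner.concurrency
        executor = Executors.newFixedThreadPool(concurrency);
        heartbeats = Executors.newSingleThreadScheduledExecutor();
        slots = new Semaphore(concurrency);
    }

    @PreDestroy
    void stop() {
        executor.shutdown();
        heartbeats.shutdownNow();
    }

    @Scheduled(fixedDelayString = "${valkyrai.runner.poll.interval:PT5S}")
    public void pollForWork() {
        // Fetch only as many runs as there are free slots, so leases never expire in a queue
        int free = slots.availablePermits();
        if (free == 0) {
            return;
        }
        List<Run> pending = runService.findPendingRuns(free);
        for (Run run : pending) {
            if (!slots.tryAcquire()) {
                break;
            }
            Optional<Run> leased = runService.acquireLease(run.getId(), runnerId);
            if (leased.isEmpty()) {
                slots.release();
                continue;
            }
            executor.submit(() -> {
                try {
                    self.executeRun(leased.get());
                } finally {
                    slots.release();
                }
            });
        }
    }

    /** Public and invoked via {@code self} so Spring AOP can intercept it. */
    public void executeRun(Run run) {
        long startTime = System.currentTimeMillis();
        // Heartbeat at a third of the lease duration on one shared scheduler
        // (not a new thread per run every 2 seconds)
        long periodMs = Math.max(1000, leaseDuration.toMillis() / 3);
        ScheduledFuture<?> heartbeat = heartbeats.scheduleAtFixedRate(
                () -> safeHeartbeat(run.getId()), periodMs, periodMs, TimeUnit.MILLISECONDS);
        try {
            run.setState(Run.StateEnum.RUNNING);
            run.setStartedAt(OffsetDateTime.now());
            runService.save(run);

            Task task = taskService.findById(run.getTaskId()).orElseThrow();
            ExecModule module = task.getModules().stream()
                    .filter(m -> m.getId().equals(run.getExecModuleId()))
                    .findFirst()
                    .orElseThrow();

            // getBean(String, Class) expects a bean *name*; look the module up by type,
            // assuming className holds a fully qualified class name
            VModule vmodule = applicationContext.getBean(
                    Class.forName(module.getClassName()).asSubclass(VModule.class));

            // Read the input snapshot (requires an "inputs" field on Run), not the outputs
            Map<String, Object> inputs = fromJson(run.getInputs());
            Map<String, Object> outputs = vmodule.execute(null, task, module, inputs);

            long duration = System.currentTimeMillis() - startTime;
            runService.markSuccess(run.getId(), outputs, duration);
            log.info("Run {} completed successfully in {}ms", run.getId(), duration);
        } catch (Exception e) {
            log.error("Run {} failed", run.getId(), e);
            String errorType = determineErrorType(e);
            runService.markFailed(run.getId(), e.getMessage(), errorType, isRetryable(errorType));
        } finally {
            heartbeat.cancel(false);
        }
    }

    private void safeHeartbeat(UUID runId) {
        // An exception escaping a scheduleAtFixedRate task would silently cancel later heartbeats
        try {
            runService.updateHeartbeat(runId);
        } catch (RuntimeException e) {
            log.warn("Heartbeat failed for run {}", runId, e);
        }
    }

    private String determineErrorType(Exception e) {
        if (e instanceof TimeoutException) {
            return "TIMEOUT";
        } else if (e instanceof CircuitBreakerOpenException) {
            return "CIRCUIT_OPEN";
        } else if (e instanceof HttpClientErrorException http) {
            // 4xx means the request itself is wrong and will fail again, except 408/429
            int status = http.getStatusCode().value();
            return (status == 408 || status == 429) ? "TRANSIENT" : "PERMANENT";
        } else if (e instanceof IOException || e instanceof HttpServerErrorException
                || e instanceof ResourceAccessException) {
            return "TRANSIENT";
        } else {
            return "PERMANENT";
        }
    }

    private boolean isRetryable(String errorType) {
        return "TRANSIENT".equals(errorType) || "TIMEOUT".equals(errorType);
    }

    // fromJson(...) helper omitted
}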
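Crash recovery (the "zombie reaper" in the Phase 4 checklist) is not shown above. The idea is a periodic sweep:

1. Find runs still LEASED or RUNNING whose leaseUntil has passed, meaning their runner stopped heartbeating.
2. Hand them back to PENDING so another runner can lease them.

The sketch below works over in-memory rows. It assumes a crashed attempt counts toward maxAttempts, so a run that repeatedly kills its runner ends up in the DLQ. ZombieReaper and RunRow are hypothetical stand-ins for a scheduled job and the Run model.

```java
import java.time.Instant;
import java.util.List;

// Illustrative sketch (hypothetical classes): reclaim runs whose lease expired mid-flight.
final class ZombieReaper {

    enum State { PENDING, LEASED, RUNNING, SUCCESS, FAILED, DLQ, CANCELLED }

    /** Minimal mutable stand-in for the generated Run model. */
    static final class RunRow {
        final String id;
        State state;
        String leasedBy;
        Instant leaseUntil;
        int attempt;

        RunRow(String id, State state, String leasedBy, Instant leaseUntil, int attempt) {
            this.id = id;
            this.state = state;
            this.leasedBy = leasedBy;
            this.leaseUntil = leaseUntil;
            this.attempt = attempt;
        }
    }

    private final int maxAttempts;

    ZombieReaper(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /** Returns the number of runs reclaimed (requeued or quarantined). */
    int reap(List<RunRow> runs, Instant now) {
        int reclaimed = 0;
        for (RunRow run : runs) {
            boolean inFlight = run.state == State.LEASED || run.state == State.RUNNING;
            boolean expired = run.leaseUntil != null && !run.leaseUntil.isAfter(now);
            if (!inFlight || !expired) {
                continue;
            }
            // The crashed attempt counts, so a poison run cannot loop forever
            if (run.attempt >= maxAttempts) {
                run.state = State.DLQ;
            } else {
                run.state = State.PENDING;
                run.attempt++;
            }
            run.leasedBy = null;
            run.leaseUntil = null;
            reclaimed++;
        }
        return reclaimed;
    }
}
```

In the real service this would more likely be a single conditional UPDATE on the run table, scheduled like pollForWork.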
Phase 5: Controller Endpoints
5.1 WorkflowExecutionController
Location: valkyrai/src/main/java/com/valkyrlabs/workflow/controller/WorkflowExecutionController.java
@RestController
@RequestMapping("/v1/WorkflowExecution")
@Slf4j
public class WorkflowExecutionController {

    @Autowired
    private WorkflowExecutionService executionService;

    @GetMapping
    @PreAuthorize("hasRole('ADMIN') or hasRole('WORKFLOW')")
    public ResponseEntity<List<WorkflowExecution>> listExecutions(
            @RequestParam(required = false) UUID workflowId,
            @RequestParam(required = false) String state,
            @RequestParam(defaultValue = "50") int limit) {
        List<WorkflowExecution> executions = executionService.findExecutions(workflowId, state, limit);
        return ResponseEntity.ok(executions);
    }

    @GetMapping("/{id}")
    @PreAuthorize("hasPermission(#id, 'WorkflowExecution', 'READ')")
    public ResponseEntity<WorkflowExecution> getExecution(@PathVariable UUID id) {
        return executionService.findById(id)
                .map(ResponseEntity::ok)
                .orElse(ResponseEntity.notFound().build());
    }

    @PostMapping("/{id}/cancel")
    @PreAuthorize("hasPermission(#id, 'WorkflowExecution', 'WRITE')")
    public ResponseEntity<Void> cancelExecution(@PathVariable UUID id) {
        executionService.cancelExecution(id);
        return ResponseEntity.ok().build();
    }

    @PostMapping("/{id}/pause")
    @PreAuthorize("hasPermission(#id, 'WorkflowExecution', 'WRITE')")
    public ResponseEntity<Void> pauseExecution(@PathVariable UUID id) {
        executionService.pauseExecution(id);
        return ResponseEntity.ok().build();
    }

    @PostMapping("/{id}/resume")
    @PreAuthorize("hasPermission(#id, 'WorkflowExecution', 'WRITE')")
    public ResponseEntity<Void> resumeExecution(@PathVariable UUID id) {
        executionService.resumeExecution(id);
        return ResponseEntity.ok().build();
    }
}
5.2 DLQController
Location: valkyrai/src/main/java/com/valkyrlabs/workflow/controller/DLQController.java
@RestController
@RequestMapping("/v1/DeadLetterQueue")
@Slf4j
public class DLQController {

    @Autowired
    private DLQService dlqService;

    @GetMapping
    @PreAuthorize("hasRole('ADMIN')")
    public ResponseEntity<List<DeadLetterQueue>> listDLQ(
            // Bound directly to the enum: an unknown value now yields 400 Bad Request
            // instead of an uncaught IllegalArgumentException from valueOf (500)
            @RequestParam(required = false) DeadLetterQueue.ResolutionEnum resolution) {
        return ResponseEntity.ok(dlqService.listDLQ(resolution));
    }

    @PostMapping("/{id}/requeue")
    @PreAuthorize("hasRole('ADMIN')")
    public ResponseEntity<Void> requeue(
            @PathVariable UUID id,
            @RequestBody(required = false) Map<String, Object> inputOverrides) {
        dlqService.requeue(id, inputOverrides);
        return ResponseEntity.ok().build();
    }

    @PostMapping("/{id}/discard")
    @PreAuthorize("hasRole('ADMIN')")
    public ResponseEntity<Void> discard(@PathVariable UUID id) {
        dlqService.discard(id);
        return ResponseEntity.ok().build();
    }
}
Phase 6: Observability (OpenTelemetry)
6.1 Tracing Configuration
Location: valkyrai/src/main/java/com/valkyrlabs/workflow/config/ObservabilityConfig.java
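import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ObservabilityConfig {

    @Bean
    public OpenTelemetry openTelemetry() {
        // Configured from the standard OTEL_* environment variables / otel.* system properties
        return AutoConfiguredOpenTelemetrySdk.initialize().getOpenTelemetrySdk();
    }

    @Bean
    public Tracer tracer(OpenTelemetry openTelemetry) {
        return openTelemetry.getTracer("valkyrai-workflow", "2.0");
    }
}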
6.2 Tracing Aspect
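Location: valkyrai/src/main/java/com/valkyrlabs/workflow/observability/WorkflowTracingAspect.java
Note: Spring AOP only advises public methods that are called through the bean's proxy. A private executeRun invoked from inside RunnerService is never intercepted. The pointcut below fires only if executeRun is public and called via the proxy, for example through a self-injected RunnerService reference.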
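import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Aspect
@Component
@Slf4j
public class WorkflowTracingAspect {

    @Autowired
    private Tracer tracer;

    @Around("execution(* com.valkyrlabs.workflow.runner.RunnerService.executeRun(..))")
    public Object traceRunExecution(ProceedingJoinPoint pjp) throws Throwable {
        Run run = (Run) pjp.getArgs()[0];
        Span span = tracer.spanBuilder("run.execute")
                .setAttribute("run.id", run.getId().toString())
                .setAttribute("execution.id", run.getExecutionId().toString())
                .setAttribute("task.id", run.getTaskId().toString())
                .setAttribute("attempt", run.getAttempt())
                .startSpan();
        try (Scope scope = span.makeCurrent()) {
            Object result = pjp.proceed();
            span.setStatus(StatusCode.OK);
            return result;
        } catch (Throwable t) {
            // Catch Throwable so Errors are recorded too, then rethrow unchanged
            span.recordException(t);
            span.setStatus(StatusCode.ERROR, String.valueOf(t.getMessage()));
            throw t;
        } finally {
            span.end();
        }
    }
}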
Phase 7: Frontend Studio (React Flow)
7.1 WorkflowExecutionMonitor Component
Location: web/typescript/valkyr_labs_com/src/components/WorkflowStudio/WorkflowExecutionMonitor.tsx
import React, { useMemo } from "react";
import {
  useGetWorkflowExecutionQuery,
  useListRunsQuery,
} from "../../backend/api";

interface WorkflowExecutionMonitorProps {
  executionId: string;
}

interface FlameBar {
  id?: string;
  taskId?: string;
  startTime: number;
  duration: number;
  state?: string;
}

// One scale for both bar offset and bar width: 0.1px per ms (1px = 10ms)
const PX_PER_MS = 0.1;

export const WorkflowExecutionMonitor: React.FC<
  WorkflowExecutionMonitorProps
> = ({ executionId }) => {
  const { data: execution, isLoading } = useGetWorkflowExecutionQuery({
    id: executionId,
  });
  const { data: runs } = useListRunsQuery({ executionId });

  // Derived from runs, so useMemo rather than useEffect + useState.
  // Runs without startedAt (not started yet) are left off the chart.
  const flamechart = useMemo<FlameBar[]>(
    () =>
      (runs ?? [])
        .filter((run) => run.startedAt)
        .map((run) => ({
          id: run.id,
          taskId: run.taskId,
          startTime: new Date(run.startedAt!).getTime(),
          duration: run.durationMs ?? 0,
          state: run.state,
        })),
    [runs]
  );
  // Offsets are measured from the earliest start, not from whichever run the API lists first
  const origin = flamechart.length
    ? Math.min(...flamechart.map((item) => item.startTime))
    : 0;

  if (isLoading) return <div>Loading...</div>;

  return (
    <div className="execution-monitor">
      <div className="execution-header">
        <h2>Execution: {execution?.id}</h2>
        <span className={`state-badge ${execution?.state}`}>
          {execution?.state}
        </span>
      </div>

      <div className="execution-timeline">
        <h3>Task Timeline</h3>
        <div className="flamechart">
          {flamechart.map((item) => (
            <div
              key={item.id}
              className={`flame-bar ${item.state}`}
              style={{
                left: `${(item.startTime - origin) * PX_PER_MS}px`,
                width: `${Math.max(1, item.duration * PX_PER_MS)}px`,
              }}
              title={`Task: ${item.taskId}, Duration: ${item.duration}ms`}
            />
          ))}
        </div>
      </div>

      <div className="runs-table">
        <h3>Run History</h3>
        <table>
          <thead>
            <tr>
              <th>Run ID</th>
              <th>Task</th>
              <th>Attempt</th>
              <th>State</th>
              <th>Duration</th>
              <th>Started</th>
            </tr>
          </thead>
          <tbody>
            {runs?.map((run) => (
              <tr key={run.id}>
                <td>{run.id}</td>
                <td>{run.taskId}</td>
                <td>{run.attempt}</td>
                <td className={`state ${run.state}`}>{run.state}</td>
                <td>{run.durationMs != null ? `${run.durationMs}ms` : "-"}</td>
                <td>
                  {run.startedAt ? new Date(run.startedAt).toLocaleString() : "-"}
                </td>
              </tr>
            ))}
          </tbody>
        </table>
      </div>
    </div>
  );
};
7.2 DLQ Browser Component
Location: web/typescript/valkyr_labs_com/src/components/WorkflowStudio/DLQBrowser.tsx
import React from "react";
import {
  useListDeadLetterQueueQuery,
  useRequeueDeadLetterEntryMutation,
  useDiscardDeadLetterEntryMutation,
} from "../../backend/api";

export const DLQBrowser: React.FC = () => {
  const { data: dlqEntries, refetch } = useListDeadLetterQueueQuery({
    resolution: "PENDING",
  });
  const [requeue] = useRequeueDeadLetterEntryMutation();
  const [discard] = useDiscardDeadLetterEntryMutation();

  // RTK Query mutation triggers resolve with { error } instead of throwing;
  // unwrap() turns a failed request into a rejection that can be caught.
  const handleRequeue = async (id?: string) => {
    if (!id) return;
    try {
      await requeue({ id }).unwrap();
      refetch();
    } catch (err) {
      console.error("Requeue failed", err);
    }
  };

  const handleDiscard = async (id?: string) => {
    if (!id || !window.confirm("Are you sure you want to discard this entry?")) {
      return;
    }
    try {
      await discard({ id }).unwrap();
      refetch();
    } catch (err) {
      console.error("Discard failed", err);
    }
  };

  return (
    <div className="dlq-browser">
      <h2>Dead Letter Queue</h2>
      <table>
        <thead>
          <tr>
            <th>Quarantined</th>
            <th>Task</th>
            <th>Failure Reason</th>
            <th>Actions</th>
          </tr>
        </thead>
        <tbody>
          {dlqEntries?.map((entry) => (
            <tr key={entry.id}>
              <td>
                {entry.quarantinedAt
                  ? new Date(entry.quarantinedAt).toLocaleString()
                  : "-"}
              </td>
              <td>{entry.taskId}</td>
              <td title={entry.failureReason} className="truncate">
                {entry.failureReason}
              </td>
              <td>
                <button onClick={() => handleRequeue(entry.id)}>Requeue</button>
                <button
                  onClick={() => handleDiscard(entry.id)}
                  className="danger"
                >
                  Discard
                </button>
              </td>
            </tr>
          ))}
        </tbody>
      </table>
    </div>
  );
};
Phase 8: Integration with Existing System
8.1 Migration Strategy
Backward Compatibility:
- Keep existing ValkyrWorkflowService.executeWorkflow() as-is
- Wrap it with WorkflowExecutionService.executeWorkflow() to create execution records
- Gradually migrate modules to use RunService for idempotency
- Add @Deprecated to old direct execution methods
Database Migration:
-- Add WorkflowExecution table
CREATE TABLE workflow_execution (
    id UUID PRIMARY KEY,
    workflow_id UUID NOT NULL REFERENCES workflow(id),
    version INTEGER,
    state VARCHAR(20),
    started_at TIMESTAMP,
    finished_at TIMESTAMP,
    parent_execution_id UUID REFERENCES workflow_execution(id),
    trigger_type VARCHAR(20),
    trigger_payload TEXT,
    context_data TEXT,
    error_message TEXT,
    error_stack_trace TEXT,
    metrics_snapshot TEXT
);
-- Add Run table enhancements
ALTER TABLE run ADD COLUMN exec_module_id UUID;
ALTER TABLE run ADD COLUMN leased_by VARCHAR(255);
ALTER TABLE run ADD COLUMN inputs_hash VARCHAR(64);
ALTER TABLE run ADD COLUMN config_hash VARCHAR(64);
ALTER TABLE run ADD COLUMN inputs TEXT; -- input snapshot for retries and DLQ replay
ALTER TABLE run ADD COLUMN heartbeat_at TIMESTAMP;
ALTER TABLE run ADD COLUMN retry_after TIMESTAMP;
ALTER TABLE run ADD COLUMN duration_ms INTEGER;
ALTER TABLE run ADD COLUMN cost_tokens NUMERIC(10,4);
-- Add DLQ table
CREATE TABLE dead_letter_queue (
    id UUID PRIMARY KEY,
    run_id UUID REFERENCES run(id),
    execution_id UUID REFERENCES workflow_execution(id),
    task_id UUID REFERENCES task(id),
    failure_reason TEXT,
    failure_type VARCHAR(50),
    original_inputs TEXT,
    original_config TEXT,
    quarantined_at TIMESTAMP,
    reviewed_by UUID REFERENCES principal(id),
    resolution VARCHAR(20),
    resolved_at TIMESTAMP,
    notes TEXT
);
-- Add indexes
CREATE INDEX idx_run_idempotency ON run(idempotency_key);
CREATE INDEX idx_run_execution ON run(execution_id);
CREATE INDEX idx_run_dispatch ON run(state, retry_after); -- polling for PENDING runs
CREATE INDEX idx_execution_workflow ON workflow_execution(workflow_id);
CREATE INDEX idx_dlq_resolution ON dead_letter_queue(resolution);
8.2 Configuration Updates
application.yml additions:
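valkyrai:
  runner:
    id: "${RUNNER_ID:#{T(java.util.UUID).randomUUID().toString()}}"
    concurrency: ${RUNNER_CONCURRENCY:10}
    lease:
      duration: PT2M
    poll:
      interval: PT5S
    max:
      attempts: 3
  observability:
    otel:
      enabled: true
      endpoint: ${OTEL_EXPORTER_OTLP_ENDPOINT:http://localhost:4317}
      service:
        name: valkyrai-workflow
        version: "2.0"
Note: AutoConfiguredOpenTelemetrySdk (6.1) reads the standard OTEL_* environment variables and otel.* system properties, not these valkyrai.* keys. Export OTEL_EXPORTER_OTLP_ENDPOINT and OTEL_SERVICE_NAME on the runner pods, or map these values onto the standard names.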
Implementation Checklist
Phase 1: Data Models ✅
- Add WorkflowExecution schema to api.hbs.yaml
- Enhance Run schema with lease/idempotency fields
- Add DeadLetterQueue schema
- Add CircuitBreakerState schema
- Regenerate models via ThorAPI (mvn -pl thorapi clean install)
Phase 2: API Endpoints ✅
- Add WorkflowExecution CRUD endpoints
- Add execution control endpoints (cancel/pause/resume)
- Add Run endpoints (list/heartbeat)
- Add DLQ endpoints (list/requeue/discard)
- Regenerate controllers via ThorAPI
Phase 3: Service Layer ✅
- Implement WorkflowExecutionService
- Implement RunService with lease logic
- Implement DLQService
- Add idempotency key computation
- Add exponential backoff logic
Phase 4: Runner Pool ✅
- Implement RunnerService with polling
- Add heartbeat mechanism
- Add crash recovery (zombie reaper)
- Implement resource limits (thread pool)
- Add circuit breaker pattern
Phase 5: Controllers ✅
- Implement WorkflowExecutionController
- Implement DLQController
- Add security annotations
- Wire ACL checks
Phase 6: Observability ✅
- Add OpenTelemetry dependency
- Implement tracing aspect
- Add metrics collection
- Configure OTLP exporter
Phase 7: Frontend ✅
- Generate RTKQ hooks for new endpoints
- Implement WorkflowExecutionMonitor component
- Implement DLQBrowser component
- Add flamechart visualization
- Update WorkflowStudio to use executions
Phase 8: Integration ✅
- Wrap existing ValkyrWorkflowService with execution tracking
- Add database migrations
- Update configuration files
- Add backward compatibility layer
- Write integration tests
Phase 9: Testing & Documentation ✅
- Write unit tests for all services
- Write integration tests for execution flow
- Test idempotency guarantees
- Test crash recovery
- Update docs with new APIs
- Create runbooks for DLQ management
Success Metrics
After implementation, validate:
- Deterministic Replay: Same inputs → same outputs (hash match)
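- Crash Recovery: Kill runner mid-step → lease expires, run is re-leased and completes without duplicate effects (delivery is at-least-once, so this holds only for idempotent modules)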
- Idempotency: Duplicate run requests → deduplicated via idempotency key
- DLQ Tooling: Failed runs quarantined + requeue works
- Observability: Traces link to runs; flamechart shows execution timeline
- Performance: P95 dispatch ≤ 75ms; lease acquisition ≤ 10ms
Next Steps
- Review this plan with the team
- Create feature branch: feature/workflow-engine-v2
- Start with Phase 1 (ThorAPI schema updates)
- Incremental PRs for each phase
- Beta deployment with selected workflows
- GA release after dogfooding + chaos tests
Questions & Decisions
- Runner isolation: Thread pool vs. Docker containers? → Start with thread pool, containerize in Phase 3
- State store: Postgres vs. Redis for leases? → Postgres for now, add Redis optional in Phase 2
- Observability backend: Jaeger vs. Grafana Tempo? → Either works; configure via OTLP
- DLQ retention: How long to keep? → 30 days default, configurable
End of Implementation Plan