ValkyrAI Workflow Engine v2.0 — Migration Guide
Migrating from v1.x (current) to v2.0 (kill-n8n edition)
Overview
This guide helps you migrate existing workflows and code from the current v1.x execution model to the new v2.0 system with:
- Separate execution tracking (WorkflowExecution)
- Idempotent runs with crash recovery (Run)
- Dead letter queue (DLQ)
- Circuit breakers
Backward Compatibility: The old ValkyrWorkflowService.executeWorkflow() API still works! We're wrapping it, not replacing it.
Migration Strategy
Phase 1: Add New Infrastructure (Non-Breaking)
- Add new models via OpenAPI schema ✅ DONE
- Regenerate code ✅ DONE
- Run database migrations
- Deploy new services alongside existing ones
- No existing workflows break
Phase 2: Gradual Migration
- Wrap existing execution calls with execution tracking
- Enable for new workflows only first
- Gradually migrate existing workflows
- Monitor metrics and DLQ
Phase 3: Deprecate Old API
- Mark old direct execution methods as @Deprecated
- Migrate all workflows to new API
- Remove deprecated methods in v3.0
Database Migration
Step 1: Create Migration File
File: valkyrai/src/main/resources/db/migration/V2.0__workflow_execution_tracking.sql
-- WorkflowExecution table
CREATE TABLE IF NOT EXISTS workflow_execution (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
workflow_id UUID NOT NULL REFERENCES workflow(id),
version INTEGER,
state VARCHAR(20) NOT NULL DEFAULT 'PENDING',
started_at TIMESTAMP WITH TIME ZONE,
finished_at TIMESTAMP WITH TIME ZONE,
parent_execution_id UUID REFERENCES workflow_execution(id),
trigger_type VARCHAR(20),
trigger_payload TEXT,
context_data TEXT,
error_message TEXT,
error_stack_trace TEXT,
metrics_snapshot TEXT,
created_date TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
created_by UUID,
last_modified_date TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
last_modified_by UUID
);
CREATE INDEX IF NOT EXISTS idx_workflow_execution_workflow_id ON workflow_execution(workflow_id);
CREATE INDEX IF NOT EXISTS idx_workflow_execution_state ON workflow_execution(state);
CREATE INDEX IF NOT EXISTS idx_workflow_execution_started_at ON workflow_execution(started_at DESC);
-- Enhance existing Run table (or create if doesn't exist)
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = current_schema() AND table_name = 'run') THEN
CREATE TABLE run (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
execution_id UUID REFERENCES workflow_execution(id),
task_id UUID REFERENCES task(id),
attempt INTEGER DEFAULT 1,
state VARCHAR(20) DEFAULT 'PENDING',
created_date TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
END IF;
END $$;
-- Add new columns to Run table
ALTER TABLE run ADD COLUMN IF NOT EXISTS exec_module_id UUID;
ALTER TABLE run ADD COLUMN IF NOT EXISTS leased_by VARCHAR(255);
ALTER TABLE run ADD COLUMN IF NOT EXISTS lease_until TIMESTAMP WITH TIME ZONE;
ALTER TABLE run ADD COLUMN IF NOT EXISTS idempotency_key VARCHAR(128);
ALTER TABLE run ADD COLUMN IF NOT EXISTS inputs_hash VARCHAR(64);
ALTER TABLE run ADD COLUMN IF NOT EXISTS config_hash VARCHAR(64);
ALTER TABLE run ADD COLUMN IF NOT EXISTS heartbeat_at TIMESTAMP WITH TIME ZONE;
ALTER TABLE run ADD COLUMN IF NOT EXISTS retry_after TIMESTAMP WITH TIME ZONE;
ALTER TABLE run ADD COLUMN IF NOT EXISTS started_at TIMESTAMP WITH TIME ZONE;
ALTER TABLE run ADD COLUMN IF NOT EXISTS finished_at TIMESTAMP WITH TIME ZONE;
ALTER TABLE run ADD COLUMN IF NOT EXISTS outputs TEXT;
ALTER TABLE run ADD COLUMN IF NOT EXISTS error TEXT;
ALTER TABLE run ADD COLUMN IF NOT EXISTS error_type VARCHAR(20);
ALTER TABLE run ADD COLUMN IF NOT EXISTS duration_ms INTEGER;
ALTER TABLE run ADD COLUMN IF NOT EXISTS cost_tokens NUMERIC(10,4);
ALTER TABLE run ADD COLUMN IF NOT EXISTS runner_id VARCHAR(255);
CREATE INDEX IF NOT EXISTS idx_run_idempotency ON run(idempotency_key);
CREATE INDEX IF NOT EXISTS idx_run_execution ON run(execution_id);
CREATE INDEX IF NOT EXISTS idx_run_state ON run(state);
CREATE INDEX IF NOT EXISTS idx_run_lease_until ON run(lease_until) WHERE state = 'LEASED';
-- DeadLetterQueue table
CREATE TABLE IF NOT EXISTS dead_letter_queue (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
run_id UUID REFERENCES run(id),
execution_id UUID REFERENCES workflow_execution(id),
task_id UUID REFERENCES task(id),
failure_reason TEXT,
failure_type VARCHAR(50),
original_inputs TEXT,
original_config TEXT,
quarantined_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
reviewed_by UUID REFERENCES principal(id),
resolution VARCHAR(20) DEFAULT 'PENDING',
resolved_at TIMESTAMP WITH TIME ZONE,
notes TEXT,
created_date TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
created_by UUID,
last_modified_date TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
last_modified_by UUID
);
CREATE INDEX IF NOT EXISTS idx_dlq_resolution ON dead_letter_queue(resolution);
CREATE INDEX IF NOT EXISTS idx_dlq_quarantined_at ON dead_letter_queue(quarantined_at DESC);
CREATE INDEX IF NOT EXISTS idx_dlq_execution_id ON dead_letter_queue(execution_id);
-- CircuitBreakerState table
CREATE TABLE IF NOT EXISTS circuit_breaker_state (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
target VARCHAR(500) NOT NULL UNIQUE,
state VARCHAR(20) NOT NULL DEFAULT 'CLOSED',
failure_count INTEGER DEFAULT 0,
success_count INTEGER DEFAULT 0,
last_failure_at TIMESTAMP WITH TIME ZONE,
opened_at TIMESTAMP WITH TIME ZONE,
next_retry_at TIMESTAMP WITH TIME ZONE,
window_start_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
created_date TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
created_by UUID,
last_modified_date TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
last_modified_by UUID
);
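-- Note: the UNIQUE constraint on target already creates an index in PostgreSQL, so this one is optional
CREATE INDEX IF NOT EXISTS idx_circuit_breaker_target ON circuit_breaker_state(target);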
Step 2: Run Migration
# If using Flyway
mvn flyway:migrate
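# OR if using Liquibase (reference the SQL file from a changelog first; Liquibase does not scan db/migration)
mvn liquibase:update
# OR restart the application with auto-migration enabled
# (Spring Boot runs Flyway/Liquibase on startup when the library is on the classpath and not disabled)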
Code Migration
Current Code (v1.x)
@Autowired
private ValkyrWorkflowService workflowService;
// Direct execution (no tracking)
CompletableFuture<Workflow> future = workflowService.executeWorkflow(workflow);
New Code (v2.0)
@Autowired
private WorkflowExecutionService executionService;
// Tracked execution with metrics
CompletableFuture<WorkflowExecution> future = executionService.executeWorkflow(
workflow,
"MANUAL", // trigger type
Map.of("customerId", "123", "amount", 49.0) // payload
);
// Get execution ID for monitoring
future.thenAccept(execution -> {
log.info("Execution started: {}", execution.getId());
});
Backward-Compatible Wrapper
File: valkyrai/src/main/java/com/valkyrlabs/workflow/service/WorkflowExecutionService.java
@Service
public class WorkflowExecutionService {
@Autowired
private ValkyrWorkflowService legacyService;
@Autowired
private WorkflowExecutionRepository executionRepo;
/**
* v2.0 API: Execute with full tracking
*/
public CompletableFuture<WorkflowExecution> executeWorkflow(
Workflow workflow,
String triggerType,
Map<String, Object> payload) {
// Create execution record
WorkflowExecution execution = new WorkflowExecution();
execution.setWorkflowId(workflow.getId());
execution.setState(WorkflowExecution.StateEnum.PENDING);
execution.setTriggerType(triggerType);
execution.setTriggerPayload(toJson(payload));
// Record the start time (assumes the generated model exposes startedAt, as it does finishedAt)
execution.setStartedAt(OffsetDateTime.now());
execution = executionRepo.save(execution);
final UUID executionId = execution.getId();
// Delegate to legacy service (backward compatible)
return legacyService.executeWorkflow(workflow)
.thenApply(completedWorkflow -> {
// Update execution record
WorkflowExecution updated = executionRepo.findById(executionId).orElseThrow();
updated.setState(WorkflowExecution.StateEnum.SUCCESS);
updated.setFinishedAt(OffsetDateTime.now());
return executionRepo.save(updated);
})
.exceptionally(ex -> {
// Handle failure
WorkflowExecution failed = executionRepo.findById(executionId).orElseThrow();
failed.setState(WorkflowExecution.StateEnum.FAILED);
failed.setErrorMessage(ex.getMessage());
failed.setFinishedAt(OffsetDateTime.now());
return executionRepo.save(failed);
});
}
/**
* v1.x compatibility: Execute without tracking (deprecated)
* @deprecated Use executeWorkflow(Workflow, String, Map) instead
*/
@Deprecated
public CompletableFuture<Workflow> executeWorkflowLegacy(Workflow workflow) {
return legacyService.executeWorkflow(workflow);
}
}
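A detail worth knowing about the `exceptionally` stage in a chain like this: because it is attached after `thenApply`, the throwable it receives is normally a `CompletionException` wrapping the original failure, so `ex.getMessage()` records the wrapper's text rather than the root cause's. A minimal sketch of unwrapping before storing the error message follows; `FailureUnwrap`, `rootCause`, and `recordedMessage` are hypothetical names used only for illustration and are not part of the existing service.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical helper for illustration; not part of ValkyrAI.
final class FailureUnwrap {
    private FailureUnwrap() {}

    /** Returns the cause of a CompletionException, or the throwable itself otherwise. */
    static Throwable rootCause(Throwable ex) {
        if (ex instanceof CompletionException && ex.getCause() != null) {
            return ex.getCause();
        }
        return ex;
    }

    /** Mimics the thenApply(...).exceptionally(...) chain and returns what would be recorded. */
    static String recordedMessage(CompletableFuture<String> legacyFuture) {
        return legacyFuture
            .thenApply(result -> "SUCCESS")
            .exceptionally(ex -> rootCause(ex).getMessage())
            .join();
    }
}
```

In the wrapper this would amount to calling `failed.setErrorMessage(rootCause(ex).getMessage())` in the failure branch.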
Controller Migration
Current Controller (v1.x)
@RestController
@RequestMapping("/v1/workflow")
public class WorkflowController {
@PostMapping("/{id}/execute")
public ResponseEntity<Workflow> execute(@PathVariable UUID id) {
Workflow workflow = workflowService.findById(id).orElseThrow();
CompletableFuture<Workflow> future = valkyrWorkflowService.executeWorkflow(workflow);
// ... return workflow
}
}
New Controller (v2.0)
@RestController
@RequestMapping("/v1/workflow")
public class WorkflowController {
@Autowired
private WorkflowExecutionService executionService;
@PostMapping("/{id}/execute")
public ResponseEntity<WorkflowExecution> execute(
@PathVariable UUID id,
@RequestBody Map<String, Object> payload) {
Workflow workflow = workflowService.findById(id).orElseThrow();
CompletableFuture<WorkflowExecution> future = executionService.executeWorkflow(
workflow,
"MANUAL",
payload
);
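// Return immediately (async execution).
// Caution: getNow(null) yields null unless the future has already completed,
// so the response body is empty for any workflow that is still running.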
WorkflowExecution execution = future.getNow(null);
return ResponseEntity.accepted().body(execution);
}
@GetMapping("/execution/{executionId}")
public ResponseEntity<WorkflowExecution> getExecution(@PathVariable UUID executionId) {
return executionService.findById(executionId)
.map(ResponseEntity::ok)
.orElse(ResponseEntity.notFound().build());
}
}
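Since the frontend needs an execution ID to poll, the 202 response has to carry the persisted record even while the workflow is still running. One possible shape is to create the record synchronously and hand back both its ID and the future. The sketch below uses an in-memory map as a stand-in for `WorkflowExecutionRepository`; `StartedExecution`, `ExecutionStarter`, and `start` are hypothetical names, not the existing API.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical sketch: the execution ID is known before the work completes.
final class StartedExecution {
    final UUID id;
    final CompletableFuture<String> finalState;

    StartedExecution(UUID id, CompletableFuture<String> finalState) {
        this.id = id;
        this.finalState = finalState;
    }
}

final class ExecutionStarter {
    // Stand-in for the repository: execution ID -> current state.
    final Map<UUID, String> states = new ConcurrentHashMap<>();

    StartedExecution start(Supplier<CompletableFuture<?>> work) {
        UUID id = UUID.randomUUID();
        states.put(id, "PENDING"); // persisted before any async work begins
        CompletableFuture<String> finalState = work.get()
            .handle((result, ex) -> ex == null ? "SUCCESS" : "FAILED")
            .whenComplete((state, ignored) -> states.put(id, state));
        return new StartedExecution(id, finalState);
    }
}
```

With this shape a controller could respond with `ResponseEntity.accepted()` and the already-persisted record, leaving the future to update the state in the background.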
Configuration Migration
Current Config (v1.x)
valkyrai:
  workflow:
    async: true
    pool-size: 10
New Config (v2.0)
valkyrai:
  workflow:
    async: true
    pool-size: 10
    # NEW: Execution tracking
    execution:
      enabled: true
      track-metrics: true
  # NEW: Runner pool configuration
  runner:
    # random.uuid is supplied by Spring Boot's RandomValuePropertySource and also works
    # with @ConfigurationProperties binding, where a SpEL #{...} default is not evaluated
    id: ${RUNNER_ID:${random.uuid}}
    concurrency: 10
    poll-interval: PT5S
    lease:
      duration: PT2M
      heartbeat-interval: PT2S
    retry:
      max-attempts: 3
      backoff-base-ms: 1000
      backoff-max-ms: 30000
    zombie-reaper:
      enabled: true
      scan-interval: PT30S
  # NEW: Circuit breaker configuration
  circuit-breaker:
    enabled: true
    failure-threshold: 5
    timeout-duration: PT30S
    half-open-success-threshold: 2
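To make the three circuit-breaker settings concrete, here is a minimal in-memory sketch of the CLOSED, OPEN, HALF_OPEN cycle they imply. It is an illustration under assumptions, not the engine's implementation: `SimpleCircuitBreaker` and its methods are hypothetical, failures are counted consecutively rather than within a time window, and `timeout-duration` is read as how long the breaker stays open before trial calls are allowed.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical illustration; not the engine's actual circuit breaker.
final class SimpleCircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold;         // failure-threshold
    private final Duration timeout;             // timeout-duration
    private final int halfOpenSuccessThreshold; // half-open-success-threshold

    private State state = State.CLOSED;
    private int failureCount;
    private int successCount;
    private Instant nextRetryAt;

    SimpleCircuitBreaker(int failureThreshold, Duration timeout, int halfOpenSuccessThreshold) {
        this.failureThreshold = failureThreshold;
        this.timeout = timeout;
        this.halfOpenSuccessThreshold = halfOpenSuccessThreshold;
    }

    /** Returns true if a call may proceed at the given time. */
    synchronized boolean allowRequest(Instant now) {
        if (state == State.OPEN && !now.isBefore(nextRetryAt)) {
            state = State.HALF_OPEN; // timeout elapsed: let trial calls through
            successCount = 0;
        }
        return state != State.OPEN;
    }

    synchronized void recordSuccess() {
        if (state == State.HALF_OPEN) {
            if (++successCount >= halfOpenSuccessThreshold) {
                state = State.CLOSED; // enough trial calls succeeded
                failureCount = 0;
            }
        } else if (state == State.CLOSED) {
            failureCount = 0; // a success resets the consecutive-failure count
        }
    }

    synchronized void recordFailure(Instant now) {
        // Any failure while half-open reopens; otherwise open once the threshold is hit.
        if (state == State.HALF_OPEN || ++failureCount >= failureThreshold) {
            state = State.OPEN;
            nextRetryAt = now.plus(timeout);
            failureCount = 0;
        }
    }

    synchronized State state() {
        return state;
    }
}
```

With the values above (threshold 5, timeout PT30S, half-open threshold 2), the fifth consecutive failure opens the breaker, calls are rejected for 30 seconds, and two successful trial calls close it again.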
Frontend Migration
Current Frontend (v1.x)
import { useExecuteWorkflowMutation } from "../../backend/api";
const [executeWorkflow] = useExecuteWorkflowMutation();
const handleExecute = async () => {
const result = await executeWorkflow({ id: workflowId }).unwrap();
console.log("Workflow executed:", result);
};
New Frontend (v2.0)
import {
useExecuteWorkflowMutation,
useGetWorkflowExecutionQuery,
useListRunsQuery,
} from "../../backend/api";
const [executeWorkflow] = useExecuteWorkflowMutation();
const [executionId, setExecutionId] = useState<string | null>(null);
// Poll for execution status
const { data: execution } = useGetWorkflowExecutionQuery(
{ id: executionId },
{ skip: !executionId, pollingInterval: 2000 }
);
// Get runs for execution
const { data: runs } = useListRunsQuery(
{ executionId },
{ skip: !executionId }
);
const handleExecute = async () => {
const result = await executeWorkflow({
id: workflowId,
payload: { customerId: "123" },
}).unwrap();
setExecutionId(result.id);
console.log("Execution started:", result);
};
// Display execution status
return (
<div>
<button onClick={handleExecute}>Execute</button>
{execution && (
<div>
<h3>Execution Status: {execution.state}</h3>
<p>Started: {new Date(execution.startedAt).toLocaleString()}</p>
{runs && <RunTimeline runs={runs} />}
</div>
)}
</div>
);
Monitoring & Observability
Add OpenTelemetry Tracing
File: valkyrai/src/main/java/com/valkyrlabs/workflow/config/ObservabilityConfig.java
@Configuration
public class ObservabilityConfig {
@Bean
public OpenTelemetry openTelemetry() {
return AutoConfiguredOpenTelemetrySdk.initialize()
.getOpenTelemetrySdk();
}
@Bean
public Tracer tracer(OpenTelemetry openTelemetry) {
return openTelemetry.getTracer("valkyrai-workflow", "2.0");
}
}
application.yml:
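management:
  tracing:
    enabled: true
    sampling:
      probability: 1.0
  otlp:
    tracing:
      # 4317 is the conventional OTLP/gRPC port; OTLP/HTTP exporters usually target port 4318 with the /v1/traces path
      endpoint: ${OTEL_EXPORTER_OTLP_ENDPOINT:http://localhost:4317}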
Testing Migration
Unit Test (v2.0)
@SpringBootTest
class WorkflowExecutionServiceTest {
@Autowired
private WorkflowExecutionService executionService;
@Autowired
private WorkflowExecutionRepository executionRepo;
@Test
void testExecutionTracking() throws Exception {
// Given
Workflow workflow = createTestWorkflow();
Map<String, Object> payload = Map.of("test", "data");
// When
CompletableFuture<WorkflowExecution> future = executionService.executeWorkflow(
workflow,
"MANUAL",
payload
);
WorkflowExecution execution = future.get(10, TimeUnit.SECONDS);
// Then
assertThat(execution.getState()).isEqualTo(WorkflowExecution.StateEnum.SUCCESS);
assertThat(execution.getWorkflowId()).isEqualTo(workflow.getId());
// Verify execution record persisted
WorkflowExecution persisted = executionRepo.findById(execution.getId()).orElseThrow();
assertThat(persisted.getFinishedAt()).isNotNull();
}
}
Rollback Plan
If something goes wrong:
1. Disable New Features
valkyrai:
  workflow:
    execution:
      enabled: false # Disable execution tracking
  runner:
    zombie-reaper:
      enabled: false # Disable zombie reaper
2. Use Legacy API
All existing code using ValkyrWorkflowService.executeWorkflow() continues to work!
3. Rollback Database
-- Drop new tables (keeps existing data intact)
DROP TABLE IF EXISTS circuit_breaker_state CASCADE;
DROP TABLE IF EXISTS dead_letter_queue CASCADE;
DROP TABLE IF EXISTS workflow_execution CASCADE;
-- Rollback Run table changes (optional, non-breaking)
ALTER TABLE run DROP COLUMN IF EXISTS exec_module_id;
ALTER TABLE run DROP COLUMN IF EXISTS leased_by;
-- ... (drop other new columns)
Migration Checklist
- Review implementation plan
- Run database migration
- Regenerate code (mvn -pl thorapi clean install)
- Deploy new services (backward compatible)
- Test with one workflow
- Enable execution tracking for new workflows
- Monitor DLQ for issues
- Gradually migrate existing workflows
- Update frontend to show execution status
- Set up OpenTelemetry monitoring
- Train team on new DLQ tooling
- Document common troubleshooting steps
- Mark old API as deprecated
- Plan final migration of remaining workflows
Common Issues & Solutions
Issue: Duplicate side effects
Symptom: Email sent twice, payment charged twice
Solution: Check idempotency key generation:
// RunService.java
String key = computeIdempotencyKey(inputs, config);
Optional<Run> existing = runRepo.findByIdempotencyKey(key);
if (existing.isPresent() && existing.get().getState() == Run.StateEnum.SUCCESS) {
return existing.get(); // Deduplicate
}
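Deduplication only works if `computeIdempotencyKey` yields the same key for the same logical inputs, regardless of map ordering. A minimal sketch of one way to get that property, assuming inputs and config are flat string maps and SHA-256 is acceptable (its 64 hex characters fit the `idempotency_key VARCHAR(128)` column). The class `IdempotencyKeys` is hypothetical and the real RunService may compute the key differently.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch; the real RunService may differ.
final class IdempotencyKeys {
    private IdempotencyKeys() {}

    static String computeIdempotencyKey(Map<String, String> inputs, Map<String, String> config) {
        // TreeMap sorts by key, so the caller's map iteration order cannot change the hash.
        String canonical = "inputs=" + new TreeMap<>(inputs) + ";config=" + new TreeMap<>(config);
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(canonical.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to provide SHA-256.
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }
}
```

The `toString()`-based canonical form is only a stand-in: values containing `=` or `, ` could collide, so nested or free-form payloads would be better serialized to canonical JSON before hashing.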
Issue: Zombie runs not reclaimed
Symptom: Runs stuck in LEASED state
Solution: Enable zombie reaper:
valkyrai:
  runner:
    zombie-reaper:
      enabled: true
      scan-interval: PT30S
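What the reaper looks for can be stated as a simple predicate: a run is a zombie when it is still `LEASED` but its `lease_until` has passed, which means the runner stopped renewing the lease. A minimal sketch under that assumption follows; `LeasedRun` and `ZombieCheck` are hypothetical stand-ins for the generated `Run` model and the reaper service.

```java
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for the generated Run model.
final class LeasedRun {
    final String id;
    final String state;
    final Instant leaseUntil;

    LeasedRun(String id, String state, Instant leaseUntil) {
        this.id = id;
        this.state = state;
        this.leaseUntil = leaseUntil;
    }
}

// Hypothetical sketch of the check a reaper scan would apply.
final class ZombieCheck {
    private ZombieCheck() {}

    /** A run is a zombie if it is LEASED and its lease expired before 'now'. */
    static boolean isZombie(LeasedRun run, Instant now) {
        return "LEASED".equals(run.state)
            && run.leaseUntil != null
            && run.leaseUntil.isBefore(now);
    }

    /** Returns the IDs of all runs that should be reclaimed. */
    static List<String> findZombies(List<LeasedRun> runs, Instant now) {
        return runs.stream()
            .filter(run -> isZombie(run, now))
            .map(run -> run.id)
            .collect(Collectors.toList());
    }
}
```

If runs stay stuck even with the reaper enabled, it is worth checking that `lease_until` is actually being written, since a null lease never matches this predicate.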
Issue: DLQ entries not appearing
Symptom: Failed runs not in DLQ
Solution: Check retry logic and failure classification:
// RunService.java
if (canRetry && run.getAttempt() < maxAttempts) {
// Requeue for retry
} else {
// Move to DLQ
moveToDLQ(run, "MAX_RETRIES");
}
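For the requeue branch, the `retry_after` column together with the `backoff-base-ms` and `backoff-max-ms` settings suggests capped exponential backoff. A minimal sketch, assuming the delay doubles with each attempt and attempts are 1-based; `RetryBackoff` and `delayMs` are hypothetical names, and the real engine may add jitter or use a different curve.

```java
// Hypothetical sketch of capped exponential backoff; not the engine's actual code.
final class RetryBackoff {
    private RetryBackoff() {}

    /**
     * Delay before the next try: baseMs * 2^(attempt - 1), capped at maxMs.
     * 'attempt' is the 1-based number of the attempt that just failed.
     */
    static long delayMs(int attempt, long baseMs, long maxMs) {
        if (attempt < 1 || baseMs < 0 || maxMs < 0) {
            throw new IllegalArgumentException("attempt must be >= 1 and delays non-negative");
        }
        int shift = Math.min(attempt - 1, 62);
        // Compare against the cap before shifting so the multiplication cannot overflow.
        if (baseMs > (maxMs >> shift)) {
            return maxMs;
        }
        return baseMs << shift;
    }
}
```

With the configured values (base 1000 ms, cap 30000 ms), successive attempts wait 1 s, 2 s, 4 s, 8 s, 16 s, and then 30 s from the sixth attempt onward; with `max-attempts: 3` only the first two delays are ever used before the run moves to the DLQ.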
Support & Questions
- See WORKFLOW_ENGINE_V2_IMPLEMENTATION.md for detailed implementation
- See WORKFLOW_ENGINE_V2_QUICKSTART.md for quick reference
- Check existing ValkyrWorkflowService code for patterns
Migration is non-breaking and can be done incrementally. 🚀