
ValkyrAI Workflow Engine v2.0 — Migration Guide

Migrating from v1.x (current) to v2.0 (kill-n8n edition)


Overview

This guide helps you migrate existing workflows and code from the current v1.x execution model to the new v2.0 system with:

  • Separate execution tracking (WorkflowExecution)
  • Idempotent runs with crash recovery (Run)
  • Dead letter queue (DLQ)
  • Circuit breakers

Backward Compatibility: The old ValkyrWorkflowService.executeWorkflow() API still works! We're wrapping it, not replacing it.


Migration Strategy

Phase 1: Add New Infrastructure (Non-Breaking)

  1. Add new models via OpenAPI schema ✅ DONE
  2. Regenerate code ✅ DONE
  3. Run database migrations
  4. Deploy new services alongside existing ones
  5. No existing workflows break

Phase 2: Gradual Migration

  1. Wrap existing execution calls with execution tracking
  2. Enable for new workflows only first
  3. Gradually migrate existing workflows
  4. Monitor metrics and DLQ
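Phase 2 does not say how a workflow gets routed to the tracked path. The sketch below shows one possible gate, built on three assumptions: a global switch that mirrors `valkyrai.workflow.execution.enabled`, an explicit allowlist for workflows you migrate by hand, and a deterministic percentage rollout keyed on the workflow ID, so a given workflow stays on the same path from one call to the next. `TrackingRolloutGate` and its parameters are hypothetical and are not part of ValkyrAI.

```java
import java.util.Set;
import java.util.UUID;

/**
 * Hypothetical rollout gate: decides whether a workflow runs through the tracked
 * v2.0 path or the legacy v1.x path. A sketch only, not ValkyrAI code.
 */
final class TrackingRolloutGate {
    private final boolean trackingEnabled; // mirrors valkyrai.workflow.execution.enabled
    private final Set<UUID> allowlist;     // workflows migrated explicitly
    private final int rolloutPercent;      // 0..100, share of all other workflows

    TrackingRolloutGate(boolean trackingEnabled, Set<UUID> allowlist, int rolloutPercent) {
        if (rolloutPercent < 0 || rolloutPercent > 100) {
            throw new IllegalArgumentException("rolloutPercent must be in 0..100");
        }
        this.trackingEnabled = trackingEnabled;
        this.allowlist = Set.copyOf(allowlist);
        this.rolloutPercent = rolloutPercent;
    }

    boolean useTrackedPath(UUID workflowId) {
        if (!trackingEnabled) {
            return false; // global switch wins, matching the Rollback Plan
        }
        if (allowlist.contains(workflowId)) {
            return true;
        }
        // Deterministic bucket in [0, 100): the same workflow always lands in the same bucket
        int bucket = Math.floorMod(workflowId.hashCode(), 100);
        return bucket < rolloutPercent;
    }
}
```

Raising `rolloutPercent` in steps (for example 10, then 50, then 100) moves existing workflows over gradually, and the switch still gives an immediate way back to the legacy path.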

Phase 3: Deprecate Old API

  1. Mark old direct execution methods as @Deprecated
  2. Migrate all workflows to new API
  3. Remove deprecated methods in v3.0

Database Migration

Step 1: Create Migration File

File: valkyrai/src/main/resources/db/migration/V2.0__workflow_execution_tracking.sql

```sql
-- gen_random_uuid() is built into PostgreSQL 13 and later; older versions need
-- the pgcrypto extension (CREATE EXTENSION IF NOT EXISTS pgcrypto;).

-- WorkflowExecution table
CREATE TABLE IF NOT EXISTS workflow_execution (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    workflow_id UUID NOT NULL REFERENCES workflow(id),
    version INTEGER,
    state VARCHAR(20) NOT NULL DEFAULT 'PENDING',
    started_at TIMESTAMP WITH TIME ZONE,
    finished_at TIMESTAMP WITH TIME ZONE,
    parent_execution_id UUID REFERENCES workflow_execution(id),
    trigger_type VARCHAR(20),
    trigger_payload TEXT,
    context_data TEXT,
    error_message TEXT,
    error_stack_trace TEXT,
    metrics_snapshot TEXT,
    created_date TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    created_by UUID,
    last_modified_date TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    last_modified_by UUID
);

CREATE INDEX IF NOT EXISTS idx_workflow_execution_workflow_id ON workflow_execution(workflow_id);
CREATE INDEX IF NOT EXISTS idx_workflow_execution_state ON workflow_execution(state);
CREATE INDEX IF NOT EXISTS idx_workflow_execution_started_at ON workflow_execution(started_at DESC);

-- Enhance existing Run table (or create it if it doesn't exist)
CREATE TABLE IF NOT EXISTS run (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    execution_id UUID REFERENCES workflow_execution(id),
    task_id UUID REFERENCES task(id),
    attempt INTEGER DEFAULT 1,
    state VARCHAR(20) DEFAULT 'PENDING',
    created_date TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Add new columns to Run table
ALTER TABLE run ADD COLUMN IF NOT EXISTS exec_module_id UUID;
ALTER TABLE run ADD COLUMN IF NOT EXISTS leased_by VARCHAR(255);
ALTER TABLE run ADD COLUMN IF NOT EXISTS lease_until TIMESTAMP WITH TIME ZONE;
ALTER TABLE run ADD COLUMN IF NOT EXISTS idempotency_key VARCHAR(128);
ALTER TABLE run ADD COLUMN IF NOT EXISTS inputs_hash VARCHAR(64);
ALTER TABLE run ADD COLUMN IF NOT EXISTS config_hash VARCHAR(64);
ALTER TABLE run ADD COLUMN IF NOT EXISTS heartbeat_at TIMESTAMP WITH TIME ZONE;
ALTER TABLE run ADD COLUMN IF NOT EXISTS retry_after TIMESTAMP WITH TIME ZONE;
ALTER TABLE run ADD COLUMN IF NOT EXISTS started_at TIMESTAMP WITH TIME ZONE;
ALTER TABLE run ADD COLUMN IF NOT EXISTS finished_at TIMESTAMP WITH TIME ZONE;
ALTER TABLE run ADD COLUMN IF NOT EXISTS outputs TEXT;
ALTER TABLE run ADD COLUMN IF NOT EXISTS error TEXT;
ALTER TABLE run ADD COLUMN IF NOT EXISTS error_type VARCHAR(20);
ALTER TABLE run ADD COLUMN IF NOT EXISTS duration_ms INTEGER;
ALTER TABLE run ADD COLUMN IF NOT EXISTS cost_tokens NUMERIC(10,4);
ALTER TABLE run ADD COLUMN IF NOT EXISTS runner_id VARCHAR(255);

CREATE INDEX IF NOT EXISTS idx_run_idempotency ON run(idempotency_key);
CREATE INDEX IF NOT EXISTS idx_run_execution ON run(execution_id);
CREATE INDEX IF NOT EXISTS idx_run_state ON run(state);
CREATE INDEX IF NOT EXISTS idx_run_lease_until ON run(lease_until) WHERE state = 'LEASED';

-- DeadLetterQueue table
CREATE TABLE IF NOT EXISTS dead_letter_queue (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    run_id UUID REFERENCES run(id),
    execution_id UUID REFERENCES workflow_execution(id),
    task_id UUID REFERENCES task(id),
    failure_reason TEXT,
    failure_type VARCHAR(50),
    original_inputs TEXT,
    original_config TEXT,
    quarantined_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    reviewed_by UUID REFERENCES principal(id),
    resolution VARCHAR(20) DEFAULT 'PENDING',
    resolved_at TIMESTAMP WITH TIME ZONE,
    notes TEXT,
    created_date TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    created_by UUID,
    last_modified_date TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    last_modified_by UUID
);

CREATE INDEX IF NOT EXISTS idx_dlq_resolution ON dead_letter_queue(resolution);
CREATE INDEX IF NOT EXISTS idx_dlq_quarantined_at ON dead_letter_queue(quarantined_at DESC);
CREATE INDEX IF NOT EXISTS idx_dlq_execution_id ON dead_letter_queue(execution_id);

-- CircuitBreakerState table
CREATE TABLE IF NOT EXISTS circuit_breaker_state (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    target VARCHAR(500) NOT NULL UNIQUE,
    state VARCHAR(20) NOT NULL DEFAULT 'CLOSED',
    failure_count INTEGER DEFAULT 0,
    success_count INTEGER DEFAULT 0,
    last_failure_at TIMESTAMP WITH TIME ZONE,
    opened_at TIMESTAMP WITH TIME ZONE,
    next_retry_at TIMESTAMP WITH TIME ZONE,
    window_start_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    created_date TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    created_by UUID,
    last_modified_date TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    last_modified_by UUID
);

-- No separate index on target is needed: the UNIQUE constraint already creates one.
```
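The `leased_by`, `lease_until`, and `heartbeat_at` columns added to `run` point to lease-based ownership: a runner claims a run for a fixed period and keeps extending the claim while it works. Below is a minimal in-memory sketch of those semantics. It assumes a lease counts as expired once `lease_until` is in the past. In PostgreSQL the same compare-and-set would typically be a conditional `UPDATE ... WHERE id = ? AND (lease_until IS NULL OR lease_until < NOW())`, where the affected-row count tells the runner whether it won the claim. `RunLease` is a hypothetical class, not ValkyrAI code.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;

/** Hypothetical lease on a single run; sketches the compare-and-set semantics only. */
final class RunLease {
    private String leasedBy;    // mirrors run.leased_by
    private Instant leaseUntil; // mirrors run.lease_until

    /** Claims the run if nobody holds it or the current lease has expired. */
    synchronized boolean tryAcquire(String runnerId, Instant now, Duration leaseDuration) {
        boolean free = leaseUntil == null || leaseUntil.isBefore(now);
        if (!free) {
            return false;
        }
        leasedBy = runnerId;
        leaseUntil = now.plus(leaseDuration);
        return true;
    }

    /** Extends the lease; only the current holder may do so, and only before expiry. */
    synchronized boolean heartbeat(String runnerId, Instant now, Duration leaseDuration) {
        if (!Objects.equals(leasedBy, runnerId) || leaseUntil == null || leaseUntil.isBefore(now)) {
            return false; // lease lost: stop working, another runner may own the run now
        }
        leaseUntil = now.plus(leaseDuration);
        return true;
    }

    synchronized String holder() {
        return leasedBy;
    }
}
```

With the configured `lease.duration` of PT2M and `heartbeat-interval` of PT2S, a runner that stops heartbeating keeps its claim for up to two minutes (roughly 60 missed heartbeats). The lease duration, not the heartbeat interval, therefore bounds how long a crashed run stays stuck.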

Step 2: Run Migration

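```bash
# If using Flyway (Maven plugin)
mvn flyway:migrate

# OR if using Liquibase (the SQL file must be referenced from a Liquibase changelog;
# Liquibase does not pick up Flyway-style V2.0__*.sql files on its own)
mvn liquibase:update

# OR restart the application with auto-migration enabled
# (Spring Boot runs Flyway or Liquibase on startup when it is on the classpath)
```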

Code Migration

Current Code (v1.x)

```java
@Autowired
private ValkyrWorkflowService workflowService;

// Direct execution (no tracking)
CompletableFuture<Workflow> future = workflowService.executeWorkflow(workflow);
```

New Code (v2.0)

```java
@Autowired
private WorkflowExecutionService executionService;

// Tracked execution: the execution record is persisted before the workflow runs
CompletableFuture<WorkflowExecution> future = executionService.executeWorkflow(
        workflow,
        "MANUAL",                                   // trigger type
        Map.of("customerId", "123", "amount", 49.0) // trigger payload
);

// The future completes when the workflow finishes (successfully or not)
future.thenAccept(execution ->
        log.info("Execution {} finished with state {}", execution.getId(), execution.getState()));
```

Backward-Compatible Wrapper

File: valkyrai/src/main/java/com/valkyrlabs/workflow/service/WorkflowExecutionService.java

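```java
package com.valkyrlabs.workflow.service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.OffsetDateTime;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
// plus imports for Workflow, WorkflowExecution, WorkflowExecutionRepository and ValkyrWorkflowService

@Service
public class WorkflowExecutionService {

    @Autowired
    private ValkyrWorkflowService legacyService;

    @Autowired
    private WorkflowExecutionRepository executionRepo;

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * v2.0 API: execute with full tracking. The returned future completes
     * when the workflow finishes, with the final SUCCESS or FAILED record.
     */
    public CompletableFuture<WorkflowExecution> executeWorkflow(
            Workflow workflow,
            String triggerType,
            Map<String, Object> payload) {
        WorkflowExecution execution = createExecution(workflow, triggerType, payload);
        // Delegate to legacy service (backward compatible)
        return trackCompletion(execution.getId(), legacyService.executeWorkflow(workflow));
    }

    /**
     * v2.0 API for callers that must not wait (e.g. a 202 Accepted endpoint):
     * persists the execution record, starts the workflow, and returns the
     * PENDING record immediately so its ID can be handed to the client.
     */
    public WorkflowExecution startWorkflow(
            Workflow workflow,
            String triggerType,
            Map<String, Object> payload) {
        WorkflowExecution execution = createExecution(workflow, triggerType, payload);
        trackCompletion(execution.getId(), legacyService.executeWorkflow(workflow));
        return execution;
    }

    public Optional<WorkflowExecution> findById(UUID executionId) {
        return executionRepo.findById(executionId);
    }

    /**
     * v1.x compatibility: Execute without tracking (deprecated)
     * @deprecated Use executeWorkflow(Workflow, String, Map) instead
     */
    @Deprecated
    public CompletableFuture<Workflow> executeWorkflowLegacy(Workflow workflow) {
        return legacyService.executeWorkflow(workflow);
    }

    private WorkflowExecution createExecution(
            Workflow workflow, String triggerType, Map<String, Object> payload) {
        WorkflowExecution execution = new WorkflowExecution();
        execution.setWorkflowId(workflow.getId());
        execution.setState(WorkflowExecution.StateEnum.PENDING);
        execution.setStartedAt(OffsetDateTime.now());
        execution.setTriggerType(triggerType);
        execution.setTriggerPayload(toJson(payload));
        return executionRepo.save(execution);
    }

    private CompletableFuture<WorkflowExecution> trackCompletion(
            UUID executionId, CompletableFuture<Workflow> legacyRun) {
        return legacyRun
                .thenApply(completedWorkflow ->
                        finish(executionId, WorkflowExecution.StateEnum.SUCCESS, null))
                .exceptionally(ex -> {
                    // Failures reach this stage wrapped in CompletionException; record the real cause
                    Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                            ? ex.getCause() : ex;
                    return finish(executionId, WorkflowExecution.StateEnum.FAILED, cause.getMessage());
                });
    }

    private WorkflowExecution finish(
            UUID executionId, WorkflowExecution.StateEnum state, String errorMessage) {
        WorkflowExecution execution = executionRepo.findById(executionId).orElseThrow();
        execution.setState(state);
        execution.setErrorMessage(errorMessage);
        execution.setFinishedAt(OffsetDateTime.now());
        return executionRepo.save(execution);
    }

    private String toJson(Map<String, Object> payload) {
        try {
            return objectMapper.writeValueAsString(payload);
        } catch (JsonProcessingException e) {
            throw new IllegalArgumentException("Trigger payload cannot be serialized to JSON", e);
        }
    }
}
```

Controller Migration

Current Controller (v1.x)

```java
@RestController
@RequestMapping("/v1/workflow")
public class WorkflowController {

    @PostMapping("/{id}/execute")
    public ResponseEntity<Workflow> execute(@PathVariable UUID id) {
        Workflow workflow = workflowService.findById(id).orElseThrow();
        CompletableFuture<Workflow> future = valkyrWorkflowService.executeWorkflow(workflow);
        // ... return workflow
    }
}
```

New Controller (v2.0)

```java
import java.util.Map;
import java.util.UUID;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.server.ResponseStatusException;

@RestController
@RequestMapping("/v1/workflow")
public class WorkflowController {

    @Autowired
    private WorkflowExecutionService executionService;

    // workflowService: the existing workflow lookup service, declared as in v1.x

    @PostMapping("/{id}/execute")
    public ResponseEntity<WorkflowExecution> execute(
            @PathVariable UUID id,
            // Optional body, so v1.x clients that send no payload keep working
            @RequestBody(required = false) Map<String, Object> payload) {

        Workflow workflow = workflowService.findById(id)
                .orElseThrow(() -> new ResponseStatusException(HttpStatus.NOT_FOUND));

        // The execution record is persisted synchronously, so its ID is
        // available right away while the workflow keeps running in the background
        WorkflowExecution execution = executionService.startWorkflow(
                workflow,
                "MANUAL",
                payload != null ? payload : Map.of()
        );

        // Return immediately (async execution); clients poll GET /execution/{executionId}
        return ResponseEntity.accepted().body(execution);
    }

    @GetMapping("/execution/{executionId}")
    public ResponseEntity<WorkflowExecution> getExecution(@PathVariable UUID executionId) {
        return executionService.findById(executionId)
                .map(ResponseEntity::ok)
                .orElse(ResponseEntity.notFound().build());
    }
}
```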

Configuration Migration

Current Config (v1.x)

```yaml
valkyrai:
  workflow:
    async: true
    pool-size: 10
```

New Config (v2.0)

```yaml
valkyrai:
  workflow:
    async: true
    pool-size: 10

    # NEW: Execution tracking
    execution:
      enabled: true
      track-metrics: true

  # NEW: Runner pool configuration
  runner:
    # Falls back to a random UUID via Spring Boot's random.uuid property source
    id: ${RUNNER_ID:${random.uuid}}
    concurrency: 10
    poll-interval: PT5S

    lease:
      duration: PT2M
      heartbeat-interval: PT2S

    retry:
      max-attempts: 3
      backoff-base-ms: 1000
      backoff-max-ms: 30000

    zombie-reaper:
      enabled: true
      scan-interval: PT30S

  # NEW: Circuit breaker configuration
  circuit-breaker:
    enabled: true
    failure-threshold: 5
    timeout-duration: PT30S
    half-open-success-threshold: 2
```
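The three circuit-breaker settings map onto the usual CLOSED → OPEN → HALF_OPEN cycle. The sketch below shows how they might interact, for one target and in memory. It assumes that `failure-threshold` counts consecutive failures, that `timeout-duration` is how long the breaker stays OPEN before allowing trial calls, and that `half-open-success-threshold` is the number of trial successes needed to close again. The class is hypothetical. ValkyrAI persists this state in `circuit_breaker_state` and may count failures per time window (as `window_start_at` hints) rather than consecutively.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical in-memory circuit breaker for one target; a sketch, not ValkyrAI's implementation. */
final class CircuitBreakerSketch {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold;         // failure-threshold
    private final Duration openTimeout;         // timeout-duration
    private final int halfOpenSuccessThreshold; // half-open-success-threshold

    private State state = State.CLOSED;
    private int failureCount = 0;
    private int successCount = 0;
    private Instant nextRetryAt = Instant.MIN;

    CircuitBreakerSketch(int failureThreshold, Duration openTimeout, int halfOpenSuccessThreshold) {
        this.failureThreshold = failureThreshold;
        this.openTimeout = openTimeout;
        this.halfOpenSuccessThreshold = halfOpenSuccessThreshold;
    }

    /** Returns true if a call to the target may proceed at time {@code now}. */
    synchronized boolean allowRequest(Instant now) {
        if (state == State.OPEN && !now.isBefore(nextRetryAt)) {
            state = State.HALF_OPEN; // open timeout elapsed: allow trial calls
            successCount = 0;
        }
        return state != State.OPEN;
    }

    synchronized void recordSuccess() {
        if (state == State.HALF_OPEN) {
            successCount++;
            if (successCount >= halfOpenSuccessThreshold) {
                state = State.CLOSED;
                failureCount = 0;
            }
        } else {
            failureCount = 0; // consecutive counting: any success resets the streak
        }
    }

    synchronized void recordFailure(Instant now) {
        failureCount++;
        if (state == State.HALF_OPEN || failureCount >= failureThreshold) {
            state = State.OPEN; // a failed trial call re-opens the breaker immediately
            nextRetryAt = now.plus(openTimeout);
        }
    }

    synchronized State state() {
        return state;
    }
}
```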

Frontend Migration

Current Frontend (v1.x)

```typescript
import { useExecuteWorkflowMutation } from "../../backend/api";

const [executeWorkflow] = useExecuteWorkflowMutation();

const handleExecute = async () => {
  const result = await executeWorkflow({ id: workflowId }).unwrap();
  console.log("Workflow executed:", result);
};
```

New Frontend (v2.0)

```tsx
import { useState } from "react";
import { skipToken } from "@reduxjs/toolkit/query/react";
import {
  useExecuteWorkflowMutation,
  useGetWorkflowExecutionQuery,
  useListRunsQuery,
} from "../../backend/api";

// RunTimeline is an app component that renders a list of runs (not shown);
// the component name below is illustrative.
export function WorkflowExecutionPanel({ workflowId }: { workflowId: string }) {
  const [executeWorkflow] = useExecuteWorkflowMutation();
  const [executionId, setExecutionId] = useState<string | null>(null);

  // Poll for execution status; skipToken keeps the query idle until an ID exists
  const { data: execution } = useGetWorkflowExecutionQuery(
    executionId ? { id: executionId } : skipToken,
    { pollingInterval: 2000 }
  );

  // Get runs for execution (polled as well, so the timeline stays current)
  const { data: runs } = useListRunsQuery(
    executionId ? { executionId } : skipToken,
    { pollingInterval: 2000 }
  );

  const handleExecute = async () => {
    const result = await executeWorkflow({
      id: workflowId,
      payload: { customerId: "123" },
    }).unwrap();

    setExecutionId(result.id ?? null);
    console.log("Execution started:", result);
  };

  // Display execution status
  return (
    <div>
      <button onClick={handleExecute}>Execute</button>
      {execution && (
        <div>
          <h3>Execution Status: {execution.state}</h3>
          <p>
            Started:{" "}
            {execution.startedAt ? new Date(execution.startedAt).toLocaleString() : "not yet"}
          </p>
          {runs && <RunTimeline runs={runs} />}
        </div>
      )}
    </div>
  );
}
```

Monitoring & Observability

Add OpenTelemetry Tracing

File: valkyrai/src/main/java/com/valkyrlabs/workflow/config/ObservabilityConfig.java

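```java
package com.valkyrlabs.workflow.config;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Requires io.opentelemetry:opentelemetry-sdk-extension-autoconfigure.
// If Spring Boot already provides an OpenTelemetry bean (e.g. through
// micrometer-tracing-bridge-otel), inject that one instead of creating a second SDK.
@Configuration
public class ObservabilityConfig {

    @Bean
    public OpenTelemetry openTelemetry() {
        // Configured from OTEL_* environment variables and otel.* system properties
        return AutoConfiguredOpenTelemetrySdk.initialize()
                .getOpenTelemetrySdk();
    }

    @Bean
    public Tracer tracer(OpenTelemetry openTelemetry) {
        return openTelemetry.getTracer("valkyrai-workflow", "2.0");
    }
}
```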

application.yml:

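```yaml
management:
  tracing:
    enabled: true
    sampling:
      probability: 1.0 # trace every request; consider lowering in production

  otlp:
    tracing:
      # Spring Boot's OTLP span exporter uses HTTP by default (port 4318, path /v1/traces);
      # 4317 is the collector's gRPC port
      endpoint: ${OTEL_EXPORTER_OTLP_TRACES_ENDPOINT:http://localhost:4318/v1/traces}
```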

Testing Migration

Integration Test (v2.0)

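```java
import static org.assertj.core.api.Assertions.assertThat;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class WorkflowExecutionServiceTest {

    @Autowired
    private WorkflowExecutionService executionService;

    @Autowired
    private WorkflowExecutionRepository executionRepo;

    @Test
    void testExecutionTracking() throws Exception {
        // Given: a persisted workflow (workflow_execution.workflow_id references workflow(id))
        Workflow workflow = createTestWorkflow(); // test helper, not shown
        Map<String, Object> payload = Map.of("test", "data");

        // When
        CompletableFuture<WorkflowExecution> future = executionService.executeWorkflow(
                workflow,
                "MANUAL",
                payload
        );

        WorkflowExecution execution = future.get(10, TimeUnit.SECONDS);

        // Then
        assertThat(execution.getState()).isEqualTo(WorkflowExecution.StateEnum.SUCCESS);
        assertThat(execution.getWorkflowId()).isEqualTo(workflow.getId());

        // Verify execution record persisted
        WorkflowExecution persisted = executionRepo.findById(execution.getId()).orElseThrow();
        assertThat(persisted.getFinishedAt()).isNotNull();
    }
}
```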
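The test above covers only the success path. A failure-path counterpart, or any `exceptionally` handler placed after a `thenApply` stage as in the wrapper, has to account for how `CompletableFuture` reports errors. The handler receives a `CompletionException` wrapping the original error, so `ex.getMessage()` returns `java.lang.IllegalStateException: boom` rather than `boom`. The JDK-only sketch below shows the difference. `FailureUnwrapping` and its methods are hypothetical helpers.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

/** Hypothetical helpers showing how CompletableFuture wraps failures in dependent stages. */
final class FailureUnwrapping {
    private FailureUnwrapping() {}

    /** Strips the CompletionException / ExecutionException wrappers CompletableFuture adds. */
    static Throwable unwrap(Throwable ex) {
        Throwable current = ex;
        while ((current instanceof CompletionException || current instanceof ExecutionException)
                && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /** Same shape as a tracking wrapper: a thenApply stage followed by exceptionally. */
    static String messageSeenByHandler(CompletableFuture<String> legacyRun, boolean unwrapFirst) {
        return legacyRun
                .thenApply(result -> "SUCCESS")
                .exceptionally(ex -> unwrapFirst ? unwrap(ex).getMessage() : ex.getMessage())
                .join();
    }
}
```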

Rollback Plan

If something goes wrong:

1. Disable New Features

```yaml
valkyrai:
  workflow:
    execution:
      enabled: false # Disable execution tracking

  runner:
    zombie-reaper:
      enabled: false # Disable zombie reaper
```

2. Use Legacy API

All existing code using ValkyrWorkflowService.executeWorkflow() continues to work!

3. Rollback Database

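```sql
-- Drop new tables (pre-existing tables and their rows are kept;
-- CASCADE also drops the foreign key from run.execution_id)
DROP TABLE IF EXISTS circuit_breaker_state CASCADE;
DROP TABLE IF EXISTS dead_letter_queue CASCADE;
DROP TABLE IF EXISTS workflow_execution CASCADE;

-- Rollback Run table changes (optional, non-breaking)
ALTER TABLE run DROP COLUMN IF EXISTS exec_module_id;
ALTER TABLE run DROP COLUMN IF EXISTS leased_by;
-- ... (drop other new columns)

-- If Flyway applied the migration, remove its history row as well so that
-- V2.0 can be re-applied later (default history table name shown)
DELETE FROM flyway_schema_history WHERE version = '2.0';
```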

Migration Checklist

  • Review implementation plan
  • Run database migration
  • Regenerate code (mvn -pl thorapi clean install)
  • Deploy new services (backward compatible)
  • Test with one workflow
  • Enable execution tracking for new workflows
  • Monitor DLQ for issues
  • Gradually migrate existing workflows
  • Update frontend to show execution status
  • Set up OpenTelemetry monitoring
  • Train team on new DLQ tooling
  • Document common troubleshooting steps
  • Mark old API as deprecated
  • Plan final migration of remaining workflows

Common Issues & Solutions

Issue: Duplicate side effects

Symptom: Email sent twice, payment charged twice

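Solution: Check idempotency key generation. Every retry of the same logical step must produce the same key, and a run that already succeeded should be returned instead of re-executed:

```java
// RunService.java
String key = computeIdempotencyKey(inputs, config);
Optional<Run> existing = runRepo.findByIdempotencyKey(key);
if (existing.isPresent() && existing.get().getState() == Run.StateEnum.SUCCESS) {
    return existing.get(); // Deduplicate: reuse the earlier result, skip the side effects
}
// This lookup alone does not stop two runners from executing the same key at the
// same time; the run lease (leased_by / lease_until) has to cover that window.
```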
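The guide does not show `computeIdempotencyKey`. Below is a minimal sketch. It assumes the key should capture which step runs (execution and task) as well as what it runs with (inputs and config), and that inputs and config are first serialized to canonical JSON with a stable key order, so logically equal payloads hash the same. The result is a 64-character SHA-256 hex digest, which fits both `inputs_hash VARCHAR(64)` and `idempotency_key VARCHAR(128)`. The class name, parameters, and key layout are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;
import java.util.UUID;

/** Hypothetical idempotency-key helper; a sketch, not ValkyrAI's RunService. */
final class IdempotencyKeys {
    private IdempotencyKeys() {}

    /**
     * @param canonicalInputsJson inputs serialized with a stable key order
     * @param canonicalConfigJson config serialized with a stable key order
     * @return 64 lowercase hex characters (SHA-256)
     */
    static String compute(UUID executionId, UUID taskId,
                          String canonicalInputsJson, String canonicalConfigJson) {
        try {
            MessageDigest sha256 = MessageDigest.getInstance("SHA-256");
            String[] parts = {
                executionId.toString(), taskId.toString(), canonicalInputsJson, canonicalConfigJson
            };
            for (String part : parts) {
                byte[] bytes = part.getBytes(StandardCharsets.UTF_8);
                // Length-prefix each field so ("ab", "c") and ("a", "bc") cannot collide
                sha256.update(Integer.toString(bytes.length).getBytes(StandardCharsets.UTF_8));
                sha256.update((byte) ':');
                sha256.update(bytes);
            }
            return HexFormat.of().formatHex(sha256.digest());
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is available on every Java platform", e);
        }
    }
}
```

Including the execution ID scopes deduplication to one execution. Retries and crash recovery within an execution reuse the same key, but a deliberately re-triggered workflow (for example, a customer's second, legitimate payment) gets a new key. Drop it only if deduplication across executions is what you actually want.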

Issue: Zombie runs not reclaimed

Symptom: Runs stuck in LEASED state

Solution: Enable zombie reaper:

```yaml
valkyrai:
  runner:
    zombie-reaper:
      enabled: true
      scan-interval: PT30S
```
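A reaper scan looks for runs that are still `LEASED` but whose `lease_until` has passed. This is the query the partial index `idx_run_lease_until ... WHERE state = 'LEASED'` appears designed for. The scan then hands those runs back to the queue. The in-memory sketch below makes a few assumptions: a reclaimed run has its lease fields cleared, goes back to `PENDING` as its next attempt, and is marked `FAILED` for the DLQ once it has used up `max-attempts`, the same boundary as the retry check under "DLQ entries not appearing". `ZombieReaperSketch` and `RunRecord` are hypothetical.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Hypothetical zombie-reaper scan over in-memory runs; a sketch only. */
final class ZombieReaperSketch {
    enum RunState { PENDING, LEASED, SUCCESS, FAILED }

    static final class RunRecord {
        final UUID id = UUID.randomUUID();
        RunState state = RunState.PENDING;
        String leasedBy;    // runner holding the lease
        Instant leaseUntil; // lease expiry, extended by each heartbeat
        int attempt = 1;
    }

    /**
     * One scan: each LEASED run whose lease expired before {@code now} is requeued
     * (PENDING, next attempt) or, once attempts are exhausted, marked FAILED.
     * Roughly the SQL filter: state = 'LEASED' AND lease_until < NOW().
     *
     * @return ids of runs that should be moved to the dead letter queue
     */
    static List<UUID> scan(List<RunRecord> runs, Instant now, int maxAttempts) {
        List<UUID> toDeadLetter = new ArrayList<>();
        for (RunRecord run : runs) {
            boolean expired = run.state == RunState.LEASED
                    && run.leaseUntil != null
                    && run.leaseUntil.isBefore(now);
            if (!expired) {
                continue;
            }
            run.leasedBy = null; // the crashed runner no longer owns this run
            run.leaseUntil = null;
            if (run.attempt < maxAttempts) {
                run.attempt++;
                run.state = RunState.PENDING; // another runner may lease it now
            } else {
                run.state = RunState.FAILED;
                toDeadLetter.add(run.id);
            }
        }
        return toDeadLetter;
    }
}
```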

Issue: DLQ entries not appearing

Symptom: Failed runs not in DLQ

Solution: Check retry logic and failure classification:

```java
// RunService.java
if (canRetry && run.getAttempt() < maxAttempts) {
    // Requeue for retry
} else {
    // Move to DLQ
    moveToDLQ(run, "MAX_RETRIES");
}
```
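The retry settings (`backoff-base-ms: 1000`, `backoff-max-ms: 30000`) suggest capped exponential backoff for the "Requeue for retry" branch, with the result stored in `run.retry_after`. The sketch below assumes the delay starts at the base, doubles with each failed attempt, and never exceeds the max. It also shows "full jitter" (a uniform random delay up to the capped value), a common variant that keeps many failed runs from retrying in lockstep. `RetryBackoff` and its methods are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical retry-delay helper; a sketch, not ValkyrAI's RunService. */
final class RetryBackoff {
    private RetryBackoff() {}

    /** Capped exponential delay for the attempt that just failed (attempts start at 1). */
    static long delayMs(int failedAttempt, long baseMs, long maxMs) {
        if (failedAttempt < 1) {
            throw new IllegalArgumentException("attempts start at 1");
        }
        int shift = Math.min(failedAttempt - 1, 30); // cap the shift to avoid overflow
        long exponential = baseMs * (1L << shift);   // base * 2^(attempt - 1)
        return Math.min(exponential, maxMs);
    }

    /** Full-jitter variant: uniform in [0, capped delay]. */
    static long jitteredDelayMs(int failedAttempt, long baseMs, long maxMs) {
        return ThreadLocalRandom.current().nextLong(delayMs(failedAttempt, baseMs, maxMs) + 1);
    }

    /** Value to store in run.retry_after before requeueing. */
    static Instant retryAfter(Instant now, int failedAttempt, long baseMs, long maxMs) {
        return now.plus(Duration.ofMillis(delayMs(failedAttempt, baseMs, maxMs)));
    }
}
```

With the configured values, the first three failures wait 1 s, 2 s, and 4 s. From the sixth failure on, the delay reaches the 30 s cap (32 s is capped to 30 s).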

Support & Questions


Migration is non-breaking and can be done incrementally. 🚀