ValkyrAI Workflow Engine v2.0 — Phase 1 Complete ✅
Date: 2025-10-23
Status: Schema & API Definitions Complete
Next Phase: Service Layer Implementation
✅ Completed: Phase 1 - Data Model Extensions
New OpenAPI Schemas Added
Added to valkyrai/src/main/resources/openapi/api.hbs.yaml (lines ~1447-1670):
1. WorkflowExecution
- Tracks individual execution instances separate from workflow definitions
- Full state lifecycle: PENDING → RUNNING → SUCCESS/FAILED/CANCELLED
- Stores trigger type (MANUAL/CRON/EVENT/WEBHOOK/QUEUE) and payload
- Captures metrics snapshot and error details
- Supports nested workflows via parentExecutionId
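The lifecycle above can be sketched as a small transition table. This is a minimal illustration, not the generated ThorAPI model: the enum name ExecutionState and the rule that a PENDING execution may be cancelled before it starts are assumptions.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical enum mirroring the lifecycle PENDING → RUNNING → SUCCESS/FAILED/CANCELLED. */
enum ExecutionState {
    PENDING, RUNNING, SUCCESS, FAILED, CANCELLED;

    /** States reachable from this one; terminal states allow no further transition. */
    Set<ExecutionState> allowedNext() {
        switch (this) {
            case PENDING:
                // Assumption: an execution can be cancelled before it starts.
                return EnumSet.of(RUNNING, CANCELLED);
            case RUNNING:
                return EnumSet.of(SUCCESS, FAILED, CANCELLED);
            default:
                return EnumSet.noneOf(ExecutionState.class);
        }
    }

    /** True when moving from this state to {@code next} follows the lifecycle. */
    boolean canTransitionTo(ExecutionState next) {
        return allowedNext().contains(next);
    }
}
```

A service could call canTransitionTo before persisting a state change and reject the update otherwise.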
2. Run
- Granular tracking of each task attempt within an execution
- Idempotency: idempotencyKey (SHA-256 of inputs + config)
- Lease mechanism: leaseUntil, leasedBy, runnerId for crash recovery
- Heartbeat: heartbeatAt for zombie detection
- Retry logic: attempt, retryAfter, exponential backoff support
- Error classification: TRANSIENT/PERMANENT/TIMEOUT/CIRCUIT_OPEN
- Cost tracking: costTokens for LLM/API usage
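As an illustration of the idempotencyKey idea, the sketch below hashes a canonical form of the inputs plus the config with SHA-256. The class name IdempotencyKeys and the canonical format (sorted key=value lines followed by the config string) are assumptions; a real implementation would likely use a canonical JSON serializer so that nested values and delimiter characters are handled unambiguously.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper: content-based key from inputs + config. */
final class IdempotencyKeys {
    private IdempotencyKeys() {}

    static String compute(Map<String, Object> inputs, String config) {
        // Sort by key so that logically equal maps serialize identically.
        StringBuilder canonical = new StringBuilder();
        for (Map.Entry<String, Object> e : new TreeMap<>(inputs).entrySet()) {
            // Assumption: values have a stable toString(); nested maps would need real canonicalization.
            canonical.append(e.getKey()).append('=').append(e.getValue()).append('\n');
        }
        canonical.append("config=").append(config);
        return sha256Hex(canonical.toString());
    }

    /** Returns the SHA-256 digest of the UTF-8 bytes as 64 lowercase hex characters. */
    static String sha256Hex(String text) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(text.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException ex) {
            // Every Java platform is required to provide SHA-256.
            throw new IllegalStateException(ex);
        }
    }
}
```

Because keys are sorted before hashing, two runs whose inputs differ only in map ordering produce the same 64-character key, which matches the VARCHAR(64) hash columns in the migration.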
3. DeadLetterQueue
- Quarantine failed runs for manual intervention
- Stores original inputs/config for replay
- Resolution workflow: PENDING → REQUEUED/FIXED/DISCARDED
- Failure classification: MAX_RETRIES/PERMANENT_ERROR/CIRCUIT_BREAKER/MANUAL
4. CircuitBreakerState
- Per-module/endpoint circuit breaker tracking
- States: CLOSED → OPEN → HALF_OPEN
- Failure/success counters with time windows
- Automatic retry scheduling
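The CLOSED → OPEN → HALF_OPEN cycle can be sketched as follows. This is a simplified in-memory illustration under stated assumptions: the class name SimpleCircuitBreaker is hypothetical, it counts consecutive failures rather than failures within a time window, and the thresholds are passed in rather than read from configuration.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical in-memory circuit breaker; not the persisted CircuitBreakerState model. */
final class SimpleCircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold;
    private final int halfOpenSuccessThreshold;
    private final Duration openTimeout;

    private State state = State.CLOSED;
    private int failureCount;
    private int successCount;
    private Instant nextRetryAt;

    SimpleCircuitBreaker(int failureThreshold, int halfOpenSuccessThreshold, Duration openTimeout) {
        this.failureThreshold = failureThreshold;
        this.halfOpenSuccessThreshold = halfOpenSuccessThreshold;
        this.openTimeout = openTimeout;
    }

    /** Returns true if a call may proceed at the given time. */
    synchronized boolean allowRequest(Instant now) {
        if (state == State.OPEN && !now.isBefore(nextRetryAt)) {
            state = State.HALF_OPEN; // timeout elapsed: let probe calls through
            successCount = 0;
        }
        return state != State.OPEN;
    }

    synchronized void recordSuccess() {
        if (state == State.HALF_OPEN) {
            if (++successCount >= halfOpenSuccessThreshold) {
                state = State.CLOSED;
                failureCount = 0;
            }
        } else if (state == State.CLOSED) {
            failureCount = 0; // consecutive-failure counting: a success resets the streak
        }
    }

    synchronized void recordFailure(Instant now) {
        if (state == State.OPEN) {
            return; // already open; callers should have been rejected
        }
        // A single failure while HALF_OPEN reopens the circuit immediately.
        if (state == State.HALF_OPEN || ++failureCount >= failureThreshold) {
            state = State.OPEN;
            failureCount = 0;
            nextRetryAt = now.plus(openTimeout);
        }
    }

    synchronized State state() {
        return state;
    }
}
```

Passing the clock in as an Instant keeps the sketch deterministic in tests; nextRetryAt plays the role of the automatic retry scheduling listed above.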
New API Endpoints Added
Added to paths: section (lines ~70-250):
WorkflowExecution Control
- POST /WorkflowExecution/{id}/cancel: Cancel running execution
- POST /WorkflowExecution/{id}/pause: Pause execution
- POST /WorkflowExecution/{id}/resume: Resume paused execution
Run Management
- POST /Run/{id}/heartbeat: Runner keepalive (prevents zombie reaping)
DLQ Operations
- POST /DeadLetterQueue/{id}/requeue: Retry with optional input overrides
- POST /DeadLetterQueue/{id}/discard: Mark as permanently discarded
Note: Standard CRUD endpoints (list, read, create, update, and delete via GET/POST/PUT/DELETE) will be auto-generated by ThorAPI for all four new models.
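For illustration, a runner could build the heartbeat call with the JDK HTTP client as sketched below. The class name HeartbeatRequests, the base URL, the empty request body, and the 2-second timeout are assumptions; the actual request schema, base path, and authentication are defined by the OpenAPI spec and are not shown here.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;
import java.util.UUID;

/** Hypothetical helper that builds (but does not send) a heartbeat request. */
final class HeartbeatRequests {
    private HeartbeatRequests() {}

    static HttpRequest heartbeat(String baseUrl, UUID runId) {
        return HttpRequest.newBuilder()
                // Path taken from the endpoint list above: POST /Run/{id}/heartbeat
                .uri(URI.create(baseUrl + "/Run/" + runId + "/heartbeat"))
                .timeout(Duration.ofSeconds(2))
                // Assumption: the endpoint needs no request body.
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }
}
```

The request would then be sent with java.net.http.HttpClient.send or sendAsync on a schedule shorter than the lease duration.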
🔄 Next Steps: Phase 2 - Code Generation
1. Regenerate Models & Controllers
cd /Users/johnmcmahon/workspace/2025/valkyr/ValkyrAI
# Regenerate ThorAPI (generates Java models + repositories + services)
mvn -pl thorapi clean install
# This will create:
# - valkyrai/generated/spring/src/main/java/com/valkyrlabs/model/WorkflowExecution.java
# - valkyrai/generated/spring/src/main/java/com/valkyrlabs/model/Run.java (enhanced)
# - valkyrai/generated/spring/src/main/java/com/valkyrlabs/model/DeadLetterQueue.java
# - valkyrai/generated/spring/src/main/java/com/valkyrlabs/model/CircuitBreakerState.java
# - valkyrai/generated/spring/src/main/java/com/valkyrlabs/api/*Repository.java
# - valkyrai/generated/spring/src/main/java/com/valkyrlabs/api/*Service.java
# - valkyrai/generated/spring/src/main/java/com/valkyrlabs/api/*Controller.java
2. Regenerate TypeScript Clients (Frontend)
# Regenerate web TypeScript API clients
cd web
npm run generate:api # or equivalent ThorAPI command
# This will create RTK Query hooks:
# - useListWorkflowExecutionsQuery()
# - useGetWorkflowExecutionQuery()
# - useCancelWorkflowExecutionMutation()
# - usePauseWorkflowExecutionMutation()
# - useResumeWorkflowExecutionMutation()
# - useListRunsQuery()
# - useUpdateRunHeartbeatMutation()
# - useListDeadLetterQueueQuery()
# - useRequeueDeadLetterEntryMutation()
# - useDiscardDeadLetterEntryMutation()
📋 Phase 3: Service Layer Implementation
3.1 WorkflowExecutionService
File: valkyrai/src/main/java/com/valkyrlabs/workflow/service/WorkflowExecutionService.java
Responsibilities:
- Wrap existing ValkyrWorkflowService.executeWorkflow() to create execution records
- Manage execution lifecycle (PENDING → RUNNING → SUCCESS/FAILED)
- Track execution metrics (duration, costs, step counts)
- Support pause/resume/cancel operations
Key Methods:
@Service
public class WorkflowExecutionService {
WorkflowExecution createExecution(Workflow workflow, String triggerType, String triggerPayload);
CompletableFuture<WorkflowExecution> executeWorkflow(Workflow workflow, String triggerType, Map<String, Object> payload);
void cancelExecution(UUID executionId);
void pauseExecution(UUID executionId);
void resumeExecution(UUID executionId);
}
3.2 RunService
File: valkyrai/src/main/java/com/valkyrlabs/workflow/service/RunService.java
Responsibilities:
- Create runs with idempotency key generation (SHA-256 of inputs + config)
- Lease management for at-least-once delivery
- Heartbeat tracking for zombie detection
- Retry logic with exponential backoff
- DLQ management for permanent failures
Key Methods:
@Service
public class RunService {
Run createRun(UUID executionId, Task task, ExecModule module, Map<String, Object> inputs);
Optional<Run> acquireLease(UUID runId, String runnerId);
void updateHeartbeat(UUID runId);
void markSuccess(UUID runId, Map<String, Object> outputs, long durationMs);
void markFailed(UUID runId, String error, String errorType, boolean canRetry);
String computeIdempotencyKey(Map<String, Object> inputs, String config);
}
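The retry delay for the exponential backoff responsibility can be sketched like this. The class name RetryBackoff is hypothetical, jitter is deliberately left out, and the parameters correspond to the backoff-base-ms and backoff-max-ms settings in the configuration section.

```java
import java.time.Duration;

/** Hypothetical helper: delay doubles per attempt and is capped at a maximum. */
final class RetryBackoff {
    private RetryBackoff() {}

    /**
     * @param attempt 1-based attempt number that just failed
     * @param baseMs  delay after the first failure
     * @param maxMs   upper bound on the delay
     */
    static Duration delayForAttempt(int attempt, long baseMs, long maxMs) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        long delay = baseMs;
        // Double once per additional attempt, stopping early once the cap is reached.
        for (int i = 1; i < attempt && delay < maxMs; i++) {
            delay *= 2;
        }
        return Duration.ofMillis(Math.min(delay, maxMs));
    }
}
```

With a base of 1000 ms and a cap of 30000 ms, attempts 1, 2, and 3 wait 1 s, 2 s, and 4 s, and any attempt from the sixth onward waits the capped 30 s. retryAfter would then be the failure time plus this delay.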
3.3 DLQService
File: valkyrai/src/main/java/com/valkyrlabs/workflow/service/DLQService.java
Responsibilities:
- List/filter DLQ entries
- Requeue entries with optional input overrides
- Discard entries permanently
- Track resolution workflow
Key Methods:
@Service
public class DLQService {
List<DeadLetterQueue> listDLQ(DeadLetterQueue.ResolutionEnum resolution);
void requeue(UUID dlqId, Map<String, Object> inputOverrides);
void discard(UUID dlqId, String notes);
}
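The requeue-with-overrides and resolution rules can be sketched as two small functions. The class name DlqRequeue is hypothetical, the inputs are assumed to be already deserialized into maps (the migration stores them as TEXT), and the rule that only PENDING entries may be resolved is an assumption drawn from the PENDING → REQUEUED/FIXED/DISCARDED workflow.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for DLQ replay logic. */
final class DlqRequeue {
    enum Resolution { PENDING, REQUEUED, FIXED, DISCARDED }

    private DlqRequeue() {}

    /** Original inputs with any overrides applied on top; neither argument is modified. */
    static Map<String, Object> mergeInputs(Map<String, Object> original, Map<String, Object> overrides) {
        Map<String, Object> merged = new HashMap<>(original);
        if (overrides != null) {
            merged.putAll(overrides);
        }
        return merged;
    }

    /** Validates a resolution change; only PENDING entries can be resolved (assumption). */
    static Resolution resolve(Resolution current, Resolution target) {
        if (current != Resolution.PENDING) {
            throw new IllegalStateException("entry already resolved as " + current);
        }
        if (target == Resolution.PENDING) {
            throw new IllegalArgumentException("target must be a final resolution");
        }
        return target;
    }
}
```

Keeping the stored original inputs untouched and merging into a copy means the same DLQ entry can still be inspected after a requeue.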
3.4 RunnerService (Execution Pool)
File: valkyrai/src/main/java/com/valkyrlabs/workflow/runner/RunnerService.java
Responsibilities:
- Poll for pending runs
- Acquire leases
- Execute runs with heartbeat
- Handle failures and retries
- Zombie reaper (reclaim expired leases)
Key Methods:
@Service
public class RunnerService {
// fixedDelay takes a long (milliseconds); ISO-8601 durations go in fixedDelayString
@Scheduled(fixedDelayString = "PT5S")
void pollForWork();
void executeRun(Run run);
@Scheduled(fixedDelayString = "PT30S")
void reapZombieRuns();
}
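The lease, heartbeat, and zombie-reaper interplay can be sketched with an in-memory table. This is an illustration only: the class name LeaseTable is hypothetical, and in the real engine these operations would presumably be conditional updates on the run table (leased_by, lease_until, heartbeat_at) rather than a synchronized map.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/** Hypothetical in-memory stand-in for lease columns on the run table. */
final class LeaseTable {
    private static final class Lease {
        final String runnerId;
        Instant leaseUntil;

        Lease(String runnerId, Instant leaseUntil) {
            this.runnerId = runnerId;
            this.leaseUntil = leaseUntil;
        }
    }

    private final Map<UUID, Lease> leases = new HashMap<>();
    private final Duration leaseDuration;

    LeaseTable(Duration leaseDuration) {
        this.leaseDuration = leaseDuration;
    }

    /** Grants the lease if the run is unleased or its previous lease has expired. */
    synchronized boolean acquire(UUID runId, String runnerId, Instant now) {
        Lease current = leases.get(runId);
        if (current != null && now.isBefore(current.leaseUntil)) {
            return false; // another runner still holds a valid lease
        }
        leases.put(runId, new Lease(runnerId, now.plus(leaseDuration)));
        return true;
    }

    /** Extends the lease, but only for the runner that owns a still-valid lease. */
    synchronized boolean heartbeat(UUID runId, String runnerId, Instant now) {
        Lease current = leases.get(runId);
        if (current == null || !current.runnerId.equals(runnerId) || !now.isBefore(current.leaseUntil)) {
            return false;
        }
        current.leaseUntil = now.plus(leaseDuration);
        return true;
    }

    /** Removes expired leases and returns the affected run ids so they can be re-queued. */
    synchronized List<UUID> reapExpired(Instant now) {
        List<UUID> reclaimed = new ArrayList<>();
        leases.entrySet().removeIf(entry -> {
            boolean expired = !now.isBefore(entry.getValue().leaseUntil);
            if (expired) {
                reclaimed.add(entry.getKey());
            }
            return expired;
        });
        return reclaimed;
    }
}
```

A runner that crashes simply stops heartbeating; once its lease passes leaseUntil, the reaper returns the run id and another runner can acquire it.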
📊 Database Migrations
SQL Migration Script
File: valkyrai/src/main/resources/db/migration/V2.0__workflow_execution_tracking.sql
-- WorkflowExecution table
CREATE TABLE IF NOT EXISTS workflow_execution (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
workflow_id UUID NOT NULL REFERENCES workflow(id),
version INTEGER,
state VARCHAR(20) NOT NULL DEFAULT 'PENDING',
started_at TIMESTAMP WITH TIME ZONE,
finished_at TIMESTAMP WITH TIME ZONE,
parent_execution_id UUID REFERENCES workflow_execution(id),
trigger_type VARCHAR(20),
trigger_payload TEXT,
context_data TEXT,
error_message TEXT,
error_stack_trace TEXT,
metrics_snapshot TEXT,
created_date TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
created_by UUID,
last_modified_date TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
last_modified_by UUID
);
-- Indexes for WorkflowExecution (IF NOT EXISTS keeps the script re-runnable, matching the table DDL)
CREATE INDEX IF NOT EXISTS idx_workflow_execution_workflow_id ON workflow_execution(workflow_id);
CREATE INDEX IF NOT EXISTS idx_workflow_execution_state ON workflow_execution(state);
CREATE INDEX IF NOT EXISTS idx_workflow_execution_started_at ON workflow_execution(started_at DESC);
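-- Enhance the existing run table. These statements fail if the table is missing, and the
-- indexes below assume idempotency_key, execution_id, state, and lease_until already exist.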
ALTER TABLE run ADD COLUMN IF NOT EXISTS exec_module_id UUID;
ALTER TABLE run ADD COLUMN IF NOT EXISTS leased_by VARCHAR(255);
ALTER TABLE run ADD COLUMN IF NOT EXISTS inputs_hash VARCHAR(64);
ALTER TABLE run ADD COLUMN IF NOT EXISTS config_hash VARCHAR(64);
ALTER TABLE run ADD COLUMN IF NOT EXISTS heartbeat_at TIMESTAMP WITH TIME ZONE;
ALTER TABLE run ADD COLUMN IF NOT EXISTS retry_after TIMESTAMP WITH TIME ZONE;
ALTER TABLE run ADD COLUMN IF NOT EXISTS duration_ms INTEGER;
ALTER TABLE run ADD COLUMN IF NOT EXISTS cost_tokens NUMERIC(10,4);
ALTER TABLE run ADD COLUMN IF NOT EXISTS error_type VARCHAR(20);
-- Indexes for Run
CREATE INDEX IF NOT EXISTS idx_run_idempotency ON run(idempotency_key);
CREATE INDEX IF NOT EXISTS idx_run_execution ON run(execution_id);
CREATE INDEX IF NOT EXISTS idx_run_state ON run(state);
CREATE INDEX IF NOT EXISTS idx_run_lease_until ON run(lease_until) WHERE state = 'LEASED';
-- DeadLetterQueue table
CREATE TABLE IF NOT EXISTS dead_letter_queue (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
run_id UUID REFERENCES run(id),
execution_id UUID REFERENCES workflow_execution(id),
task_id UUID REFERENCES task(id),
failure_reason TEXT,
failure_type VARCHAR(50),
original_inputs TEXT,
original_config TEXT,
quarantined_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
reviewed_by UUID REFERENCES principal(id),
resolution VARCHAR(20) DEFAULT 'PENDING',
resolved_at TIMESTAMP WITH TIME ZONE,
notes TEXT,
created_date TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
created_by UUID,
last_modified_date TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
last_modified_by UUID
);
-- Indexes for DLQ
CREATE INDEX IF NOT EXISTS idx_dlq_resolution ON dead_letter_queue(resolution);
CREATE INDEX IF NOT EXISTS idx_dlq_quarantined_at ON dead_letter_queue(quarantined_at DESC);
CREATE INDEX IF NOT EXISTS idx_dlq_execution_id ON dead_letter_queue(execution_id);
-- CircuitBreakerState table
CREATE TABLE IF NOT EXISTS circuit_breaker_state (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
target VARCHAR(500) NOT NULL UNIQUE,
state VARCHAR(20) NOT NULL DEFAULT 'CLOSED',
failure_count INTEGER DEFAULT 0,
success_count INTEGER DEFAULT 0,
last_failure_at TIMESTAMP WITH TIME ZONE,
opened_at TIMESTAMP WITH TIME ZONE,
next_retry_at TIMESTAMP WITH TIME ZONE,
window_start_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
created_date TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
created_by UUID,
last_modified_date TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
last_modified_by UUID
);
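-- Index for circuit breaker lookups (in PostgreSQL the UNIQUE constraint on target
-- already creates an index, so this one is likely redundant)
CREATE INDEX IF NOT EXISTS idx_circuit_breaker_target ON circuit_breaker_state(target);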
🧪 Testing Strategy
Unit Tests
- WorkflowExecutionServiceTest: Execution lifecycle, state transitions
- RunServiceTest: Idempotency, lease logic, retry backoff
- DLQServiceTest: Requeue/discard workflows
Integration Tests
- WorkflowExecutionIntegrationTest: End-to-end execution with runs
- LeaseRecoveryTest: Simulate runner crashes, verify lease reclaim
- IdempotencyTest: Duplicate requests with same idempotency key
- DLQReplayTest: Quarantine → requeue → success flow
Chaos Tests
- Kill runner pod mid-execution → verify no duplicate side effects
- Database connection failure → verify graceful degradation
- Zombie reaper test → expire leases, verify requeue
📈 Success Metrics
After Phase 3 completion, validate:
✅ Deterministic Replay: Same inputs → same outputs (idempotency key match)
✅ Crash Recovery: Kill runner mid-step → run resumes on another node
✅ Idempotency: Duplicate run requests → deduplicated via idempotency key
✅ DLQ Tooling: Failed runs quarantined + requeue works
✅ Observability: Traces link to runs; execution timeline visible
✅ Performance: P95 dispatch ≤ 75ms; lease acquisition ≤ 10ms
📝 Configuration Updates
application.yml
valkyrai:
workflow:
execution:
enabled: true
track-metrics: true
runner:
id: ${RUNNER_ID:#{T(java.util.UUID).randomUUID().toString()}}
concurrency: ${RUNNER_CONCURRENCY:10}
poll-interval: PT5S
lease:
duration: PT2M
heartbeat-interval: PT2S
retry:
max-attempts: 3
backoff-base-ms: 1000
backoff-max-ms: 30000
zombie-reaper:
enabled: true
scan-interval: PT30S
circuit-breaker:
enabled: true
failure-threshold: 5
timeout-duration: PT30S
half-open-success-threshold: 2
🎯 Immediate Action Items
- ✅ Review this Phase 1 completion with the team
- 🔄 Run ThorAPI regeneration:
  mvn -pl thorapi clean install
  mvn -pl valkyrai clean package
- 📝 Verify generated models in valkyrai/generated/spring/src/main/java/com/valkyrlabs/model/
- 🛠️ Implement Phase 3 services (WorkflowExecutionService, RunService, DLQService)
- 🗄️ Create and run database migration (V2.0__workflow_execution_tracking.sql)
- 🧪 Write unit tests for core idempotency and lease logic
- 🎨 Frontend: Regenerate TypeScript clients and create WorkflowExecutionMonitor component
🚀 Timeline Estimate
- Phase 2 (Regeneration): 1 day
- Phase 3 (Service Layer): 3-5 days
- Phase 4 (Runner Pool): 2-3 days
- Phase 5 (Controllers): 1 day
- Phase 6 (Observability): 2 days
- Phase 7 (Frontend): 3-4 days
- Phase 8 (Integration + Testing): 3-5 days
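Total: 15-21 working days (~3-4 weeks) to full v2.0 implementation if the phases run back to back; finishing in ~2-3 weeks requires overlapping some phases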
🎓 Key Architectural Decisions
Why Separate WorkflowExecution from Workflow?
- Definition vs. Instance: Workflows are templates; Executions are runtime instances
- History: Track all historical runs without polluting workflow definition
- Versioning: Execution captures workflow version at runtime for reproducibility
Why Idempotency Keys?
- Exactly-Once Semantics: Prevent duplicate side effects (email sends, payments)
- Content-Based: SHA-256 of inputs + config = deterministic replay
- Retry Safety: Same request → same key → deduplicated
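The "same request → same key → deduplicated" point can be illustrated with a tiny in-memory guard. The class name IdempotentExecutor is hypothetical; a production version would back this with a unique constraint or lookup on the stored idempotency key rather than a process-local map.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Hypothetical guard that runs a side effect at most once per idempotency key. */
final class IdempotentExecutor<R> {
    private final Map<String, R> resultsByKey = new ConcurrentHashMap<>();

    /**
     * Runs the side effect the first time a key is seen and returns the cached
     * result on later calls. A null result is not cached, so the side effect
     * should return a non-null value.
     */
    R executeOnce(String idempotencyKey, Supplier<R> sideEffect) {
        // computeIfAbsent on ConcurrentHashMap invokes the function at most once per key.
        return resultsByKey.computeIfAbsent(idempotencyKey, key -> sideEffect.get());
    }
}
```

A retried request carrying the same key gets the first result back instead of, for example, sending the email a second time.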
Why Lease Mechanism?
- At-Least-Once Delivery: Ensures no lost tasks
- Crash Recovery: Expired leases reclaimed by zombie reaper
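- Distributed Safety: Lease ownership keeps two runners from executing the same run while a lease is valid; idempotency keys cover re-execution after a lease expires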
Why DLQ?
- Human-in-the-Loop: Permanent failures need manual intervention
- Replay Capability: Store original inputs for debugging + retry
- Operational Clarity: Separate transient (auto-retry) from permanent (DLQ) failures
End of Phase 1 Summary
Next: WORKFLOW_ENGINE_V2_IMPLEMENTATION.md for full implementation guide.