ValkyrAI Workflow Engine v2.0 — Phase 1 Complete ✅

Date: 2025-10-23
Status: Schema & API Definitions Complete
Next Phase: Service Layer Implementation


✅ Completed: Phase 1 - Data Model Extensions

New OpenAPI Schemas Added

Added to valkyrai/src/main/resources/openapi/api.hbs.yaml (lines ~1447-1670):

1. WorkflowExecution

  • Tracks individual execution instances separate from workflow definitions
  • Full state lifecycle: PENDING → RUNNING → SUCCESS/FAILED/CANCELLED
  • Stores trigger type (MANUAL/CRON/EVENT/WEBHOOK/QUEUE) and payload
  • Captures metrics snapshot and error details
  • Supports nested workflows via parentExecutionId
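The lifecycle listed above can be read as a small state machine. The following is a minimal sketch, not the generated ThorAPI model: the `ExecutionState` enum and its methods are hypothetical names, and allowing CANCELLED from both PENDING and RUNNING is an assumption. No PAUSED state appears because the lifecycle above does not name one.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical enum mirroring PENDING → RUNNING → SUCCESS/FAILED/CANCELLED.
enum ExecutionState {
    PENDING, RUNNING, SUCCESS, FAILED, CANCELLED;

    // Allowed next states; terminal states have none.
    // Assumption: a PENDING execution can be cancelled before it starts.
    Set<ExecutionState> nextStates() {
        switch (this) {
            case PENDING: return EnumSet.of(RUNNING, CANCELLED);
            case RUNNING: return EnumSet.of(SUCCESS, FAILED, CANCELLED);
            default:      return EnumSet.noneOf(ExecutionState.class);
        }
    }

    boolean canTransitionTo(ExecutionState target) {
        return nextStates().contains(target);
    }

    boolean isTerminal() {
        return nextStates().isEmpty();
    }
}
```

A service could call `canTransitionTo` before persisting a state change and reject anything that leaves a terminal state.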

2. Run

  • Granular tracking of each task attempt within an execution
  • Idempotency: idempotencyKey (SHA-256 of inputs + config)
  • Lease mechanism: leaseUntil, leasedBy, runnerId for crash recovery
  • Heartbeat: heartbeatAt for zombie detection
  • Retry logic: attempt, retryAfter, exponential backoff support
  • Error classification: TRANSIENT/PERMANENT/TIMEOUT/CIRCUIT_OPEN
  • Cost tracking: costTokens for LLM/API usage
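One way the idempotency key could be derived is sketched below. The class name `IdempotencyKeys` and the canonical form (inputs sorted by key, one `key=value` per line, then the config string) are assumptions made for illustration; the source only states that the key is a SHA-256 of inputs + config. Values are rendered with `toString()`, so nested maps or objects without a stable string form would need a proper canonical serializer.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper; not part of the generated ThorAPI code.
final class IdempotencyKeys {
    private IdempotencyKeys() {}

    // Hashes the inputs (sorted by key for a stable order) followed by the config string.
    static String computeIdempotencyKey(Map<String, Object> inputs, String config) {
        StringBuilder canonical = new StringBuilder();
        for (Map.Entry<String, Object> e : new TreeMap<>(inputs).entrySet()) {
            canonical.append(e.getKey()).append('=').append(e.getValue()).append('\n');
        }
        canonical.append("config=").append(config);
        try {
            MessageDigest sha256 = MessageDigest.getInstance("SHA-256");
            byte[] digest = sha256.digest(canonical.toString().getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString(); // 64 lowercase hex characters
        } catch (NoSuchAlgorithmException ex) {
            // Every Java platform is required to provide SHA-256.
            throw new IllegalStateException(ex);
        }
    }
}
```

Sorting the keys first means two maps with the same entries in a different insertion order produce the same key.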

3. DeadLetterQueue

  • Quarantine failed runs for manual intervention
  • Stores original inputs/config for replay
  • Resolution workflow: PENDING → REQUEUED/FIXED/DISCARDED
  • Failure classification: MAX_RETRIES/PERMANENT_ERROR/CIRCUIT_BREAKER/MANUAL
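The relationship between a run's error classification and the DLQ failure classification could look roughly like the sketch below. The mapping is an assumption, not something the schema defines: PERMANENT errors are quarantined immediately, every other error is retried until the attempt budget is exhausted, and an exhausted CIRCUIT_OPEN error is recorded as CIRCUIT_BREAKER. The enum and class names are hypothetical, and MANUAL is left for operator-initiated quarantine.

```java
import java.util.Optional;

// Hypothetical enums mirroring the classifications listed in the schema.
enum ErrorType { TRANSIENT, PERMANENT, TIMEOUT, CIRCUIT_OPEN }
enum FailureType { MAX_RETRIES, PERMANENT_ERROR, CIRCUIT_BREAKER, MANUAL }

final class DlqRouting {
    private DlqRouting() {}

    // Returns the DLQ failure type if the run should be quarantined,
    // or empty if it should be retried instead.
    static Optional<FailureType> quarantineReason(ErrorType error, int attempt, int maxAttempts) {
        if (error == ErrorType.PERMANENT) {
            return Optional.of(FailureType.PERMANENT_ERROR);
        }
        if (attempt < maxAttempts) {
            return Optional.empty(); // retry budget left
        }
        return Optional.of(error == ErrorType.CIRCUIT_OPEN
                ? FailureType.CIRCUIT_BREAKER
                : FailureType.MAX_RETRIES);
    }
}
```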

4. CircuitBreakerState

  • Per-module/endpoint circuit breaker tracking
  • States: CLOSED → OPEN → HALF_OPEN
  • Failure/success counters with time windows
  • Automatic retry scheduling
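The CLOSED → OPEN → HALF_OPEN cycle can be illustrated with an in-memory sketch. This is a simplified model under stated assumptions: it counts consecutive failures rather than failures inside a time window, keeps state in one object instead of the `circuit_breaker_state` table, and takes the current time as a parameter so it can be tested deterministically. The class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical in-memory breaker; the real state would be persisted per target.
final class CircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold;
    private final int halfOpenSuccessThreshold;
    private final Duration openTimeout;

    private State state = State.CLOSED;
    private int failureCount;
    private int successCount;
    private Instant nextRetryAt;

    CircuitBreaker(int failureThreshold, int halfOpenSuccessThreshold, Duration openTimeout) {
        this.failureThreshold = failureThreshold;
        this.halfOpenSuccessThreshold = halfOpenSuccessThreshold;
        this.openTimeout = openTimeout;
    }

    // Returns true if a call may proceed; moves OPEN → HALF_OPEN once the timeout has elapsed.
    synchronized boolean allowCall(Instant now) {
        if (state == State.OPEN && !now.isBefore(nextRetryAt)) {
            state = State.HALF_OPEN;
            successCount = 0;
        }
        return state != State.OPEN;
    }

    // Enough successes while HALF_OPEN close the breaker; a success while CLOSED resets the failure streak.
    synchronized void recordSuccess() {
        if (state == State.HALF_OPEN && ++successCount >= halfOpenSuccessThreshold) {
            state = State.CLOSED;
            failureCount = 0;
        } else if (state == State.CLOSED) {
            failureCount = 0;
        }
    }

    // Any failure while HALF_OPEN, or reaching the threshold while CLOSED, opens the breaker.
    synchronized void recordFailure(Instant now) {
        if (state == State.HALF_OPEN || ++failureCount >= failureThreshold) {
            state = State.OPEN;
            nextRetryAt = now.plus(openTimeout);
        }
    }

    synchronized State state() {
        return state;
    }
}
```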

New API Endpoints Added

Added to paths: section (lines ~70-250):

WorkflowExecution Control

  • POST /WorkflowExecution/{id}/cancel — Cancel running execution
  • POST /WorkflowExecution/{id}/pause — Pause execution
  • POST /WorkflowExecution/{id}/resume — Resume paused execution

Run Management

  • POST /Run/{id}/heartbeat — Runner keepalive (prevents zombie reaping)

DLQ Operations

  • POST /DeadLetterQueue/{id}/requeue — Retry with optional input overrides
  • POST /DeadLetterQueue/{id}/discard — Mark as permanently discarded

Note: Standard CRUD endpoints (list, create, read, update, and delete via GET/POST/PUT/DELETE) will be auto-generated by ThorAPI for all four new models.
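As an illustration of how a runner might call one of the custom endpoints, the sketch below builds (without sending) a heartbeat request using the JDK's `java.net.http` client. The base URL is a hypothetical parameter, the class name is invented for this example, and authentication headers are omitted because the source does not describe them.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.UUID;

// Hypothetical helper for building requests against POST /Run/{id}/heartbeat.
final class RunHeartbeatRequests {
    private RunHeartbeatRequests() {}

    // Builds (but does not send) POST {baseUrl}/Run/{id}/heartbeat with an empty body.
    static HttpRequest heartbeat(String baseUrl, UUID runId) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/Run/" + runId + "/heartbeat"))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }
}
```

The request can then be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`.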


🔄 Next Steps: Phase 2 - Code Generation

1. Regenerate Models & Controllers

cd /Users/johnmcmahon/workspace/2025/valkyr/ValkyrAI

# Regenerate ThorAPI (generates Java models + repositories + services)
mvn -pl thorapi clean install

# This will create:
# - valkyrai/generated/spring/src/main/java/com/valkyrlabs/model/WorkflowExecution.java
# - valkyrai/generated/spring/src/main/java/com/valkyrlabs/model/Run.java (enhanced)
# - valkyrai/generated/spring/src/main/java/com/valkyrlabs/model/DeadLetterQueue.java
# - valkyrai/generated/spring/src/main/java/com/valkyrlabs/model/CircuitBreakerState.java
# - valkyrai/generated/spring/src/main/java/com/valkyrlabs/api/*Repository.java
# - valkyrai/generated/spring/src/main/java/com/valkyrlabs/api/*Service.java
# - valkyrai/generated/spring/src/main/java/com/valkyrlabs/api/*Controller.java

2. Regenerate TypeScript Clients (Frontend)

# Regenerate web TypeScript API clients
cd web
npm run generate:api # or equivalent ThorAPI command

# This will create RTK Query hooks:
# - useListWorkflowExecutionsQuery()
# - useGetWorkflowExecutionQuery()
# - useCancelWorkflowExecutionMutation()
# - usePauseWorkflowExecutionMutation()
# - useResumeWorkflowExecutionMutation()
# - useListRunsQuery()
# - useUpdateRunHeartbeatMutation()
# - useListDeadLetterQueueQuery()
# - useRequeueDeadLetterEntryMutation()
# - useDiscardDeadLetterEntryMutation()

📋 Phase 3: Service Layer Implementation

3.1 WorkflowExecutionService

File: valkyrai/src/main/java/com/valkyrlabs/workflow/service/WorkflowExecutionService.java

Responsibilities:

  • Wrap existing ValkyrWorkflowService.executeWorkflow() to create execution records
  • Manage execution lifecycle (PENDING → RUNNING → SUCCESS/FAILED)
  • Track execution metrics (duration, costs, step counts)
  • Support pause/resume/cancel operations

Key Methods:

@Service
public class WorkflowExecutionService {
    // Signature outline only; method bodies are omitted.
    WorkflowExecution createExecution(Workflow workflow, String triggerType, String triggerPayload);
    CompletableFuture<WorkflowExecution> executeWorkflow(Workflow workflow, String triggerType, Map<String, Object> payload);
    void cancelExecution(UUID executionId);
    void pauseExecution(UUID executionId);
    void resumeExecution(UUID executionId);
}
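The idea of wrapping an existing workflow call so that each lifecycle transition is recorded might be sketched as follows. Everything here is hypothetical and stands in for the real types: `ExecutionRecord` replaces the generated `WorkflowExecution` model, a plain `Runnable` replaces the call to `ValkyrWorkflowService.executeWorkflow()`, and nothing is persisted.

```java
import java.time.Instant;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the generated WorkflowExecution model.
final class ExecutionRecord {
    volatile String state = "PENDING";
    volatile Instant startedAt;
    volatile Instant finishedAt;
    volatile String errorMessage;
}

final class ExecutionTracker {
    private ExecutionTracker() {}

    // Runs the workflow body asynchronously and records each lifecycle transition.
    static CompletableFuture<ExecutionRecord> execute(Runnable workflowBody) {
        ExecutionRecord record = new ExecutionRecord();
        return CompletableFuture.supplyAsync(() -> {
            record.state = "RUNNING";
            record.startedAt = Instant.now();
            try {
                workflowBody.run();
                record.state = "SUCCESS";
            } catch (RuntimeException ex) {
                record.state = "FAILED";
                record.errorMessage = ex.getMessage();
            } finally {
                record.finishedAt = Instant.now();
            }
            return record;
        });
    }
}
```

Because failures are captured in the record rather than rethrown, the returned future completes normally in both cases and callers inspect `state` to tell them apart.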

3.2 RunService

File: valkyrai/src/main/java/com/valkyrlabs/workflow/service/RunService.java

Responsibilities:

  • Create runs with idempotency key generation (SHA-256 of inputs + config)
  • Lease management for at-least-once delivery
  • Heartbeat tracking for zombie detection
  • Retry logic with exponential backoff
  • DLQ management for permanent failures

Key Methods:

@Service
public class RunService {
    // Signature outline only; method bodies are omitted.
    Run createRun(UUID executionId, Task task, ExecModule module, Map<String, Object> inputs);
    Optional<Run> acquireLease(UUID runId, String runnerId);
    void updateHeartbeat(UUID runId);
    void markSuccess(UUID runId, Map<String, Object> outputs, long durationMs);
    void markFailed(UUID runId, String error, String errorType, boolean canRetry);
    String computeIdempotencyKey(Map<String, Object> inputs, String config);
}
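Lease acquisition is essentially an atomic "claim if free or expired" operation. The sketch below models it in memory with `ConcurrentHashMap.compute`; a database-backed service would more likely use a conditional UPDATE, which is not shown. `LeaseTable` and `Lease` are hypothetical names, and the extra `now` and `ttl` parameters (absent from the `acquireLease` signature above) are added only to make the example deterministic.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory lease store.
final class LeaseTable {
    static final class Lease {
        final String leasedBy;
        final Instant leaseUntil;

        Lease(String leasedBy, Instant leaseUntil) {
            this.leasedBy = leasedBy;
            this.leaseUntil = leaseUntil;
        }
    }

    private final ConcurrentHashMap<UUID, Lease> leases = new ConcurrentHashMap<>();

    // Grants the lease if the run is unleased, the previous lease has expired,
    // or the same runner is renewing; otherwise returns empty.
    Optional<Lease> acquireLease(UUID runId, String runnerId, Instant now, Duration ttl) {
        Lease result = leases.compute(runId, (id, current) -> {
            boolean free = current == null
                    || !current.leaseUntil.isAfter(now)
                    || current.leasedBy.equals(runnerId);
            return free ? new Lease(runnerId, now.plus(ttl)) : current;
        });
        return result.leasedBy.equals(runnerId) ? Optional.of(result) : Optional.empty();
    }
}
```

`compute` runs the decision atomically per key, so two runners racing for the same run cannot both see it as free.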

3.3 DLQService

File: valkyrai/src/main/java/com/valkyrlabs/workflow/service/DLQService.java

Responsibilities:

  • List/filter DLQ entries
  • Requeue entries with optional input overrides
  • Discard entries permanently
  • Track resolution workflow

Key Methods:

@Service
public class DLQService {
    // Signature outline only; method bodies are omitted.
    List<DeadLetterQueue> listDLQ(DeadLetterQueue.ResolutionEnum resolution);
    void requeue(UUID dlqId, Map<String, Object> inputOverrides);
    void discard(UUID dlqId, String notes);
}
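Requeueing "with optional input overrides" suggests a merge of the stored original inputs with whatever the operator supplies. A minimal sketch, assuming overrides replace originals key by key and that a null override map means "replay unchanged" (both assumptions; `DlqReplay` is a hypothetical name):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for computing the inputs of a requeued run.
final class DlqReplay {
    private DlqReplay() {}

    // Returns the inputs for the requeued run: the originals with any overrides applied on top.
    // The original map is copied, never mutated.
    static Map<String, Object> mergeInputs(Map<String, Object> originalInputs,
                                           Map<String, Object> inputOverrides) {
        Map<String, Object> merged = new LinkedHashMap<>(originalInputs);
        if (inputOverrides != null) {
            merged.putAll(inputOverrides);
        }
        return merged;
    }
}
```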

3.4 RunnerService (Execution Pool)

File: valkyrai/src/main/java/com/valkyrlabs/workflow/runner/RunnerService.java

Responsibilities:

  • Poll for pending runs
  • Acquire leases
  • Execute runs with heartbeat
  • Handle failures and retries
  • Zombie reaper (reclaim expired leases)

Key Methods:

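@Service
public class RunnerService {
    // Signature outline only; method bodies are omitted.
    // fixedDelayString accepts ISO-8601 durations; fixedDelay itself takes a long in milliseconds.
    @Scheduled(fixedDelayString = "PT5S")
    void pollForWork();

    void executeRun(Run run);

    @Scheduled(fixedDelayString = "PT30S")
    void reapZombieRuns();
}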
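The zombie reaper's selection step could be sketched as a simple filter. This assumes a run counts as a zombie when it is still in a `LEASED` state (the state name used by the partial index in the migration script) and its `leaseUntil` is in the past; heartbeat staleness is not considered here. `RunView` is a hypothetical stand-in for the generated `Run` model.

```java
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical reaper helper operating on an in-memory list of runs.
final class ZombieReaper {
    private ZombieReaper() {}

    static final class RunView {
        final String id;
        final String state;
        final Instant leaseUntil;

        RunView(String id, String state, Instant leaseUntil) {
            this.id = id;
            this.state = state;
            this.leaseUntil = leaseUntil;
        }
    }

    // Selects runs that are still marked LEASED although their lease has expired.
    static List<RunView> findZombies(List<RunView> runs, Instant now) {
        return runs.stream()
                .filter(r -> "LEASED".equals(r.state))
                .filter(r -> r.leaseUntil != null && r.leaseUntil.isBefore(now))
                .collect(Collectors.toList());
    }
}
```

The reaper would then release each selected run so another runner can acquire it.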

📊 Database Migrations

SQL Migration Script

File: valkyrai/src/main/resources/db/migration/V2.0__workflow_execution_tracking.sql

-- WorkflowExecution table
CREATE TABLE IF NOT EXISTS workflow_execution (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    workflow_id UUID NOT NULL REFERENCES workflow(id),
    version INTEGER,
    state VARCHAR(20) NOT NULL DEFAULT 'PENDING',
    started_at TIMESTAMP WITH TIME ZONE,
    finished_at TIMESTAMP WITH TIME ZONE,
    parent_execution_id UUID REFERENCES workflow_execution(id),
    trigger_type VARCHAR(20),
    trigger_payload TEXT,
    context_data TEXT,
    error_message TEXT,
    error_stack_trace TEXT,
    metrics_snapshot TEXT,
    created_date TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    created_by UUID,
    last_modified_date TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    last_modified_by UUID
);

-- Indexes for WorkflowExecution
CREATE INDEX IF NOT EXISTS idx_workflow_execution_workflow_id ON workflow_execution(workflow_id);
CREATE INDEX IF NOT EXISTS idx_workflow_execution_state ON workflow_execution(state);
CREATE INDEX IF NOT EXISTS idx_workflow_execution_started_at ON workflow_execution(started_at DESC);

-- Enhance the existing run table. These statements assume run already exists; nothing below creates it.
ALTER TABLE run ADD COLUMN IF NOT EXISTS exec_module_id UUID;
ALTER TABLE run ADD COLUMN IF NOT EXISTS leased_by VARCHAR(255);
ALTER TABLE run ADD COLUMN IF NOT EXISTS inputs_hash VARCHAR(64);
ALTER TABLE run ADD COLUMN IF NOT EXISTS config_hash VARCHAR(64);
ALTER TABLE run ADD COLUMN IF NOT EXISTS heartbeat_at TIMESTAMP WITH TIME ZONE;
ALTER TABLE run ADD COLUMN IF NOT EXISTS retry_after TIMESTAMP WITH TIME ZONE;
ALTER TABLE run ADD COLUMN IF NOT EXISTS duration_ms INTEGER;
ALTER TABLE run ADD COLUMN IF NOT EXISTS cost_tokens NUMERIC(10,4);
ALTER TABLE run ADD COLUMN IF NOT EXISTS error_type VARCHAR(20);

-- Indexes for Run
CREATE INDEX IF NOT EXISTS idx_run_idempotency ON run(idempotency_key);
CREATE INDEX IF NOT EXISTS idx_run_execution ON run(execution_id);
CREATE INDEX IF NOT EXISTS idx_run_state ON run(state);
CREATE INDEX IF NOT EXISTS idx_run_lease_until ON run(lease_until) WHERE state = 'LEASED';

-- DeadLetterQueue table
CREATE TABLE IF NOT EXISTS dead_letter_queue (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    run_id UUID REFERENCES run(id),
    execution_id UUID REFERENCES workflow_execution(id),
    task_id UUID REFERENCES task(id),
    failure_reason TEXT,
    failure_type VARCHAR(50),
    original_inputs TEXT,
    original_config TEXT,
    quarantined_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    reviewed_by UUID REFERENCES principal(id),
    resolution VARCHAR(20) DEFAULT 'PENDING',
    resolved_at TIMESTAMP WITH TIME ZONE,
    notes TEXT,
    created_date TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    created_by UUID,
    last_modified_date TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    last_modified_by UUID
);

-- Indexes for DLQ
CREATE INDEX IF NOT EXISTS idx_dlq_resolution ON dead_letter_queue(resolution);
CREATE INDEX IF NOT EXISTS idx_dlq_quarantined_at ON dead_letter_queue(quarantined_at DESC);
CREATE INDEX IF NOT EXISTS idx_dlq_execution_id ON dead_letter_queue(execution_id);

-- CircuitBreakerState table
CREATE TABLE IF NOT EXISTS circuit_breaker_state (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    target VARCHAR(500) NOT NULL UNIQUE,
    state VARCHAR(20) NOT NULL DEFAULT 'CLOSED',
    failure_count INTEGER DEFAULT 0,
    success_count INTEGER DEFAULT 0,
    last_failure_at TIMESTAMP WITH TIME ZONE,
    opened_at TIMESTAMP WITH TIME ZONE,
    next_retry_at TIMESTAMP WITH TIME ZONE,
    window_start_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    created_date TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    created_by UUID,
    last_modified_date TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    last_modified_by UUID
);

-- No extra index is needed for circuit breaker lookups: the UNIQUE constraint on target already creates one.

🧪 Testing Strategy

Unit Tests

  • WorkflowExecutionServiceTest — Execution lifecycle, state transitions
  • RunServiceTest — Idempotency, lease logic, retry backoff
  • DLQServiceTest — Requeue/discard workflows

Integration Tests

  • WorkflowExecutionIntegrationTest — End-to-end execution with runs
  • LeaseRecoveryTest — Simulate runner crashes, verify lease reclaim
  • IdempotencyTest — Duplicate requests with same idempotency key
  • DLQReplayTest — Quarantine → requeue → success flow

Chaos Tests

  • Kill runner pod mid-execution → verify no duplicate side effects
  • Database connection failure → verify graceful degradation
  • Zombie reaper test → expire leases, verify requeue

📈 Success Metrics

After Phase 3 completion, validate:

  • Deterministic Replay: Same inputs → same outputs (idempotency key match)
  • Crash Recovery: Kill runner mid-step → run resumes on another node
  • Idempotency: Duplicate run requests → deduplicated via idempotency key
  • DLQ Tooling: Failed runs quarantined + requeue works
  • Observability: Traces link to runs; execution timeline visible
  • Performance: P95 dispatch ≤ 75ms; lease acquisition ≤ 10ms


📝 Configuration Updates

application.yml

valkyrai:
  workflow:
    execution:
      enabled: true
      track-metrics: true

    runner:
      id: ${RUNNER_ID:#{T(java.util.UUID).randomUUID().toString()}}
      concurrency: ${RUNNER_CONCURRENCY:10}
      poll-interval: PT5S

    lease:
      duration: PT2M
      heartbeat-interval: PT2S

    retry:
      max-attempts: 3
      backoff-base-ms: 1000
      backoff-max-ms: 30000

    zombie-reaper:
      enabled: true
      scan-interval: PT30S

    circuit-breaker:
      enabled: true
      failure-threshold: 5
      timeout-duration: PT30S
      half-open-success-threshold: 2
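To show how the retry settings above (`backoff-base-ms` and `backoff-max-ms`) could translate into delays, here is a minimal exponential backoff sketch. It assumes a doubling factor per attempt and no jitter, neither of which the configuration specifies; `RetryBackoff` is a hypothetical name.

```java
// Hypothetical helper; assumes the delay doubles with each attempt.
final class RetryBackoff {
    private RetryBackoff() {}

    // Delay before retry number `attempt` (1-based): baseMs * 2^(attempt - 1), capped at maxMs.
    static long delayMs(int attempt, long baseMs, long maxMs) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        // Bound the exponent so the shift stays in range for realistic base values.
        int shift = Math.min(attempt - 1, 30);
        long uncapped = baseMs << shift;
        return Math.min(uncapped, maxMs);
    }
}
```

With the configured base of 1000 ms and cap of 30000 ms, the first three attempts wait 1000, 2000, and 4000 ms; the cap only takes effect from the sixth attempt, which the configured `max-attempts: 3` never reaches.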

🎯 Immediate Action Items

  1. Review this Phase 1 completion with the team
  2. 🔄 Run ThorAPI regeneration:
    mvn -pl thorapi clean install
    mvn -pl valkyrai clean package
  3. 📝 Verify generated models in valkyrai/generated/spring/src/main/java/com/valkyrlabs/model/
  4. 🛠️ Implement Phase 3 services (WorkflowExecutionService, RunService, DLQService)
  5. 🗄️ Create and run database migration (V2.0__workflow_execution_tracking.sql)
  6. 🧪 Write unit tests for core idempotency and lease logic
  7. 🎨 Frontend: Regenerate TypeScript clients and create WorkflowExecutionMonitor component

🚀 Timeline Estimate

  • Phase 2 (Regeneration): 1 day
  • Phase 3 (Service Layer): 3-5 days
  • Phase 4 (Runner Pool): 2-3 days
  • Phase 5 (Controllers): 1 day
  • Phase 6 (Observability): 2 days
  • Phase 7 (Frontend): 3-4 days
  • Phase 8 (Integration + Testing): 3-5 days

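Total: ~2-3 weeks to full v2.0 implementation, assuming some phases overlap; run strictly in sequence, the estimates above sum to 15-21 working days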


🎓 Key Architectural Decisions

Why Separate WorkflowExecution from Workflow?

  • Definition vs. Instance: Workflows are templates; Executions are runtime instances
  • History: Track all historical runs without polluting workflow definition
  • Versioning: Execution captures workflow version at runtime for reproducibility

Why Idempotency Keys?

  • Exactly-Once Semantics: Prevent duplicate side effects (email sends, payments)
  • Content-Based: SHA-256 of inputs + config = deterministic replay
  • Retry Safety: Same request → same key → deduplicated

Why Lease Mechanism?

  • At-Least-Once Delivery: Ensures no lost tasks
  • Crash Recovery: Expired leases reclaimed by zombie reaper
  • Distributed Safety: Lease ownership keeps two runners from executing the same run while a lease is valid

Why DLQ?

  • Human-in-the-Loop: Permanent failures need manual intervention
  • Replay Capability: Store original inputs for debugging + retry
  • Operational Clarity: Separate transient (auto-retry) from permanent (DLQ) failures

End of Phase 1 Summary

Next: See WORKFLOW_ENGINE_V2_IMPLEMENTATION.md for the full implementation guide.