API Integration ExecModules
The ValkyrAI Workflow Engine integrates with external APIs through specialized ExecModules. These modules connect workflows to external services over REST, GraphQL, gRPC, WebSocket, and webhooks, with built-in authentication, retry handling, and response processing.
Available API Integration Modules
REST API Client (api.rest.generic)
Generic REST API client with OpenAPI specification support for automatic request construction.
Configuration Schema:
{
  "operationId": "getPetById",
  "params": {
    "petId": "12345",
    "includeDetails": "true"
  },
  "body": {
    "name": "Fluffy",
    "status": "available"
  },
  "headers": {
    "Content-Type": "application/json",
    "Accept": "application/json"
  },
  "mapping_profile": "petstore_v1"
}
OpenAPI Integration:
- Automatically resolves endpoints from ExecModule.specs[0]
- Validates request/response schemas
- Generates proper HTTP methods and paths
- Supports parameter serialization
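The resolution steps above can be sketched roughly as follows. This is a minimal illustration, not the engine's implementation: the OperationResolver class, its hard-coded operation table, and the /pet/{petId} path are hypothetical stand-ins for what a real module would read from the parsed spec. It assumes that parameters matching a {name} placeholder go into the path and all others go into the query string.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.StringJoiner;

final class OperationResolver {
    /** HTTP method plus path template, as an OpenAPI document declares them per operation. */
    record Operation(String method, String pathTemplate) {}

    // Hypothetical operation table; a real module would build this from the parsed spec.
    private static final Map<String, Operation> OPERATIONS = Map.of(
        "getPetById", new Operation("GET", "/pet/{petId}"));

    /** Fills {name} path placeholders from params; leftover params become the query string. */
    static String resolve(String operationId, Map<String, String> params) {
        Operation op = OPERATIONS.get(operationId);
        if (op == null) {
            throw new IllegalArgumentException("Unknown operationId: " + operationId);
        }
        String path = op.pathTemplate();
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> e : params.entrySet()) {
            String placeholder = "{" + e.getKey() + "}";
            // URLEncoder applies form encoding (space becomes '+'), a simplification for path segments.
            String value = URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8);
            if (path.contains(placeholder)) {
                path = path.replace(placeholder, value);
            } else {
                query.add(URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8) + "=" + value);
            }
        }
        return op.method() + " " + path + (query.length() > 0 ? "?" + query : "");
    }
}
```

With the configuration shown earlier, petId fills the path placeholder and includeDetails ends up in the query string.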
Integration Account Requirements:
- apiKey: API key or Bearer token
- password: Additional secret if required
WorkflowState Keys Emitted:
- api.rest.status: HTTP response status code
- api.rest.body: Response body (truncated if large)
- api.rest.headers: Response headers (truncated)
GraphQL Client (api.graphql.generic)
Execute GraphQL queries and mutations with variable support and error handling.
Configuration Schema:
{
  "endpoint": "https://api.github.com/graphql",
  "query": "query GetUser($login: String!) { user(login: $login) { name email bio } }",
  "variables": {
    "login": "octocat"
  },
  "operationName": "GetUser"
}
Features:
- Full GraphQL specification support
- Query validation and optimization
- Automatic error extraction and handling
- Response data normalization
Integration Account Requirements:
- apiKey: GraphQL API token or Bearer token
- password: (Optional) Additional authentication
WorkflowState Keys Emitted:
- api.gql.data: GraphQL response data
- api.gql.errors: GraphQL errors array
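For orientation, a GraphQL call over HTTP is typically a POST whose JSON body carries query, variables, and operationName. The sketch below builds such a request with the JDK's java.net.http API. The GraphQlRequestSketch class and its helpers are hypothetical, variables are limited to string values for brevity, and sending the apiKey as a Bearer token is an assumption about how the module applies the integration account.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.Map;
import java.util.stream.Collectors;

final class GraphQlRequestSketch {
    /** Wraps a string in quotes and escapes it for use as a JSON string literal. */
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"' -> sb.append("\\\"");
                case '\\' -> sb.append("\\\\");
                case '\n' -> sb.append("\\n");
                case '\r' -> sb.append("\\r");
                case '\t' -> sb.append("\\t");
                default -> {
                    if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
                    else sb.append(c);
                }
            }
        }
        return sb.append('"').toString();
    }

    /** Builds the JSON request body; only string-valued variables are supported here. */
    static String body(String query, Map<String, String> variables, String operationName) {
        String vars = variables.entrySet().stream()
            .map(e -> quote(e.getKey()) + ":" + quote(e.getValue()))
            .collect(Collectors.joining(",", "{", "}"));
        return "{\"query\":" + quote(query)
            + ",\"variables\":" + vars
            + ",\"operationName\":" + quote(operationName) + "}";
    }

    /** Builds (but does not send) the POST request. */
    static HttpRequest request(String endpoint, String token, String body) {
        return HttpRequest.newBuilder(URI.create(endpoint))
            .header("Content-Type", "application/json")
            .header("Authorization", "Bearer " + token) // assumption: apiKey used as Bearer token
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();
    }
}
```

A GraphQL server can return HTTP 200 together with a non-empty errors array, which is presumably why the module emits api.gql.errors separately from api.gql.data.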
gRPC Client (api.grpc.generic)
Generic gRPC client with support for unary and server-streaming calls.
Configuration Schema:
{
  "target": "api.service.com:443",
  "fullMethodName": "greeter.v1.GreeterService/SayHello",
  "message": {
    "name": "World",
    "language": "en"
  },
  "metadata": {
    "authorization": "Bearer token123",
    "x-request-id": "req_12345"
  }
}
Features:
- Protocol buffer serialization/deserialization
- Server streaming support with EventLog per message
- Automatic retry with exponential backoff
- Connection pooling and load balancing
Integration Account Requirements:
- apiKey: gRPC authentication token
- password: (Optional) Additional metadata
WorkflowState Keys Emitted:
api.grpc.messages: JSON array of received messages (size-capped)
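The fullMethodName value follows the usual gRPC convention of a fully qualified service name, a slash, and the method name. As a small sketch of how such a value might be validated and split before a call is made (the GrpcMethodName class is hypothetical and not part of the engine):

```java
final class GrpcMethodName {
    /** Fully qualified service name and bare method name. */
    record Parsed(String service, String method) {}

    /** Splits "package.Service/Method" at the last slash. */
    static Parsed parse(String fullMethodName) {
        int slash = fullMethodName.lastIndexOf('/');
        if (slash <= 0 || slash == fullMethodName.length() - 1) {
            throw new IllegalArgumentException(
                "Expected <package>.<Service>/<Method>, got: " + fullMethodName);
        }
        return new Parsed(
            fullMethodName.substring(0, slash),
            fullMethodName.substring(slash + 1));
    }

    /** Request path used by gRPC over HTTP/2: "/" + service + "/" + method. */
    static String httpPath(String fullMethodName) {
        Parsed p = parse(fullMethodName);
        return "/" + p.service() + "/" + p.method();
    }
}
```

For the configuration above, this yields the service greeter.v1.GreeterService and the method SayHello.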
WebSocket Client (api.ws.client)
Real-time WebSocket communication with connection lifecycle management.
Configuration Schema:
{
  "url": "wss://api.example.com/realtime",
  "initial_message": {
    "type": "subscribe",
    "channels": ["orders", "notifications"]
  },
  "ping_interval_ms": 30000,
  "close_after_ms": 300000
}
Features:
- Automatic connection management
- Ping/pong heartbeat support
- Message frame logging with payload hashing
- Graceful disconnection handling
Integration Account Requirements:
- apiKey: WebSocket authentication token
- password: (Optional) Additional credentials
WorkflowState Keys Emitted:
- api.ws.last_frame_hash: Hash of last received frame
- api.ws.frames_count: Total frames received
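The two emitted keys can be pictured with a small tracker like the one below. It is only a sketch: the FrameTracker class is hypothetical, and the choice of SHA-256 over the UTF-8 payload is an assumption, since the hash algorithm the module uses is not stated here.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

final class FrameTracker {
    private long framesCount = 0;
    private String lastFrameHash = "";

    /** Records one received text frame: stores its hash and bumps the counter. */
    void onFrame(String payload) {
        try {
            // Assumption: SHA-256 over the UTF-8 bytes of the frame payload.
            MessageDigest sha256 = MessageDigest.getInstance("SHA-256");
            byte[] digest = sha256.digest(payload.getBytes(StandardCharsets.UTF_8));
            lastFrameHash = HexFormat.of().formatHex(digest);
            framesCount++;
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is mandatory on every Java platform, so this should not happen.
            throw new IllegalStateException("SHA-256 unavailable", e);
        }
    }

    long framesCount() { return framesCount; }

    String lastFrameHash() { return lastFrameHash; }
}
```

Logging a hash instead of the payload keeps frame contents out of the logs while still letting two runs be compared frame by frame.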
Webhook Receiver (api.webhook.receiver)
Secure webhook endpoint with signature verification and payload processing.
Configuration Schema:
{
  "path": "/_hooks/stripe",
  "signing_secret": "whsec_1234567890abcdef",
  "replay_target_module": "payment_processor"
}
Security Features:
- HMAC-SHA256 signature verification
- Payload deduplication
- Request rate limiting
- IP whitelist support (configurable)
Integration Account Requirements:
- apiKey: Webhook signing secret
- password: (Optional) Additional verification
WorkflowState Keys Emitted:
- webhook.last_event_id: Last processed event ID
- webhook.last_result: Processing result summary
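HMAC-SHA256 verification generally means recomputing the MAC over the received payload with the shared secret and comparing it to the signature the sender supplied. The sketch below shows that idea with the JDK's javax.crypto API. The WebhookSignature class is hypothetical, and it assumes a hex-encoded signature computed over the raw payload only; some providers sign a timestamp together with the payload, in which case the signed string differs.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.util.HexFormat;
import java.util.Locale;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

final class WebhookSignature {
    /** Returns the lowercase hex HMAC-SHA256 of payload under secret. */
    static String sign(String secret, String payload) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
            return HexFormat.of().formatHex(mac.doFinal(payload.getBytes(StandardCharsets.UTF_8)));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("HmacSHA256 unavailable", e);
        }
    }

    /** Compares the received signature with the expected one in constant time. */
    static boolean verify(String secret, String payload, String receivedHex) {
        byte[] expected = sign(secret, payload).getBytes(StandardCharsets.US_ASCII);
        byte[] received = receivedHex.toLowerCase(Locale.ROOT).getBytes(StandardCharsets.US_ASCII);
        // MessageDigest.isEqual avoids leaking the mismatch position through timing.
        return MessageDigest.isEqual(expected, received);
    }
}
```

The constant-time comparison matters because a plain String.equals can return early on the first differing character, which gives an attacker a timing signal.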
Advanced Features
OpenAPI Integration
The REST module provides sophisticated OpenAPI integration:
// Automatic endpoint resolution from spec
ExecModule restModule = new ExecModule()
    .moduleType("api.rest.generic")
    .specs(List.of(openApiSpec)) // Automatically loaded
    .moduleData("""
        {
          "operationId": "createUser",
          "body": {"name": "John", "email": "john@example.com"}
        }
        """);
Server Streaming Support
gRPC and WebSocket modules support real-time data streams:
// Monitor server-streaming responses
workflow.getEventLog()
    .filter(event -> event.getEventDetails().contains("Received"))
    .subscribe(event -> {
        log.info("New message: {}", event.getEventDetails());
    });
Response Processing Pipeline
All modules include comprehensive response processing:
- Validation: Schema validation against OpenAPI/GraphQL specs
- Transformation: Data format conversion and normalization
- Truncation: Large response handling with size limits
- Redaction: Automatic PII and sensitive data removal
- Caching: Response caching with configurable TTL
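Two of the stages listed above, truncation and redaction, are simple enough to sketch. The ResponseSanitizer class, the header names treated as sensitive, and the marker strings are all hypothetical. The engine's actual limits and redaction rules are not described here.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

final class ResponseSanitizer {
    // Hypothetical set of header names whose values should never be stored or logged.
    private static final Set<String> SENSITIVE = Set.of("authorization", "set-cookie", "x-api-key");

    /** Keeps at most maxChars characters and notes how many were dropped. */
    static String truncate(String body, int maxChars) {
        if (body.length() <= maxChars) {
            return body;
        }
        return body.substring(0, maxChars)
            + "...[truncated " + (body.length() - maxChars) + " chars]";
    }

    /** Returns a copy of headers with sensitive values replaced, matching names case-insensitively. */
    static Map<String, String> redactHeaders(Map<String, String> headers) {
        Map<String, String> out = new LinkedHashMap<>();
        headers.forEach((name, value) ->
            out.put(name, SENSITIVE.contains(name.toLowerCase(Locale.ROOT)) ? "[REDACTED]" : value));
        return out;
    }
}
```

Redacting before truncating is the safer order in a real pipeline, since a cut in the middle of a secret could otherwise leave a fragment that no longer matches the redaction rule.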
Security Features
Authentication Support
Multiple authentication mechanisms supported across protocols:
REST/GraphQL:
- Bearer tokens
- API keys (header/query parameter)
- Basic authentication
- OAuth 2.0 flows
- Custom header schemes
gRPC:
- TLS client certificates
- JWT tokens in metadata
- Custom authentication headers
WebSocket:
- Token-based authentication
- Connection upgrade headers
- Subprotocol negotiation
Data Protection
- Request/Response Logging: Sensitive data automatically redacted
- Credential Security: Never log API keys or tokens
- TLS/SSL: Enforce encrypted connections
- Rate Limiting: Prevent abuse and respect API quotas
RBAC Controls
All API modules enforce role-based access:
- ADMIN: Full access to all API operations
- USER: Restricted access with approval workflows
- ANONYMOUS: Blocked from sensitive operations
Error Handling & Resilience
HTTP Status Code Handling
REST module provides detailed HTTP error handling:
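// Automatic retry for transient errors
if (status >= 500) {
    // 5xx: server-side failure, retry with exponential backoff
    return RetryDecision.RETRY_WITH_BACKOFF;
} else if (status == 429) {
    // 429: rate limited, retry after a delay
    return RetryDecision.RETRY_AFTER_DELAY;
} else if (status >= 400) {
    // Any other 4xx: client error, retrying will not help
    return RetryDecision.FAIL_IMMEDIATELY;
}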
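A self-contained version of this classification, together with one common way to compute the backoff delay, might look like the sketch below. The RetryPolicy class and its Decision enum are hypothetical mirrors of the RetryDecision values above (SUCCEED is added only so the method always returns), and the doubling schedule with a cap is an assumption, not the engine's documented behavior.

```java
import java.time.Duration;

final class RetryPolicy {
    enum Decision { RETRY_WITH_BACKOFF, RETRY_AFTER_DELAY, FAIL_IMMEDIATELY, SUCCEED }

    /** Maps an HTTP status code to a retry decision; 429 is checked before other 4xx codes. */
    static Decision classify(int status) {
        if (status >= 500) return Decision.RETRY_WITH_BACKOFF;
        if (status == 429) return Decision.RETRY_AFTER_DELAY;
        if (status >= 400) return Decision.FAIL_IMMEDIATELY;
        return Decision.SUCCEED;
    }

    /** Delay before retry number attempt (1-based): base doubled (attempt - 1) times, capped at max. */
    static Duration backoff(int attempt, Duration base, Duration max) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        long delay = base.toMillis();
        long cap = max.toMillis();
        // Stop doubling once the cap is reached, which also keeps the value from overflowing.
        for (int i = 1; i < attempt && delay < cap; i++) {
            delay *= 2;
        }
        return Duration.ofMillis(Math.min(delay, cap));
    }
}
```

With a 100 ms base and a 5 s cap, the first four delays are 100, 200, 400, and 800 ms. Production code usually adds random jitter so that many clients do not retry in lockstep.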
Circuit Breaker Pattern
Automatic circuit breaking for failing services:
- Closed: Normal operation, requests flow through
- Open: Service failing, requests rejected immediately
- Half-Open: Testing service recovery
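The three states can be modeled as a small state machine. The following is a minimal sketch, assuming the breaker opens after a fixed number of consecutive failures and moves to half-open after a fixed wait. The CircuitBreaker class, its thresholds, and the explicit clock parameter are hypothetical and chosen to keep the example testable.

```java
final class CircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold;
    private final long openMillis;
    private State state = State.CLOSED;
    private int consecutiveFailures = 0;
    private long openedAt = 0;

    CircuitBreaker(int failureThreshold, long openMillis) {
        this.failureThreshold = failureThreshold;
        this.openMillis = openMillis;
    }

    /** Returns true if a request may be attempted at time nowMillis. */
    synchronized boolean allowRequest(long nowMillis) {
        if (state == State.OPEN && nowMillis - openedAt >= openMillis) {
            state = State.HALF_OPEN; // wait elapsed: allow trial requests
        }
        return state != State.OPEN;
    }

    /** A success closes the circuit and resets the failure count. */
    synchronized void recordSuccess() {
        consecutiveFailures = 0;
        state = State.CLOSED;
    }

    /** A failure while half-open, or too many in a row, opens the circuit. */
    synchronized void recordFailure(long nowMillis) {
        consecutiveFailures++;
        if (state == State.HALF_OPEN || consecutiveFailures >= failureThreshold) {
            state = State.OPEN;
            openedAt = nowMillis;
        }
    }

    synchronized State state() { return state; }
}
```

A caller would check allowRequest before each call and report the outcome afterwards. Note that this sketch admits every request while half-open; stricter variants let only a single probe through.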
Timeout Management
Configurable timeouts at multiple levels:
- Connection Timeout: Time to establish connection
- Request Timeout: Time to complete request/response
- Stream Timeout: Time for streaming operations
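For comparison, the JDK's own HTTP client separates the first two levels in the same way: the connect timeout is set on the client and the request timeout on each request. The sketch below only illustrates that distinction. The TimeoutConfig class is hypothetical, and how the ExecModules expose these settings is not shown here. Stream timeouts have no direct equivalent in this API and are left out.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.time.Duration;

final class TimeoutConfig {
    /** Connection timeout: how long to wait while establishing the connection. */
    static HttpClient client(Duration connectTimeout) {
        return HttpClient.newBuilder()
            .connectTimeout(connectTimeout)
            .build();
    }

    /** Request timeout: how long to wait for the response to this one request. */
    static HttpRequest request(String url, Duration requestTimeout) {
        return HttpRequest.newBuilder(URI.create(url))
            .timeout(requestTimeout)
            .GET()
            .build();
    }
}
```

A short connect timeout combined with a longer request timeout is a common starting point, since a healthy service accepts connections quickly even when individual responses are slow.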
Usage Examples
Multi-Service API Orchestration
// Orchestrate multiple API calls
Workflow apiOrchestration = new Workflow()
    .addModule(new ExecModule()
        .moduleType("api.rest.generic")
        .moduleData("""
            {
              "operationId": "getUserProfile",
              "params": {"userId": "12345"}
            }
            """))
    .addModule(new ExecModule()
        .moduleType("api.graphql.generic")
        .moduleData("""
            {
              "endpoint": "https://api.github.com/graphql",
              "query": "query { viewer { login name } }"
            }
            """))
    .addModule(new ExecModule()
        .moduleType("api.grpc.generic")
        .moduleData("""
            {
              "target": "recommendations.api.com:443",
              "fullMethodName": "recommendations.v1.Service/GetRecommendations",
              "message": {"userId": "12345", "limit": 10}
            }
            """));
Real-time Data Processing
// WebSocket + Webhook integration
Workflow realtimeProcessing = new Workflow()
    .addModule(webhookReceiver)   // Receive external events
    .addModule(websocketClient)   // Stream to real-time clients
    .addModule(restNotification); // Send notifications via REST
API Gateway Pattern
// Use as API gateway with routing
ExecModule apiGateway = new ExecModule()
    .moduleType("api.rest.generic")
    .moduleData("""
        {
          "operationId": "routeRequest",
          "headers": {"X-Route-Target": "{{workflow.route_target}}"},
          "body": "{{workflow.request_body}}"
        }
        """);
Monitoring & Observability
Request/Response Tracing
All API calls include comprehensive tracing:
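// Extract API metrics from WorkflowState.
// Collectors.toMap throws on duplicate keys and on null values,
// so this assumes each key appears once with a non-null value.
Map<String, Object> apiMetrics = workflow.getWorkflowState()
    .stream()
    .filter(state -> state.getKey().startsWith("api."))
    .collect(Collectors.toMap(
        WorkflowState::getKey,
        WorkflowState::getValue
    ));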
Performance Monitoring
Track API performance across all protocols:
- Latency: Request/response timing
- Throughput: Requests per second
- Error Rates: 4xx/5xx error percentages
- Availability: Service uptime metrics
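As a rough illustration of how two of these figures could be derived from recorded calls, the sketch below computes an error rate and a nearest-rank latency percentile. The ApiMetrics class and its Call record are hypothetical. The engine's own metric definitions may differ, for example in whether 4xx responses count as errors.

```java
import java.util.List;

final class ApiMetrics {
    /** One completed API call: HTTP status and observed latency. */
    record Call(int status, long latencyMillis) {}

    /** Fraction of calls with a 4xx or 5xx status, between 0.0 and 1.0. */
    static double errorRate(List<Call> calls) {
        if (calls.isEmpty()) {
            return 0.0;
        }
        long errors = calls.stream().filter(c -> c.status() >= 400).count();
        return (double) errors / calls.size();
    }

    /** Nearest-rank percentile of latency, for p in (0, 100]. */
    static long latencyPercentile(List<Call> calls, double p) {
        if (calls.isEmpty() || p <= 0 || p > 100) {
            throw new IllegalArgumentException("need at least one call and 0 < p <= 100");
        }
        long[] sorted = calls.stream().mapToLong(Call::latencyMillis).sorted().toArray();
        int rank = (int) Math.ceil(p / 100.0 * sorted.length);
        return sorted[Math.max(rank, 1) - 1];
    }
}
```

Percentiles such as p95 or p99 tend to be more informative than averages for API latency, because a small number of slow calls can hide behind a healthy mean.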
EventLog Integration
Rich event logging for debugging and monitoring:
// EventLog entries for each API call
2024-01-15T10:00:00Z [INFO] Starting REST API call: getUserProfile
2024-01-15T10:00:00Z [INFO] Request sent to: https://api.example.com/users/12345
2024-01-15T10:00:01Z [INFO] Response received: 200 OK (1.2s)
2024-01-15T10:00:01Z [INFO] Response size: 2.3KB (truncated)
Best Practices
API Design
- Idempotency: Design APIs to be safe for retries
- Versioning: Use proper API versioning strategies
- Pagination: Handle large datasets appropriately
- Rate Limiting: Respect API rate limits
Error Handling
- Graceful Degradation: Provide fallback responses
- Retry Logic: Implement exponential backoff
- Circuit Breakers: Prevent cascading failures
- Timeout Configuration: Set appropriate timeouts
Security
- Token Rotation: Regularly rotate API credentials
- Least Privilege: Use minimal required permissions
- Request Signing: Verify request authenticity
- Audit Logging: Log all API interactions
Performance
- Connection Pooling: Reuse HTTP connections
- Response Caching: Cache appropriate responses
- Compression: Enable request/response compression
- Load Balancing: Distribute API load appropriately