Workflow Monitor WebSocket Protocol
Real-time communication protocol for workflow execution monitoring using STOMP over WebSocket.
Overview
The Workflow Monitor uses STOMP (Simple Text Oriented Messaging Protocol) over WebSocket for bidirectional, real-time communication between the frontend and backend.
Connection Details
- Protocol: STOMP 1.2
- Transport: WebSocket with SockJS fallback
- Endpoint: /ws
- Message Format: JSON
- Heartbeat: 4000ms (both incoming/outgoing)
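The 4000ms value is what the client proposes. Under STOMP 1.2 the effective interval in each direction is negotiated from the heart-beat headers of the CONNECT and CONNECTED frames. A minimal sketch of that rule follows; the function and type names are illustrative and not part of @stomp/stompjs.

```typescript
// Illustrative helper (not a library API): applies the STOMP 1.2 heart-beat
// negotiation rule to the values exchanged in CONNECT and CONNECTED.
interface HeartBeat {
  send: number;    // smallest interval the party can guarantee between sends (0 = cannot send)
  receive: number; // desired interval between received heartbeats (0 = wants none)
}

interface NegotiatedHeartBeat {
  clientToServer: number; // milliseconds, 0 means disabled
  serverToClient: number; // milliseconds, 0 means disabled
}

function negotiateHeartBeat(client: HeartBeat, server: HeartBeat): NegotiatedHeartBeat {
  // A direction is disabled if either side opts out; otherwise the larger value wins.
  const pick = (canSend: number, wants: number): number =>
    canSend === 0 || wants === 0 ? 0 : Math.max(canSend, wants);
  return {
    clientToServer: pick(client.send, server.receive),
    serverToClient: pick(server.send, client.receive),
  };
}

// A client proposing 4000ms against a server that answers with 10000ms
// ends up with 10000ms in both directions.
const negotiated = negotiateHeartBeat(
  { send: 4000, receive: 4000 },
  { send: 10000, receive: 10000 },
);
```

In practice this means a lower client-side setting only takes effect if the broker agrees to it.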
Connection Lifecycle
1. Client Connection
import { Client } from '@stomp/stompjs';
const client = new Client({
brokerURL: 'ws://localhost:8080/ws',
reconnectDelay: 5000,
heartbeatIncoming: 4000,
heartbeatOutgoing: 4000,
});
client.onConnect = (frame) => {
console.log('Connected:', frame);
};
client.activate();
2. Subscription
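// Call this from within onConnect: subscribe() fails before the connection is established,
// and subscribing in onConnect re-creates the subscription after every reconnect.
client.subscribe(`/topic/workflows/${workflowId}/events`, (message) => {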
const data = JSON.parse(message.body);
handleMessage(data);
});
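Because every message type shares one destination pattern, it can help to build and parse that destination in one place. The helpers below are a hypothetical sketch: only the /topic/workflows/{workflowId}/events pattern comes from this document, and the UUID check is an assumption based on the example payloads.

```typescript
// Hypothetical helpers; the names are illustrative and not part of any library.
const UUID_PATTERN = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;

// Builds the events destination for a workflow, rejecting malformed IDs early.
function workflowEventsTopic(workflowId: string): string {
  if (!UUID_PATTERN.test(workflowId)) {
    throw new Error(`Not a UUID: ${workflowId}`);
  }
  return `/topic/workflows/${workflowId}/events`;
}

// Extracts the workflow ID from a destination, or returns null if it does not match.
function parseWorkflowId(destination: string): string | null {
  const match = /^\/topic\/workflows\/([^/]+)\/events$/.exec(destination);
  if (match === null) return null;
  const candidate = match[1];
  return UUID_PATTERN.test(candidate) ? candidate : null;
}
```

A subscription would then use workflowEventsTopic(workflowId) instead of an inline template string.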
3. Disconnection
client.deactivate();
Message Types
Event Log Message
Sent when a module logs an event during execution.
Topic: /topic/workflows/{workflowId}/events
Payload:
{
"type": "event_log",
"workflowId": "550e8400-e29b-41d4-a716-446655440000",
"execModuleId": "7c9e6679-7425-40de-944b-e07fc1f90ae7",
"eventLog": {
"id": "123e4567-e89b-12d3-a456-426614174000",
"status": "ok",
"eventDetails": "Successfully processed 100 records",
"createdDate": "2025-10-06T18:15:30.123Z",
"ownerId": "user-123"
}
}
Fields:
- type (string): Always "event_log"
- workflowId (UUID): Workflow identifier
- execModuleId (UUID): Module identifier
- eventLog (object): ThorAPI EventLog model
  - id (UUID): Event log ID
  - status ("ok" | "error" | "disabled"): Event status
  - eventDetails (string): Event message
  - createdDate (ISO8601): Timestamp
  - ownerId (string, optional): Owner/creator ID
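Since message bodies arrive as untyped JSON, a runtime check against the fields listed above can catch malformed payloads before they reach UI state. The following is a sketch under the assumption that the field list is complete; the type and function names are illustrative rather than taken from the ThorAPI model.

```typescript
type EventStatus = 'ok' | 'error' | 'disabled';

interface EventLog {
  id: string;
  status: EventStatus;
  eventDetails: string;
  createdDate: string; // ISO 8601
  ownerId?: string;
}

interface EventLogMessage {
  type: 'event_log';
  workflowId: string;
  execModuleId: string;
  eventLog: EventLog;
}

const EVENT_STATUSES = new Set<string>(['ok', 'error', 'disabled']);

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null;
}

// Returns true only if every documented field is present with the documented type.
function isEventLogMessage(value: unknown): value is EventLogMessage {
  if (!isRecord(value)) return false;
  const { type, workflowId, execModuleId, eventLog } = value;
  if (type !== 'event_log') return false;
  if (typeof workflowId !== 'string' || typeof execModuleId !== 'string') return false;
  if (!isRecord(eventLog)) return false;
  const { id, status, eventDetails, createdDate, ownerId } = eventLog;
  return (
    typeof id === 'string' &&
    typeof status === 'string' &&
    EVENT_STATUSES.has(status) &&
    typeof eventDetails === 'string' &&
    typeof createdDate === 'string' &&
    !Number.isNaN(Date.parse(createdDate)) &&
    (ownerId === undefined || typeof ownerId === 'string')
  );
}
```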
Example Handler:
if (data.type === 'event_log') {
setLiveEvents(prev => ({
...prev,
[data.execModuleId]: [
...(prev[data.execModuleId] || []).slice(-4),
data.eventLog
]
}));
}
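The handler above keeps the last four events for a module and appends the new one, so each module shows at most five. The same rolling window can be written as a pure function, which is easier to test outside React. The name appendCapped and the default limit are illustrative.

```typescript
// Returns a new map in which `key` holds at most `maxItems` entries (newest last).
// `maxItems` is expected to be at least 1.
function appendCapped<T>(
  prev: Record<string, T[]>,
  key: string,
  item: T,
  maxItems = 5,
): Record<string, T[]> {
  const existing = prev[key] ?? [];
  // slice(-0) would return the whole array, so the single-item case is handled explicitly.
  const kept = maxItems > 1 ? existing.slice(-(maxItems - 1)) : [];
  return { ...prev, [key]: [...kept, item] };
}
```

With this helper the state update becomes setLiveEvents(prev => appendCapped(prev, data.execModuleId, data.eventLog)).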
Status Update Message
Sent when a module's execution status changes.
Topic: /topic/workflows/{workflowId}/events
Payload:
{
"type": "status_update",
"workflowId": "550e8400-e29b-41d4-a716-446655440000",
"execModuleId": "7c9e6679-7425-40de-944b-e07fc1f90ae7",
"status": "RUNNING",
"duration": 1500,
"timestamp": "2025-10-06T18:15:30.123Z"
}
Fields:
- type (string): Always "status_update"
- workflowId (UUID): Workflow identifier
- execModuleId (UUID): Module identifier
- status (enum): Module execution status
  - PENDING - Not yet started
  - RUNNING - Currently executing
  - SUCCESS - Completed successfully
  - ERROR - Failed with error
  - PAUSED - Execution paused
  - SKIPPED - Module skipped
- duration (number): Execution time in milliseconds
- timestamp (ISO8601): When status changed
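The status enum and field list above translate directly into a parser that rejects unknown statuses instead of passing them through to the UI. This is a sketch; parseStatusUpdate and isModuleStatus are illustrative names, and returning null on invalid input is an assumed design choice.

```typescript
const MODULE_STATUSES = ['PENDING', 'RUNNING', 'SUCCESS', 'ERROR', 'PAUSED', 'SKIPPED'] as const;
type ModuleStatus = (typeof MODULE_STATUSES)[number];

interface StatusUpdateMessage {
  type: 'status_update';
  workflowId: string;
  execModuleId: string;
  status: ModuleStatus;
  duration: number; // milliseconds
  timestamp: string; // ISO 8601
}

function isModuleStatus(value: unknown): value is ModuleStatus {
  return typeof value === 'string' && (MODULE_STATUSES as readonly string[]).indexOf(value) !== -1;
}

// Parses a raw message body; returns null for malformed JSON or unexpected fields.
function parseStatusUpdate(body: string): StatusUpdateMessage | null {
  let parsed: unknown;
  try {
    parsed = JSON.parse(body);
  } catch {
    return null;
  }
  if (typeof parsed !== 'object' || parsed === null) return null;
  const { type, workflowId, execModuleId, status, duration, timestamp } =
    parsed as Record<string, unknown>;
  if (
    type !== 'status_update' ||
    typeof workflowId !== 'string' ||
    typeof execModuleId !== 'string' ||
    !isModuleStatus(status) ||
    typeof duration !== 'number' ||
    typeof timestamp !== 'string'
  ) {
    return null;
  }
  return { type, workflowId, execModuleId, status, duration, timestamp };
}
```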
Example Handler:
if (data.type === 'status_update') {
setLiveStatus(prev => ({
...prev,
[data.execModuleId]: data.status
}));
setDurations(prev => ({
...prev,
[data.execModuleId]: data.duration
}));
}
Workflow State Update Message
Sent when workflow state variables change.
Topic: /topic/workflows/{workflowId}/events
Payload:
{
"type": "state_update",
"workflowId": "550e8400-e29b-41d4-a716-446655440000",
"state": {
"build.version": "1.0.5",
"deploy.target": "production",
"ig.publish.status": "completed",
"ig.publish.url": "https://api.example.com/fhir/ImplementationGuide/123"
},
"timestamp": "2025-10-06T18:15:30.123Z"
}
Fields:
- type (string): Always "state_update"
- workflowId (UUID): Workflow identifier
- state (object): Key-value pairs of state variables
  - Keys follow dot notation for namespacing
  - Values can be any JSON-serializable type
- timestamp (ISO8601): When state changed
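Dot-notation keys make it straightforward to group state variables for display. The sketch below assumes the first segment of each key is the namespace (so "ig.publish.status" belongs to "ig"); that convention and the function name are assumptions, not something the protocol mandates.

```typescript
// Groups flat state keys by their first dot-separated segment.
// Keys without a dot are placed under the empty-string namespace.
// Maps are used instead of plain objects so that unusual keys such as
// "__proto__" are stored as ordinary entries.
function groupByNamespace(state: Record<string, unknown>): Map<string, Map<string, unknown>> {
  const groups = new Map<string, Map<string, unknown>>();
  for (const key of Object.keys(state)) {
    const dot = key.indexOf('.');
    const namespace = dot === -1 ? '' : key.slice(0, dot);
    const name = dot === -1 ? key : key.slice(dot + 1);
    let group = groups.get(namespace);
    if (group === undefined) {
      group = new Map<string, unknown>();
      groups.set(namespace, group);
    }
    group.set(name, state[key]);
  }
  return groups;
}
```

Applied to the example payload, this yields three groups: build, deploy, and ig, with ig holding publish.status and publish.url.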
Example Handler:
if (data.type === 'state_update') {
setWorkflowState(prev => ({
...prev,
...data.state
}));
}
Error Handling
Connection Errors
client.onStompError = (frame) => {
console.error('STOMP error:', frame.headers['message']);
setError(new Error(frame.headers['message']));
};
WebSocket Errors
client.onWebSocketError = (event) => {
console.error('WebSocket error:', event);
};
Reconnection Strategy
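client.reconnectDelay = 5000; // Wait 5 seconds before the first reconnect attempt
client.maxReconnectDelay = 30000; // Cap the delay at 30 seconds
// Note: @stomp/stompjs retries at a fixed reconnectDelay by default. In releases that support
// maxReconnectDelay, exponential backoff is opt-in via client.reconnectTimeMode; check the version in use.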
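To make the effect of a 5 second base delay and a 30 second cap concrete, here is a sketch of capped exponential backoff. It illustrates the general technique with simple doubling; it is not claimed to be the exact schedule @stomp/stompjs uses internally, and backoffDelay is a hypothetical name.

```typescript
// Delay before reconnect attempt number `attempt` (0-based), doubling each time
// and never exceeding maxMs.
function backoffDelay(attempt: number, baseMs = 5000, maxMs = 30000): number {
  const exponent = Math.max(0, Math.floor(attempt));
  return Math.min(maxMs, baseMs * Math.pow(2, exponent));
}

// First five delays with the defaults: 5000, 10000, 20000, 30000, 30000
const schedule = [0, 1, 2, 3, 4].map((attempt) => backoffDelay(attempt));
```

Adding random jitter to each delay is a common refinement when many clients reconnect at once.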
Security
Authentication
WebSocket connections should be authenticated using the same mechanism as HTTP requests:
const client = new Client({
brokerURL: 'ws://localhost:8080/ws',
connectHeaders: {
Authorization: `Bearer ${accessToken}`
}
});
Authorization
Backend validates subscription requests:
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
if (StompCommand.SUBSCRIBE.equals(accessor.getCommand())) {
String destination = accessor.getDestination();
UUID workflowId = extractWorkflowId(destination);
// Verify user has access to this workflow
if (!workflowService.hasAccess(getCurrentUser(), workflowId)) {
throw new AccessDeniedException("Unauthorized");
}
}
return message;
}
});
}
Performance Optimization
Message Batching
Backend batches events to reduce message frequency:
public class WorkflowWebSocketHandler {
private final Map<String, List<EventLog>> pendingEvents = new ConcurrentHashMap<>();
@Scheduled(fixedRate = 100) // Send every 100ms
public void flushPendingEvents() {
pendingEvents.forEach((workflowId, events) -> {
if (!events.isEmpty()) {
broadcastEventLogBatch(workflowId, events);
events.clear();
}
});
}
}
Client-side Debouncing
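import { useMemo } from 'react';
import debounce from 'lodash/debounce'; // Assumption: any debounce helper with this signature works
// Caveat: a trailing-edge debounce delivers only the last call of each burst, so use it
// where intermediate updates can safely be dropped.
const debouncedUpdate = useMemo(
() => debounce((data) => {
handleStatusUpdate(data);
}, 100),
[]
);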
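Debouncing a single handler delivers only the last message of a burst, which can drop status changes for other modules that arrived in the same window. One alternative is to coalesce by module ID and flush everything at once. The class below is a sketch of that idea; StatusCoalescer and its methods are hypothetical names, and the caller is assumed to invoke flush() on a timer or animation frame.

```typescript
interface StatusUpdate {
  execModuleId: string;
  status: string;
  duration: number;
}

// Keeps only the newest update per module until flush() hands them all to `apply`.
class StatusCoalescer {
  private latest = new Map<string, StatusUpdate>();
  private readonly apply: (updates: StatusUpdate[]) => void;

  constructor(apply: (updates: StatusUpdate[]) => void) {
    this.apply = apply;
  }

  push(update: StatusUpdate): void {
    // A later update for the same module replaces the earlier one.
    this.latest.set(update.execModuleId, update);
  }

  flush(): void {
    if (this.latest.size === 0) return;
    const updates = Array.from(this.latest.values());
    this.latest = new Map<string, StatusUpdate>();
    this.apply(updates);
  }
}
```

A typical wiring would call coalescer.push(data) from the message handler and coalescer.flush() every 100ms, so each module's final status in the window still reaches the UI.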
Backpressure Handling
@Override
public void configureBrokerChannel(ChannelRegistration registration) {
registration.taskExecutor()
.corePoolSize(4)
.maxPoolSize(8)
.queueCapacity(1000);
}
Testing
Unit Tests
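import { renderHook, waitFor } from '@testing-library/react';
import { Server } from 'mock-socket';
// useWorkflowWebSocket is the hook under test; import it from wherever your project defines it
describe('useWorkflowWebSocket', () => {
it('connects and receives messages', async () => {
// mock-socket's Server plays the backend role; its WebSocket export is the client side
const mockServer = new Server('ws://localhost:8080/ws');
mockServer.on('connection', (socket) => {
// Simplification: a real STOMP client expects CONNECTED and MESSAGE frames rather than bare JSON
socket.send(JSON.stringify({
type: 'status_update',
execModuleId: 'test-123',
status: 'RUNNING'
}));
});
const { result } = renderHook(() => useWorkflowWebSocket('workflow-123'));
await waitFor(() => {
expect(result.current.connected).toBe(true);
});
await waitFor(() => {
expect(result.current.liveStatus['test-123']).toBe('RUNNING');
});
mockServer.stop(); // Release the mocked URL for other tests
});
});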
Integration Tests
# Install wscat for manual testing
npm install -g wscat
# Connect to WebSocket
wscat -c ws://localhost:8080/ws
# Send STOMP CONNECT frame
CONNECT
accept-version:1.2
heart-beat:4000,4000
^@
# Subscribe to workflow events
SUBSCRIBE
id:sub-1
destination:/topic/workflows/550e8400-e29b-41d4-a716-446655440000/events
^@
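In the frames above, ^@ stands for the NULL byte that terminates every STOMP frame, and the blank line before it separates headers from the (here empty) body. A small serializer makes that layout explicit. buildStompFrame is an illustrative name, and the sketch does not perform the header escaping that STOMP 1.2 requires for values containing colons or newlines.

```typescript
// Serializes a STOMP frame: command line, header lines, blank line, body, NULL byte.
// Header values are written as-is (no escaping), which is fine for simple values.
function buildStompFrame(
  command: string,
  headers: Record<string, string>,
  body = '',
): string {
  const headerLines = Object.keys(headers).map((name) => `${name}:${headers[name]}`);
  return [command, ...headerLines, '', body].join('\n') + '\0';
}

const connectFrame = buildStompFrame('CONNECT', {
  'accept-version': '1.2',
  'heart-beat': '4000,4000',
});

const subscribeFrame = buildStompFrame('SUBSCRIBE', {
  id: 'sub-1',
  destination: '/topic/workflows/550e8400-e29b-41d4-a716-446655440000/events',
});
```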
Troubleshooting
Connection Refused
Problem: Client cannot connect to WebSocket endpoint
Solutions:
- Verify backend WebSocket configuration:
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// With withSockJS(), SockJS clients connect at /ws; a raw WebSocket client
// must use the /ws/websocket path instead
registry.addEndpoint("/ws")
.setAllowedOriginPatterns("*")
.withSockJS();
}
- Check CORS settings:
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**")
.allowedOrigins("http://localhost:3000")
.allowedMethods("GET", "POST");
}
- Verify firewall/proxy configuration
Messages Not Received
Problem: Client connected but not receiving messages
Solutions:
- Verify subscription topic matches backend destination
- Check message broker configuration
- Enable debug logging:
client.debug = (str) => {
console.log('STOMP Debug:', str);
};
High Latency
Problem: Messages arrive slowly or out of order
Solutions:
- Reduce heartbeat interval (but not too low):
heartbeatIncoming: 2000,
heartbeatOutgoing: 2000
- Enable message batching on backend
- Terminate WebSocket connections at an edge proxy or CDN close to users (WebSocket traffic is proxied, not cached)
- Check network conditions (use ping, traceroute)
Best Practices
- Always handle reconnection - Network issues are inevitable
- Debounce UI updates - Don't update on every message
- Use message batching - Reduce message frequency
- Implement backpressure - Prevent message queue overflow
- Log all errors - Essential for debugging
- Test with slow networks - Simulate poor conditions
- Monitor connection health - Track reconnection attempts
API Reference
STOMP Client Configuration
interface ClientConfig {
brokerURL: string;
reconnectDelay?: number;
heartbeatIncoming?: number;
heartbeatOutgoing?: number;
connectHeaders?: Record<string, string>;
debug?: (msg: string) => void;
onConnect?: (frame: Frame) => void;
onStompError?: (frame: Frame) => void;
onWebSocketError?: (event: Event) => void;
}
Message Types
type MessageType = 'event_log' | 'status_update' | 'state_update';
interface WebSocketMessage {
type: MessageType;
workflowId: string;
timestamp?: string; // Optional: the event_log payload shown above carries createdDate inside eventLog instead
}
interface EventLogMessage extends WebSocketMessage {
type: 'event_log';
execModuleId: string;
eventLog: EventLog;
}
interface StatusUpdateMessage extends WebSocketMessage {
type: 'status_update';
execModuleId: string;
status: ModuleStatus;
duration: number;
}
interface StateUpdateMessage extends WebSocketMessage {
type: 'state_update';
state: Record<string, any>;
}
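Because every message carries a literal type field, the three interfaces form a discriminated union, and a switch on type lets the compiler verify that all cases are handled. The sketch below uses trimmed-down shapes so it stands alone; describeMessage is a hypothetical function used only to show the pattern.

```typescript
// Reduced versions of the message shapes, just enough to show the pattern.
type WorkflowMessage =
  | { type: 'event_log'; execModuleId: string; eventLog: { eventDetails: string } }
  | { type: 'status_update'; execModuleId: string; status: string; duration: number }
  | { type: 'state_update'; state: Record<string, unknown> };

function describeMessage(message: WorkflowMessage): string {
  switch (message.type) {
    case 'event_log':
      return `[${message.execModuleId}] ${message.eventLog.eventDetails}`;
    case 'status_update':
      return `[${message.execModuleId}] ${message.status} after ${message.duration}ms`;
    case 'state_update':
      return `state changed: ${Object.keys(message.state).join(', ')}`;
    default: {
      // If a new message type is added to the union, this assignment stops compiling.
      const unreachable: never = message;
      throw new Error(`Unhandled message: ${JSON.stringify(unreachable)}`);
    }
  }
}
```

The same switch structure can replace the chain of if (data.type === ...) checks in a subscription callback.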