
Workflow Monitor WebSocket Protocol

Real-time communication protocol for workflow execution monitoring using STOMP over WebSocket.


Overview

The Workflow Monitor uses STOMP (Simple Text Oriented Messaging Protocol) over WebSocket for bidirectional, real-time communication between the frontend and backend.

Connection Details

  • Protocol: STOMP 1.2
  • Transport: WebSocket with SockJS fallback
  • Endpoint: /ws
  • Message Format: JSON
  • Heartbeat: 4000 ms requested in each direction (incoming and outgoing); the effective intervals are negotiated with the server
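The 4000 ms values are what the client requests. STOMP 1.2 derives the effective interval for each direction from two headers: the client's CONNECT heart-beat header (cx,cy) and the server's CONNECTED header (sx,sy). Below is a minimal TypeScript sketch of those rules. The function and type names are illustrative and are not part of @stomp/stompjs.

```typescript
// Heart-beat header values in milliseconds:
// client CONNECT "heart-beat:cx,cy", server CONNECTED "heart-beat:sx,sy".
type HeartBeat = [number, number];

interface NegotiatedHeartbeat {
  outgoing: number; // client -> server interval in ms, 0 = disabled
  incoming: number; // server -> client interval in ms, 0 = disabled
}

// STOMP 1.2 rule: heart-beating in a direction is off if either side declares 0
// for it; otherwise it runs at the larger of the two declared values.
function negotiateHeartbeat(client: HeartBeat, server: HeartBeat): NegotiatedHeartbeat {
  const [cx, cy] = client;
  const [sx, sy] = server;
  return {
    outgoing: cx === 0 || sy === 0 ? 0 : Math.max(cx, sy),
    incoming: sx === 0 || cy === 0 ? 0 : Math.max(sx, cy),
  };
}

// Client requests 4000,4000; a server answering 10000,0 sends every 10 s
// but does not want heart-beats from the client.
console.log(negotiateHeartbeat([4000, 4000], [10000, 0])); // { outgoing: 0, incoming: 10000 }
```

If a server requests a longer interval than the client, the longer one wins. The 4000 ms setting is therefore a lower bound, not a guarantee.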

Connection Lifecycle

1. Client Connection

import { Client } from '@stomp/stompjs';

const client = new Client({
  brokerURL: 'ws://localhost:8080/ws',
  reconnectDelay: 5000,
  heartbeatIncoming: 4000,
  heartbeatOutgoing: 4000,
});

client.onConnect = (frame) => {
  console.log('Connected:', frame);
};

client.activate();
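The example above hard-codes ws://localhost:8080/ws. Browsers block insecure ws:// connections from pages served over HTTPS. One option is therefore to derive the broker URL from the backend's HTTP base URL. The toBrokerUrl helper below is hypothetical and assumes the STOMP endpoint path is /ws.

```typescript
// Hypothetical helper: map an http(s) base URL to the matching ws(s) STOMP endpoint.
// https:// becomes wss:// so HTTPS deployments never fall back to an insecure socket.
function toBrokerUrl(httpBaseUrl: string, endpoint = '/ws'): string {
  // Capture the scheme and everything after "://", ignoring trailing slashes
  const match = /^(https?):\/\/(.+?)\/*$/.exec(httpBaseUrl);
  if (!match) {
    throw new Error(`Expected an http(s) URL, got: ${httpBaseUrl}`);
  }
  const scheme = match[1] === 'https' ? 'wss' : 'ws';
  return `${scheme}://${match[2]}${endpoint}`;
}

console.log(toBrokerUrl('http://localhost:8080')); // ws://localhost:8080/ws
console.log(toBrokerUrl('https://monitor.example.com/')); // wss://monitor.example.com/ws
```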

2. Subscription

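// Subscribe in onConnect: subscribe() throws while disconnected, and onConnect
// runs again after each reconnect, so the subscription is re-created.
client.onConnect = () => {
  client.subscribe(`/topic/workflows/${workflowId}/events`, (message) => {
    handleMessage(JSON.parse(message.body));
  });
};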
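The handleMessage function receives whatever JSON the server sends. Below is a sketch of a defensive parser that narrows the body to the three message types documented below and ignores anything else. It checks only the type discriminator, not every field, and parseWorkflowMessage is a hypothetical name.

```typescript
// Minimal shapes of the three documented message types (fields trimmed).
type WorkflowMessage =
  | { type: 'event_log'; workflowId: string; execModuleId: string; eventLog: { id: string; status: string; eventDetails: string } }
  | { type: 'status_update'; workflowId: string; execModuleId: string; status: string; duration: number; timestamp: string }
  | { type: 'state_update'; workflowId: string; state: Record<string, unknown>; timestamp: string };

const KNOWN_TYPES = new Set(['event_log', 'status_update', 'state_update']);

// Parse a STOMP message body; return null for malformed JSON or unknown types
// so that one bad message cannot throw inside the subscription callback.
function parseWorkflowMessage(body: string): WorkflowMessage | null {
  let data: unknown;
  try {
    data = JSON.parse(body);
  } catch {
    return null;
  }
  if (typeof data !== 'object' || data === null) return null;
  const type = (data as { type?: unknown }).type;
  return typeof type === 'string' && KNOWN_TYPES.has(type) ? (data as WorkflowMessage) : null;
}

// Usage inside the subscription callback:
// const data = parseWorkflowMessage(message.body);
// if (data) handleMessage(data);
```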

3. Disconnection

client.deactivate(); // Closes the connection and stops automatic reconnection

Message Types

Event Log Message

Sent when a module logs an event during execution.

Topic: /topic/workflows/{workflowId}/events

Payload:

{
  "type": "event_log",
  "workflowId": "550e8400-e29b-41d4-a716-446655440000",
  "execModuleId": "7c9e6679-7425-40de-944b-e07fc1f90ae7",
  "eventLog": {
    "id": "123e4567-e89b-12d3-a456-426614174000",
    "status": "ok",
    "eventDetails": "Successfully processed 100 records",
    "createdDate": "2025-10-06T18:15:30.123Z",
    "ownerId": "user-123"
  }
}

Fields:

  • type (string): Always "event_log"
  • workflowId (UUID): Workflow identifier
  • execModuleId (UUID): Module identifier
  • eventLog (object): ThorAPI EventLog model
    • id (UUID): Event log ID
    • status ("ok" | "error" | "disabled"): Event status
    • eventDetails (string): Event message
    • createdDate (ISO8601): Timestamp
    • ownerId (string, optional): Owner/creator ID

Example Handler:

if (data.type === 'event_log') {
  // Keep the five most recent events per module (last 4 existing + the new one)
  setLiveEvents(prev => ({
    ...prev,
    [data.execModuleId]: [
      ...(prev[data.execModuleId] || []).slice(-4),
      data.eventLog
    ]
  }));
}
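The handler above keeps the five most recent events per module: slice(-4) plus the new entry. Extracting the update into a pure function makes that limit explicit and easy to unit test. appendEventLog is an illustrative name, and the EventLog shape follows the field list above.

```typescript
interface EventLog {
  id: string;
  status: 'ok' | 'error' | 'disabled';
  eventDetails: string;
  createdDate: string; // ISO 8601
  ownerId?: string;
}

type LiveEvents = Record<string, EventLog[]>;

// Return a new map with `log` appended for the module, trimmed to the `limit`
// most recent entries; `prev` itself is never mutated.
function appendEventLog(prev: LiveEvents, execModuleId: string, log: EventLog, limit = 5): LiveEvents {
  const current = prev[execModuleId] ?? [];
  return {
    ...prev,
    [execModuleId]: [...current, log].slice(-limit),
  };
}

// Usage with React state:
// setLiveEvents(prev => appendEventLog(prev, data.execModuleId, data.eventLog));
```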

Status Update Message

Sent when a module's execution status changes.

Topic: /topic/workflows/{workflowId}/events

Payload:

{
  "type": "status_update",
  "workflowId": "550e8400-e29b-41d4-a716-446655440000",
  "execModuleId": "7c9e6679-7425-40de-944b-e07fc1f90ae7",
  "status": "RUNNING",
  "duration": 1500,
  "timestamp": "2025-10-06T18:15:30.123Z"
}

Fields:

  • type (string): Always "status_update"
  • workflowId (UUID): Workflow identifier
  • execModuleId (UUID): Module identifier
  • status (enum): Module execution status
    • PENDING - Not yet started
    • RUNNING - Currently executing
    • SUCCESS - Completed successfully
    • ERROR - Failed with error
    • PAUSED - Execution paused
    • SKIPPED - Module skipped
  • duration (number): Execution time in milliseconds
  • timestamp (ISO8601): When status changed

Example Handler:

if (data.type === 'status_update') {
  setLiveStatus(prev => ({
    ...prev,
    [data.execModuleId]: data.status
  }));

  setDurations(prev => ({
    ...prev,
    [data.execModuleId]: data.duration
  }));
}
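The handler above applies every status_update in arrival order. If messages can arrive out of order (see High Latency under Troubleshooting), a late RUNNING could overwrite a SUCCESS. One possible guard is to skip any update older than the last one applied for that module. This sketch assumes the server's timestamp values are comparable across messages. applyStatusUpdate and ModuleView are hypothetical names.

```typescript
type ModuleStatus = 'PENDING' | 'RUNNING' | 'SUCCESS' | 'ERROR' | 'PAUSED' | 'SKIPPED';

interface StatusUpdate {
  execModuleId: string;
  status: ModuleStatus;
  duration: number; // milliseconds
  timestamp: string; // ISO 8601
}

interface ModuleView {
  status: ModuleStatus;
  duration: number;
  updatedAt: number; // epoch ms of the last applied update
}

// Apply an update only if it is not older than the last one applied for the module,
// so a RUNNING that arrives after SUCCESS but carries an earlier timestamp is ignored.
function applyStatusUpdate(prev: Record<string, ModuleView>, msg: StatusUpdate): Record<string, ModuleView> {
  const updatedAt = Date.parse(msg.timestamp);
  const current = prev[msg.execModuleId];
  if (Number.isNaN(updatedAt) || (current && updatedAt < current.updatedAt)) {
    return prev; // unparseable or stale: keep the existing state object
  }
  return {
    ...prev,
    [msg.execModuleId]: { status: msg.status, duration: msg.duration, updatedAt },
  };
}
```

Returning the same object for ignored updates lets React skip a re-render when the update is discarded.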

Workflow State Update Message

Sent when workflow state variables change.

Topic: /topic/workflows/{workflowId}/events

Payload:

{
  "type": "state_update",
  "workflowId": "550e8400-e29b-41d4-a716-446655440000",
  "state": {
    "build.version": "1.0.5",
    "deploy.target": "production",
    "ig.publish.status": "completed",
    "ig.publish.url": "https://api.example.com/fhir/ImplementationGuide/123"
  },
  "timestamp": "2025-10-06T18:15:30.123Z"
}

Fields:

  • type (string): Always "state_update"
  • workflowId (UUID): Workflow identifier
  • state (object): Key-value pairs of state variables
    • Keys follow dot notation for namespacing
    • Values can be any JSON-serializable type
  • timestamp (ISO8601): When state changed

Example Handler:

if (data.type === 'state_update') {
  // Merge: keys in the message overwrite existing ones; keys absent from it are kept
  setWorkflowState(prev => ({
    ...prev,
    ...data.state
  }));
}
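Because state keys use dot notation, a component can read one namespace without knowing every key. Below is a small sketch using the example payload above. selectNamespace is a hypothetical helper.

```typescript
// Return the entries under a dot-notation namespace, with the prefix stripped.
// Matching stops at segment boundaries: "build" matches "build.version", "build.v" does not.
function selectNamespace(state: Record<string, unknown>, namespace: string): Record<string, unknown> {
  const prefix = `${namespace}.`;
  const result: Record<string, unknown> = {};
  for (const [key, value] of Object.entries(state)) {
    if (key.startsWith(prefix)) {
      result[key.slice(prefix.length)] = value;
    }
  }
  return result;
}

const workflowState: Record<string, unknown> = {
  'build.version': '1.0.5',
  'deploy.target': 'production',
  'ig.publish.status': 'completed',
  'ig.publish.url': 'https://api.example.com/fhir/ImplementationGuide/123',
};

console.log(selectNamespace(workflowState, 'build')); // { version: '1.0.5' }
```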

Error Handling

Connection Errors

client.onStompError = (frame) => {
  // The ERROR frame's "message" header is a short summary; the body may hold details
  console.error('STOMP error:', frame.headers['message'], frame.body);
  setError(new Error(frame.headers['message']));
};

WebSocket Errors

client.onWebSocketError = (event) => {
  console.error('WebSocket error:', event);
};

Reconnection Strategy

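import { ReconnectionTimeMode } from '@stomp/stompjs';

client.reconnectDelay = 5000; // initial delay: 5 seconds
client.maxReconnectDelay = 30000; // cap: 30 seconds
client.reconnectTimeMode = ReconnectionTimeMode.EXPONENTIAL;

// The default mode is LINEAR (a fixed reconnectDelay between attempts); exponential
// backoff and maxReconnectDelay are only available in recent @stomp/stompjs 7.x releases.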
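With exponential backoff, the delay doubles after each failed attempt until it reaches the cap. The function below illustrates the technique using the 5 s initial and 30 s maximum values above. It is not the library's internal code, and @stomp/stompjs may count attempts differently.

```typescript
// Delay before reconnect attempt `attempt` (0-based): the initial delay doubles
// on every failed attempt and never exceeds maxDelay.
function reconnectDelayFor(attempt: number, initialDelay = 5000, maxDelay = 30000): number {
  return Math.min(initialDelay * 2 ** attempt, maxDelay);
}

console.log([0, 1, 2, 3, 4].map((n) => reconnectDelayFor(n))); // [ 5000, 10000, 20000, 30000, 30000 ]
```

Many clients also add random jitter to each delay. This stops clients that disconnected at the same moment from all reconnecting in lockstep.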

Security

Authentication

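Browsers cannot set custom HTTP headers on the WebSocket handshake. The access token is therefore sent in the STOMP CONNECT frame via connectHeaders rather than as an HTTP Authorization header, and the backend must read and validate it when it processes the CONNECT command. Use wss:// outside local development so the token is encrypted in transit: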

const client = new Client({
  brokerURL: 'ws://localhost:8080/ws',
  connectHeaders: {
    Authorization: `Bearer ${accessToken}`
  }
});

Authorization

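The backend checks every SUBSCRIBE frame in a client inbound channel interceptor and rejects subscriptions to workflows the user cannot access: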

@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
    registration.interceptors(new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);

            if (StompCommand.SUBSCRIBE.equals(accessor.getCommand())) {
                // Inbound messages run on channel threads, so use the Principal
                // attached to the STOMP session rather than a thread-bound context
                Principal user = accessor.getUser();
                // extractWorkflowId and workflowService are application-specific
                UUID workflowId = extractWorkflowId(accessor.getDestination());

                // Deny by default: unknown destinations and users are rejected
                if (user == null || workflowId == null
                        || !workflowService.hasAccess(user, workflowId)) {
                    throw new AccessDeniedException("Unauthorized");
                }
            }

            return message;
        }
    });
}
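The interceptor relies on extractWorkflowId, which is not shown here. Below is a TypeScript sketch of the same parsing rule. It assumes destinations always have the form /topic/workflows/{uuid}/events. Any other destination yields null, so the caller can deny by default and wildcard or malformed destinations cannot bypass the access check.

```typescript
// Hypothetical TypeScript mirror of the backend's extractWorkflowId(destination).
const WORKFLOW_EVENTS_DESTINATION =
  /^\/topic\/workflows\/([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})\/events$/i;

// Return the workflow UUID (lower-cased) for an exact match, otherwise null.
function extractWorkflowId(destination: string | undefined): string | null {
  if (destination === undefined) return null;
  const match = WORKFLOW_EVENTS_DESTINATION.exec(destination);
  return match ? match[1].toLowerCase() : null;
}

console.log(extractWorkflowId('/topic/workflows/550e8400-e29b-41d4-a716-446655440000/events'));
// 550e8400-e29b-41d4-a716-446655440000
```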

Performance Optimization

Message Batching

The backend buffers event logs per workflow and flushes them every 100 ms. This sends fewer messages at the cost of up to 100 ms of added delay:

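@Component
public class WorkflowWebSocketHandler {
    private final Map<String, List<EventLog>> pendingEvents = new ConcurrentHashMap<>();

    // Called by producers; compute() updates each workflow's list atomically
    public void enqueue(String workflowId, EventLog event) {
        pendingEvents.compute(workflowId, (id, events) -> {
            List<EventLog> list = (events != null) ? events : new ArrayList<>();
            list.add(event);
            return list;
        });
    }

    @Scheduled(fixedRate = 100) // Flush every 100 ms (requires @EnableScheduling)
    public void flushPendingEvents() {
        for (String workflowId : pendingEvents.keySet()) {
            // remove() detaches the batch atomically, so events enqueued during
            // the broadcast start a new batch instead of being lost by clear()
            List<EventLog> events = pendingEvents.remove(workflowId);
            if (events != null && !events.isEmpty()) {
                broadcastEventLogBatch(workflowId, events);
            }
        }
    }
}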
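Here is the same batching idea as a single-threaded TypeScript sketch, using a hypothetical EventBatcher class. Events are grouped per workflow. flush() swaps in a fresh buffer before sending, so events added during a flush go into the next batch instead of being cleared. In the Java version the equivalent concern is thread safety, because producers and the scheduled flush run on different threads.

```typescript
// Per-workflow batching buffer: add() queues an event, flush() hands each
// non-empty batch to `send` exactly once and starts fresh batches.
class EventBatcher<T> {
  private pending = new Map<string, T[]>();
  private readonly send: (workflowId: string, batch: T[]) => void;

  constructor(send: (workflowId: string, batch: T[]) => void) {
    this.send = send;
  }

  add(workflowId: string, event: T): void {
    const batch = this.pending.get(workflowId);
    if (batch) {
      batch.push(event);
    } else {
      this.pending.set(workflowId, [event]);
    }
  }

  // Swap the map out before sending so anything added during send() lands in the next batch.
  flush(): void {
    const batches = this.pending;
    this.pending = new Map();
    batches.forEach((batch, workflowId) => {
      if (batch.length > 0) this.send(workflowId, batch);
    });
  }
}

// Usage: flush on a fixed interval, mirroring the 100 ms rate above.
// const batcher = new EventBatcher<EventLog>((id, batch) => broadcast(id, batch));
// setInterval(() => batcher.flush(), 100);
```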

Client-side Debouncing

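import { useMemo } from 'react';
import debounce from 'lodash/debounce'; // assumption: lodash's debounce

// Caution: debounce calls handleStatusUpdate only with the last message of a
// burst; earlier ones (for example, for other modules) are dropped.
const debouncedUpdate = useMemo(
  () => debounce((data) => handleStatusUpdate(data), 100),
  []
);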
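A plain debounce keeps only the arguments of its last call, so a burst of status_update messages for different modules would collapse into one. An alternative is to merge updates per module and debounce only the flush. createCoalescer below is a hypothetical helper. Its scheduler is injectable so the behavior can be tested without real timers.

```typescript
// A scheduler runs `fn` after `delayMs` and returns a function that cancels it.
type Schedule = (fn: () => void, delayMs: number) => () => void;

const timerSchedule: Schedule = (fn, delayMs) => {
  const handle = setTimeout(fn, delayMs);
  return () => clearTimeout(handle);
};

// Debounced coalescer: values are merged per key (last value wins for that key),
// and `flush` receives the merged record once no update has arrived for delayMs.
function createCoalescer<V>(
  flush: (batch: Record<string, V>) => void,
  delayMs = 100,
  schedule: Schedule = timerSchedule,
): (key: string, value: V) => void {
  let pending: Record<string, V> = {};
  let cancel: (() => void) | null = null;
  return (key, value) => {
    pending[key] = value;
    if (cancel) cancel(); // restart the quiet period (the debounce part)
    cancel = schedule(() => {
      const batch = pending;
      pending = {};
      cancel = null;
      flush(batch);
    }, delayMs);
  };
}

// Usage in a React component:
// const pushStatus = useMemo(
//   () => createCoalescer<string>((batch) => setLiveStatus((prev) => ({ ...prev, ...batch }))),
//   []
// );
// pushStatus(data.execModuleId, data.status);
```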

Backpressure Handling

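@Override
public void configureBrokerChannel(ChannelRegistration registration) {
    // Threads beyond corePoolSize are created only once the queue is full; when
    // 8 threads are busy and 1000 tasks are queued, new tasks are rejected
    registration.taskExecutor()
            .corePoolSize(4)
            .maxPoolSize(8)
            .queueCapacity(1000);
}

@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
    // Per-session limits for slow clients: max time for one send, max buffered bytes
    registration.setSendTimeLimit(15 * 1000).setSendBufferSizeLimit(512 * 1024);
}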
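queueCapacity(1000) bounds the broker channel's backlog. By default, a ThreadPoolTaskExecutor rejects new tasks once its queue is full and no more threads can be started. Below is the same bounded-queue idea on the client side, as a sketch with a hypothetical BoundedQueue class. The 'reject' policy refuses new items when the queue is full. The 'drop-oldest' policy instead discards the oldest item, which suits UI feeds where only recent events matter.

```typescript
type OverflowPolicy = 'reject' | 'drop-oldest';

// Bounded FIFO queue with an explicit overflow policy.
class BoundedQueue<T> {
  private items: T[] = [];
  private readonly capacity: number;
  private readonly policy: OverflowPolicy;

  constructor(capacity: number, policy: OverflowPolicy = 'reject') {
    if (capacity < 1) throw new Error('capacity must be >= 1');
    this.capacity = capacity;
    this.policy = policy;
  }

  // Returns false if the item was rejected because the queue is full.
  offer(item: T): boolean {
    if (this.items.length >= this.capacity) {
      if (this.policy === 'reject') return false;
      this.items.shift(); // drop-oldest: make room by discarding the oldest item
    }
    this.items.push(item);
    return true;
  }

  // Remove and return the oldest item, or undefined if the queue is empty.
  poll(): T | undefined {
    return this.items.shift();
  }

  get size(): number {
    return this.items.length;
  }
}
```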

Testing

Unit Tests

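import { TextEncoder, TextDecoder } from 'util';
import { renderHook, waitFor } from '@testing-library/react';
import { Server, WebSocket as MockWebSocket } from 'mock-socket';
import { useWorkflowWebSocket } from './useWorkflowWebSocket'; // adjust the path to the hook

// Serialize a STOMP frame: command, headers, blank line, body, NUL terminator
const stompFrame = (command, headers, body = '') =>
  `${command}\n${Object.entries(headers).map(([k, v]) => `${k}:${v}`).join('\n')}\n\n${body}\0`;

describe('useWorkflowWebSocket', () => {
  beforeAll(() => {
    // @stomp/stompjs uses the global WebSocket and TextEncoder/TextDecoder,
    // which the jsdom test environment may not provide
    Object.assign(global, { WebSocket: MockWebSocket, TextEncoder, TextDecoder });
  });

  it('connects and receives messages', async () => {
    // The mock server must speak STOMP: reply to CONNECT with CONNECTED and
    // deliver the event as a MESSAGE frame for the client's subscription
    const mockServer = new Server('ws://localhost:8080/ws');

    mockServer.on('connection', (socket) => {
      socket.on('message', (data) => {
        if (data.startsWith('CONNECT') || data.startsWith('STOMP')) {
          socket.send(stompFrame('CONNECTED', { version: '1.2', 'heart-beat': '0,0' }));
        } else if (data.startsWith('SUBSCRIBE')) {
          const subscriptionId = /^id:(.*)$/m.exec(data)[1];
          socket.send(stompFrame(
            'MESSAGE',
            { subscription: subscriptionId, 'message-id': '1', destination: '/topic/workflows/workflow-123/events' },
            JSON.stringify({ type: 'status_update', execModuleId: 'test-123', status: 'RUNNING' })
          ));
        }
      });
    });

    const { result } = renderHook(() => useWorkflowWebSocket('workflow-123'));

    await waitFor(() => {
      expect(result.current.connected).toBe(true);
    });

    await waitFor(() => {
      expect(result.current.liveStatus['test-123']).toBe('RUNNING');
    });

    mockServer.stop();
  });
});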

Integration Tests

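# Install wscat for manual testing
npm install -g wscat

# Connect to the WebSocket endpoint
wscat -c ws://localhost:8080/ws

# End each frame below with ^@, the NUL byte (Ctrl+@) that terminates every STOMP
# frame. wscat sends a message on every Enter, so if the broker rejects a frame
# typed this way, send each complete frame as a single message from a script.

# Send a STOMP CONNECT frame (heart-beat:0,0 stops the server from dropping
# an idle manual session for missed heart-beats)
CONNECT
accept-version:1.2
host:localhost
heart-beat:0,0

^@

# Subscribe to workflow events
SUBSCRIBE
id:sub-1
destination:/topic/workflows/550e8400-e29b-41d4-a716-446655440000/events

^@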
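Hand-typed frames are easy to get wrong. The small serializer below produces a complete STOMP 1.2 frame as one string, which a script can send as a single WebSocket message, for example with the ws package for Node.js. buildFrame is a hypothetical name. It applies the STOMP 1.2 header escaping for backslash, carriage return, line feed and colon. It does not escape CONNECT and CONNECTED frames, which the specification exempts for STOMP 1.0 compatibility.

```typescript
type FrameHeaders = Record<string, string>;

const HEADER_ESCAPES: Record<string, string> = { '\\': '\\\\', '\r': '\\r', '\n': '\\n', ':': '\\c' };

// Escape a header name or value for STOMP 1.2 (CONNECT/CONNECTED are left as-is).
function encodeHeader(text: string, command: string): string {
  if (command === 'CONNECT' || command === 'CONNECTED') return text;
  return text.replace(/[\\\r\n:]/g, (ch) => HEADER_ESCAPES[ch]);
}

// Serialize a frame: command line, "name:value" header lines, blank line, body, NUL.
function buildFrame(command: string, headers: FrameHeaders, body = ''): string {
  const headerLines = Object.entries(headers).map(
    ([name, value]) => `${encodeHeader(name, command)}:${encodeHeader(value, command)}\n`,
  );
  return `${command}\n${headerLines.join('')}\n${body}\0`;
}

const subscribeFrame = buildFrame('SUBSCRIBE', {
  id: 'sub-1',
  destination: '/topic/workflows/550e8400-e29b-41d4-a716-446655440000/events',
});
console.log(JSON.stringify(subscribeFrame));
```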

Troubleshooting

Connection Refused

Problem: Client cannot connect to WebSocket endpoint

Solutions:

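  1. Verify the backend WebSocket configuration. With .withSockJS(), as below, /ws is a SockJS endpoint. SockJS clients connect to http://localhost:8080/ws, while native WebSocket clients (brokerURL) must use the raw WebSocket path ws://localhost:8080/ws/websocket, unless you also register an endpoint without .withSockJS():

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws")
                .setAllowedOriginPatterns("*") // restrict to known origins in production
                .withSockJS();
    }
  2. Check allowed origins. The WebSocket handshake's Origin is checked against setAllowedOrigins/setAllowedOriginPatterns on the endpoint registration above. Spring MVC CORS mappings such as the one below are configured separately and apply to your HTTP controllers, so update both if the frontend origin changes:

    @Override
    public void addCorsMappings(CorsRegistry registry) {
        registry.addMapping("/**")
                .allowedOrigins("http://localhost:3000")
                .allowedMethods("GET", "POST");
    }
  3. Verify firewall/proxy configuration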


Messages Not Received

Problem: Client connected but not receiving messages

Solutions:

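  1. Verify that the subscription destination exactly matches the destination the backend sends to, including the workflowId
  2. Check that the message broker is enabled for the /topic prefix (for example, registry.enableSimpleBroker("/topic") in configureMessageBroker)
  3. Make sure client.subscribe() is called inside onConnect: @stomp/stompjs does not restore subscriptions after a reconnect, but onConnect runs again on every connection
  4. Enable debug logging:
    client.debug = (str) => {
      console.log('STOMP Debug:', str);
    };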

High Latency

Problem: Messages arrive slowly or out of order

Solutions:

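  1. Reduce the heartbeat interval (but not too low) to detect dead connections sooner. Heartbeats do not speed up message delivery; a shorter interval only lets the client notice a stalled connection and reconnect earlier:

    heartbeatIncoming: 2000,
    heartbeatOutgoing: 2000
  2. Enable message batching on the backend. Batching reduces per-message overhead when the client is flooded, but adds up to one flush interval (100 ms in the batching example) of delay.

  3. If messages arrive out of order, note that Spring delivers broker messages to clients through a thread pool. Set registry.setPreservePublishOrder(true) in configureMessageBroker to keep publication order within each client session.

  4. Route WebSocket connections through a CDN or edge network that supports WebSocket proxying to shorten the network path; caching does not apply to WebSocket messages.

  5. Check network conditions (use ping, traceroute)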


Best Practices

  1. Always handle reconnection - Network issues are inevitable
  2. Debounce UI updates - Don't update on every message
  3. Use message batching - Reduce message frequency
  4. Implement backpressure - Prevent message queue overflow
  5. Log all errors - Essential for debugging
  6. Test with slow networks - Simulate poor conditions
  7. Monitor connection health - Track reconnection attempts

API Reference

STOMP Client Configuration

import type { IFrame } from '@stomp/stompjs';

// Subset of the @stomp/stompjs client options used in this document
interface ClientConfig {
  brokerURL: string;
  reconnectDelay?: number;
  maxReconnectDelay?: number;
  heartbeatIncoming?: number;
  heartbeatOutgoing?: number;
  connectHeaders?: Record<string, string>;
  debug?: (msg: string) => void;
  onConnect?: (frame: IFrame) => void;
  onStompError?: (frame: IFrame) => void;
  onWebSocketError?: (event: Event) => void;
}

Message Types

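type MessageType = 'event_log' | 'status_update' | 'state_update';

type ModuleStatus = 'PENDING' | 'RUNNING' | 'SUCCESS' | 'ERROR' | 'PAUSED' | 'SKIPPED';

// ThorAPI EventLog model (fields as documented above)
interface EventLog {
  id: string;
  status: 'ok' | 'error' | 'disabled';
  eventDetails: string;
  createdDate: string; // ISO 8601
  ownerId?: string;
}

interface WebSocketMessage {
  type: MessageType;
  workflowId: string;
}

interface EventLogMessage extends WebSocketMessage {
  type: 'event_log';
  execModuleId: string;
  eventLog: EventLog; // no top-level timestamp; use eventLog.createdDate
}

interface StatusUpdateMessage extends WebSocketMessage {
  type: 'status_update';
  execModuleId: string;
  status: ModuleStatus;
  duration: number; // milliseconds
  timestamp: string; // ISO 8601
}

interface StateUpdateMessage extends WebSocketMessage {
  type: 'state_update';
  state: Record<string, unknown>; // dot-notation keys, JSON-serializable values
  timestamp: string; // ISO 8601
}

// Discriminated union: switch on `type` to narrow to a specific message
type WorkflowMessage = EventLogMessage | StatusUpdateMessage | StateUpdateMessage;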