Realtime API Tutorial: Subscribing to Alarm Messages
This tutorial shows how to connect to the Realtime API and subscribe to alarm messages, using a JavaScript client and a Spring Boot client.
Connection Flow
The following diagram illustrates the basic connection and subscription flow:
%%{init: {'theme': 'dark'}}%%
sequenceDiagram
participant Client
participant Auth as Auth Service
participant WebSocket as WebSocket Server
rect rgba(40, 44, 52, 0.6)
Note over Client, Auth: Authentication
Client->>Auth: Request Bearer Token
Auth-->>Client: Return Bearer Token
end
rect rgba(40, 44, 52, 0.6)
Note over Client, WebSocket: WebSocket Connection
Client->>WebSocket: Connect with Bearer Token
WebSocket-->>Client: Connection Established
end
rect rgba(40, 44, 52, 0.6)
Note over Client, WebSocket: Subscription
Client->>WebSocket: Subscribe to /queue/{username}/alarms
WebSocket-->>Client: Subscription Confirmed
end
rect rgba(40, 44, 52, 0.6)
Note over Client, WebSocket: Persistent Connection & Streaming
Note right of Client: Connection remains open
loop Message Streaming
WebSocket->>Client: Alarm Message 1
Note right of Client: Process message
WebSocket->>Client: Alarm Message 2
Note right of Client: Process message
WebSocket->>Client: Measurement Message
Note right of Client: Process message
Note right of Client: ...continues indefinitely...
end
end
rect rgba(40, 44, 52, 0.6)
Note over Client, WebSocket: Reconnection (if needed)
Note right of Client: Connection Lost
Client->>WebSocket: Reconnect with Bearer Token
WebSocket-->>Client: Connection Re-established
Note right of Client: Buffered messages delivered
Note right of Client: Resume streaming
end
rect rgba(40, 44, 52, 0.6)
Note over Client, WebSocket: Explicit Closure
Client->>WebSocket: Close Connection Request
WebSocket-->>Client: Connection Closed
Note right of Client: Application terminates or user logs out
end
Subscription Channels
The Realtime API provides two main subscription channels:
- Alarm Channel: /queue/{username}/alarms
- Measurement Channel: /queue/{username}/measurements
Replace {username} with the username you use during authentication.
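The two destinations differ only in their last path segment, so they can be built from the username. The following is a minimal sketch; the helper name buildDestination and the CHANNELS constant are illustrative and not part of the Realtime API.

```javascript
// Channel names taken from the list above.
const CHANNELS = ["alarms", "measurements"];

/**
 * Builds a subscription destination such as /queue/jane/alarms.
 * Hypothetical helper for illustration only.
 */
function buildDestination(username, channel) {
  if (typeof username !== "string" || username.trim() === "") {
    throw new Error("username must be a non-empty string");
  }
  if (!CHANNELS.includes(channel)) {
    throw new Error(`unknown channel: ${channel}`);
  }
  return `/queue/${username}/${channel}`;
}

console.log(buildDestination("your-username", "alarms"));
// prints "/queue/your-username/alarms"
```

Rejecting unknown channel names early keeps a typo from turning into a subscription that silently never receives messages.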
JavaScript Client Example
This example uses the RxStomp library to connect to the WebSocket server and subscribe to the alarm channel:
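import { RxStomp, StompHeaders } from "@stomp/rx-stomp";
import { WebSocket } from 'ws';

// Make the ws implementation available as the global WebSocket that RxStomp expects.
Object.assign(global, { WebSocket });

const token = ""; // replace with your token

const rxStomp = new RxStomp();
rxStomp.configure({
  brokerURL: 'wss://tds-real-time-api.eu.i.savr.saveris.net/web-socket',
  debug: console.log.bind(console),
  // The bearer token is sent as a header of the STOMP CONNECT frame.
  connectHeaders: { Authorization: `Bearer ${token}` }
});
rxStomp.activate();

// Optional extra headers for the SUBSCRIBE frame (empty here).
const subHeaders = new StompHeaders();
const subscription = rxStomp.watch({
  destination: "/queue/<username>/alarms", // replace <username> with your username
  subHeaders
}).subscribe((message) => {
  // message.body contains the alarm as a JSON string.
  console.log(message.body);
});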
Tip
To subscribe to measurement data instead, change the destination to /queue/your-username/measurements.
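Both channels can also be watched over the same connection. The sketch below assumes a client that behaves like the activated rxStomp instance from the example above, that is, watch({ destination }) returns an observable of messages. The function name subscribeToChannels and the handlers object are illustrative.

```javascript
/**
 * Subscribes to the alarm and measurement channels of one user.
 * `client` is expected to behave like an activated RxStomp instance.
 * `handlers.onAlarm` and `handlers.onMeasurement` receive the raw message body.
 */
function subscribeToChannels(client, username, handlers) {
  const subscriptions = [
    client
      .watch({ destination: `/queue/${username}/alarms` })
      .subscribe((message) => handlers.onAlarm(message.body)),
    client
      .watch({ destination: `/queue/${username}/measurements` })
      .subscribe((message) => handlers.onMeasurement(message.body)),
  ];
  // Return a function that cancels both subscriptions.
  return () => subscriptions.forEach((s) => s.unsubscribe());
}
```

A call such as subscribeToChannels(rxStomp, "your-username", { onAlarm: console.log, onMeasurement: console.log }) returns a function that can be invoked on logout or shutdown to end both subscriptions.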
Spring Boot Client Example
The Spring Boot client demonstrates a full implementation of a WebSocket connection to the Realtime API, including authentication, subscription, and reconnection logic.
Using the Spring Boot Client:
- Import the project into your IDE (e.g., IntelliJ IDEA)
- Dependencies in build.gradle should be automatically resolved
- Create a Spring Boot run configuration with JDK 17 and set the main class to com.example.demo.WebsocketApplication
- Configure the following environment variables or add them to the application.yml file:
TESTO_COGNITO_CLIENT_NAME=your-username
TESTO_COGNITO_CLIENT_SECRET=your-password
TESTO_COGNITO_CLIENT_ID=your-client-id
- Run the application
Note
Each region and environment has its own client ID. For example, the Europe integration environment uses "2r2u2bl029vu8pk65hanr238dl".
Key Components of the Spring Boot Client:
The following code snippet shows the most important class of the example client, the StompAlarmSessionHandler. This class manages the WebSocket connection and subscription:
package com.example.demo.websocket;
import com.example.demo.authentication.AuthTokenService;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompConversionException;
import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.messaging.simp.stomp.StompSessionHandler;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHttpHeaders;
import org.springframework.web.socket.messaging.WebSocketStompClient;
import org.springframework.web.util.UriComponentsBuilder;
import java.lang.reflect.Type;
import java.nio.channels.UnresolvedAddressException;
import java.util.concurrent.TimeUnit;
@Component
public class StompAlarmSessionHandler implements StompSessionHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(StompAlarmSessionHandler.class);
private static final String ALARM_TOPIC = "/queue/{username}/alarms";
private static final long WAIT_SECONDS_UNTIL_RETRY = 5;
private final String username;
private final String webSocketBaseUrl;
private final WebSocketStompClient webSocketStompClient;
private final AuthTokenService authTokenService;
public StompAlarmSessionHandler(@Value("${spring.security.oauth2.client.registration.cognito.client-name}") String username,
@Value("${testo.ws.url}") String webSocketBaseUrl,
WebSocketStompClient webSocketStompClient,
AuthTokenService authTokenService) {
this.username = username;
this.webSocketStompClient = webSocketStompClient;
this.webSocketBaseUrl = webSocketBaseUrl;
this.authTokenService = authTokenService;
}
@EventListener(classes = {ApplicationReadyEvent.class})
public void handleApplicationReadyEvent() {
LOGGER.info("Connect on ready event");
connectWebSocket();
}
@Override
public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
// Expand the {username} placeholder once and subscribe to the resulting destination.
String alarmTopic = UriComponentsBuilder.fromUriString(ALARM_TOPIC).build(username).toString();
session.subscribe(alarmTopic, this);
LOGGER.info("Stomp alarm session subscribed to topic: {}", alarmTopic);
}
@Override
public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload, Throwable exception) {
LOGGER.error("handle exception, session: {}, command: {}, headers: {}, payload: {}", session, command, headers, payload, exception);
}
@Override
public void handleTransportError(StompSession session, Throwable exception) {
LOGGER.warn("handle transport error for sessionID: {} connected: {}", session.getSessionId(), session.isConnected());
if (exception instanceof StompConversionException) {
LOGGER.error("Stomp topic not reachable: {}", ALARM_TOPIC, exception);
}
if (!session.isConnected()) {
reestablishConnection();
} else {
LOGGER.error("Unknown Exception {}", exception.getMessage(), exception);
}
}
@Override
public Type getPayloadType(StompHeaders headers) {
return String.class;
}
@Override
public void handleFrame(StompHeaders headers, Object payload) {
LOGGER.info("handle frame: {}", payload);
}
private void connectWebSocket() {
webSocketStompClient.connectAsync(webSocketBaseUrl, new WebSocketHttpHeaders(), stompHeaders(authTokenService.getAccessToken()), this);
}
private StompHeaders stompHeaders(String token) {
StompHeaders stompHeaders = new StompHeaders();
stompHeaders.set("Authorization", String.format("Bearer %s", token));
return stompHeaders;
}
private void reestablishConnection() {
try {
TimeUnit.SECONDS.sleep(WAIT_SECONDS_UNTIL_RETRY);
} catch (InterruptedException e) {
LOGGER.error("Thread sleep error", e);
Thread.currentThread().interrupt();
}
try {
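LOGGER.info("Reconnect");
connectWebSocket();
// connectAsync returns immediately; afterConnected is called once the session is established.
LOGGER.info("Reconnect attempt started");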
} catch (Exception e) {
if (e.getCause() instanceof UnresolvedAddressException) {
LOGGER.error("Failed to reconnect");
} else {
LOGGER.error("Exception", e);
}
}
}
}
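The handler above waits a fixed 5 seconds after a lost connection and requests a token from the token service again before reconnecting. A comparable setup for the RxStomp JavaScript client might look like the sketch below. It relies on the reconnectDelay and beforeConnect options of the RxStomp configuration; getToken is a hypothetical function supplied by the caller, and the behavior should be checked against the installed library version.

```javascript
/**
 * Builds an RxStomp configuration that retries after a fixed delay and
 * fetches a bearer token before every connection attempt.
 * getToken is a hypothetical (possibly async) function supplied by the caller.
 */
function buildReconnectingConfig(brokerURL, getToken, retryDelayMs = 5000) {
  return {
    brokerURL,
    // Wait this long after a lost connection before trying again.
    reconnectDelay: retryDelayMs,
    // Invoked before each connection attempt, including reconnects.
    beforeConnect: async (client) => {
      const token = await getToken();
      client.configure({
        connectHeaders: { Authorization: `Bearer ${token}` },
      });
    },
  };
}
```

The result would be passed to rxStomp.configure(...) before calling rxStomp.activate(). Fetching the token inside beforeConnect avoids reconnecting with a token that expired while the connection was down.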
Example Output
When successfully connected, you'll receive alarm messages in this format:
INFO [demo,,] 30104 --- [ient-SecureIO-2] c.e.d.w.StompAlarmSessionHandler:
handle frame:
{
"uuid":"11111111-xxxx-xxxx-xxxx-111111111111",
"tenant_uuid":"11111111-xxxx-xxxx-xxxx-111111111111",
"alarm_reason":"Alarm condition is violated",
"alarm_status":"Alarm",
"last_status_change_time":"2023-08-01T11:20:00Z",
"alarm_condition_type":"Upper limit",
"alarm_severity":"Alarm",
"alarm_time":"2023-08-01T11:20:00Z",
"alarm_time_local":"2023-08-01T13:20:00+02:00",
"alarm_value":"25.2",
"physical_unit":"°C",
"physical_value":"Temperature",
"physical_value_extension":"Air Temperature",
"alarm_source_uuid":"11111111-xxxx-xxxx-xxxx-111111111111",
"alarm_type":"measurement alarm",
"processed_at":"2023-08-01T11:21:25Z"
}
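The message body is a JSON string, so a client typically parses it before use. The following sketch turns an alarm into a one-line summary. It uses only field names that appear in the example output above; the function name summarizeAlarm is illustrative.

```javascript
/**
 * Parses an alarm message body and returns a one-line summary.
 * Field names are taken from the example output above.
 */
function summarizeAlarm(body) {
  const alarm = JSON.parse(body);
  return (
    `[${alarm.alarm_severity}] ${alarm.physical_value_extension}: ` +
    `${alarm.alarm_value} ${alarm.physical_unit} ` +
    `(${alarm.alarm_condition_type}) at ${alarm.alarm_time}`
  );
}

// Subset of the example message shown above.
const sample = JSON.stringify({
  alarm_severity: "Alarm",
  physical_value_extension: "Air Temperature",
  alarm_value: "25.2",
  physical_unit: "°C",
  alarm_condition_type: "Upper limit",
  alarm_time: "2023-08-01T11:20:00Z",
});

console.log(summarizeAlarm(sample));
// prints "[Alarm] Air Temperature: 25.2 °C (Upper limit) at 2023-08-01T11:20:00Z"
```

Note that alarm_value arrives as a string in the example, so it needs an explicit conversion (for instance Number(alarm.alarm_value)) before any numeric comparison.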
Message Types
The Realtime API provides two types of messages:
1. Alarm Messages
Alarm messages are sent when conditions configured in your alarm settings are triggered. The alarm_type field indicates the type of alarm:
- measurement_alarm: Triggered when a measurement exceeds configured thresholds
- sensor_system_alarm: Indicates issues with a sensor
- device_system_alarm: Indicates issues with a device
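A client can use alarm_type to route each alarm to a dedicated handler. The sketch below is one way to do this. Because the example output earlier in this tutorial shows the value written with a space ("measurement alarm"), the sketch normalizes spaces to underscores; that normalization is an assumption, and the AsyncAPI document is the authority on the exact values. The names routeAlarm and normalizeAlarmType are illustrative.

```javascript
// Alarm types as listed above.
const ALARM_TYPES = [
  "measurement_alarm",
  "sensor_system_alarm",
  "device_system_alarm",
];

// Assumption: treat "measurement alarm" and "measurement_alarm" as the same type.
function normalizeAlarmType(rawType) {
  return String(rawType).trim().toLowerCase().replace(/\s+/g, "_");
}

/**
 * Calls the handler registered for the alarm's type.
 * Falls back to handlers.unknown for unlisted types or missing handlers.
 */
function routeAlarm(alarm, handlers) {
  const type = normalizeAlarmType(alarm.alarm_type);
  const handler = ALARM_TYPES.includes(type) ? handlers[type] : undefined;
  return (handler ?? handlers.unknown)(alarm);
}

const handlers = {
  measurement_alarm: (a) => `threshold violated on ${a.alarm_source_uuid}`,
  sensor_system_alarm: (a) => `sensor issue on ${a.alarm_source_uuid}`,
  device_system_alarm: (a) => `device issue on ${a.alarm_source_uuid}`,
  unknown: (a) => `unhandled alarm type: ${a.alarm_type}`,
};

console.log(
  routeAlarm(
    {
      alarm_type: "measurement alarm",
      alarm_source_uuid: "11111111-xxxx-xxxx-xxxx-111111111111",
    },
    handlers
  )
);
// prints "threshold violated on 11111111-xxxx-xxxx-xxxx-111111111111"
```

Keeping an explicit unknown handler means that alarm types added later are reported rather than dropped.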
The alarm_source_uuid field contains the UUID of the sensor or device that triggered the alarm.
2. Measurement Messages
Measurement messages contain real-time measurement data from your devices and sensors. These messages include:
- Unique identifiers for the measurement, device, and sensor
- Measurement value and timestamp
- Physical property name and unit
- Additional metadata
Complete Message Structure
For the detailed structure of all message types, please refer to the API's AsyncAPI document in the API Reference section.