Realtime API Tutorial: Subscribing to Alarm Messages

This tutorial guides you through the process of connecting to the Realtime API and subscribing to alarm messages using both JavaScript and Spring Boot clients.

Connection Flow

The following diagram illustrates the basic connection and subscription flow:


%%{init: {'theme': 'dark'}}%%
sequenceDiagram
    participant Client
    participant Auth as Auth Service
    participant WebSocket as WebSocket Server

    rect rgba(40, 44, 52, 0.6)
        Note over Client, Auth: Authentication
        Client->>Auth: Request Bearer Token
        Auth-->>Client: Return Bearer Token
    end

    rect rgba(40, 44, 52, 0.6)
        Note over Client, WebSocket: WebSocket Connection
        Client->>WebSocket: Connect with Bearer Token
        WebSocket-->>Client: Connection Established
    end

    rect rgba(40, 44, 52, 0.6)
        Note over Client, WebSocket: Subscription
        Client->>WebSocket: Subscribe to /queue/{username}/alarms
        WebSocket-->>Client: Subscription Confirmed
    end

    rect rgba(40, 44, 52, 0.6)
        Note over Client, WebSocket: Persistent Connection & Streaming
        Note right of Client: Connection remains open
        loop Message Streaming
            WebSocket->>Client: Alarm Message 1
            Note right of Client: Process message
            WebSocket->>Client: Alarm Message 2
            Note right of Client: Process message
            WebSocket->>Client: Measurement Message
            Note right of Client: Process message
            Note right of Client: ...continues indefinitely...
        end
    end

    rect rgba(40, 44, 52, 0.6)
        Note over Client, WebSocket: Reconnection (if needed)
        Note right of Client: Connection Lost
        Client->>WebSocket: Reconnect with Bearer Token
        WebSocket-->>Client: Connection Re-established
        Note right of Client: Buffered messages delivered
        Note right of Client: Resume streaming
    end

    rect rgba(40, 44, 52, 0.6)
        Note over Client, WebSocket: Explicit Closure
        Client->>WebSocket: Close Connection Request
        WebSocket-->>Client: Connection Closed
        Note right of Client: Application terminates or user logs out
    end
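
The reconnection step in the diagram does not say how long a client should wait between attempts. The sketch below is one possible approach, assuming a capped exponential backoff; the function name retryDelayMs and its default values are illustrative and are not part of the Realtime API. The Spring Boot example later in this tutorial uses a fixed wait of 5 seconds instead.

```javascript
// Hypothetical helper: the name and defaults are illustrative, not part of the Realtime API.
// Returns the wait time in milliseconds before reconnect attempt number `attempt` (0-based).
function retryDelayMs(attempt, baseMs = 1000, maxMs = 30000) {
  if (!Number.isInteger(attempt) || attempt < 0) {
    throw new RangeError("attempt must be a non-negative integer");
  }
  // Double the delay after every failed attempt, but never exceed maxMs.
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

// Delays for the first six attempts: 1000, 2000, 4000, 8000, 16000, 30000
const delays = [0, 1, 2, 3, 4, 5].map((n) => retryDelayMs(n));
console.log(delays.join(", "));
```

Capping the delay keeps a long outage from pushing the wait time up indefinitely, while the early short delays let brief interruptions recover quickly.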

Subscription Channels

The Realtime API provides two main subscription channels:

  • Alarm Channel: /queue/{username}/alarms
  • Measurement Channel: /queue/{username}/measurements

Replace {username} with the username you used during authentication.

JavaScript Client Example

This example uses the RxStomp library to connect to the WebSocket server and subscribe to the alarm channel:

import {RxStomp, StompHeaders} from "@stomp/rx-stomp";
import { WebSocket } from 'ws';

Object.assign(global, { WebSocket});

const token = ""; // replace with your token

const rxStomp = new RxStomp();

rxStomp.configure({
    brokerURL: 'wss://tds-real-time-api.eu.i.savr.saveris.net/web-socket',
    debug: console.log.bind(console),
    connectHeaders: {Authorization: `Bearer ${token}`}
});

rxStomp.activate();

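// Empty subscription headers; add entries here if your setup needs them.
const subHeaders = new StompHeaders();

// Replace your-username with the username used during authentication.
const subscription = rxStomp.watch({
    destination: "/queue/your-username/alarms",
    subHeaders
}).subscribe((message) => {
    // message.body contains the alarm as JSON text.
    console.log(message.body);
});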

Tip

To subscribe to measurement data instead, change the destination to /queue/your-username/measurements.
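
Because the two destinations differ only in their last path segment, a small helper can build either one from a username. This is a minimal sketch: buildDestination and CHANNELS are hypothetical names introduced for illustration and are not part of the Realtime API or of RxStomp.

```javascript
// Hypothetical helper: not part of the Realtime API or of RxStomp.
// The two channel names come from the Subscription Channels section.
const CHANNELS = Object.freeze({
  alarms: "alarms",
  measurements: "measurements",
});

// Builds "/queue/{username}/{channel}" and rejects obviously wrong input.
function buildDestination(username, channel) {
  if (typeof username !== "string" || username.trim() === "") {
    throw new Error("username must be a non-empty string");
  }
  if (!Object.values(CHANNELS).includes(channel)) {
    throw new Error(`unknown channel: ${channel}`);
  }
  return `/queue/${username}/${channel}`;
}

console.log(buildDestination("your-username", CHANNELS.measurements));
// /queue/your-username/measurements
```

The returned string can be passed as the destination value of rxStomp.watch in the example above.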

Spring Boot Client Example

The Spring Boot client demonstrates a full implementation of a WebSocket connection to the Realtime API, including authentication, subscription, and reconnection logic.

Using the Spring Boot Client:

  1. Import the project into your IDE (e.g., IntelliJ IDEA)
  2. Dependencies in build.gradle should be automatically resolved
  3. Create a Spring Boot configuration with JDK 17 and set the main class to com.example.demo.WebsocketApplication
  4. Configure the following environment variables or add them to the application.yml file:

     TESTO_COGNITO_CLIENT_NAME=your-username
     TESTO_COGNITO_CLIENT_SECRET=your-password
     TESTO_COGNITO_CLIENT_ID=your-client-id

  5. Run the application

Note

Each region and environment has its own client ID. For example, the Europe integration environment uses "2r2u2bl029vu8pk65hanr238dl".

Key Components of the Spring Boot Client:

The following code snippet shows the most important class of the example client, the StompAlarmSessionHandler. This class manages the WebSocket connection and subscription:

package com.example.demo.websocket;

import com.example.demo.authentication.AuthTokenService;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompConversionException;
import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.messaging.simp.stomp.StompSessionHandler;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHttpHeaders;
import org.springframework.web.socket.messaging.WebSocketStompClient;
import org.springframework.web.util.UriComponentsBuilder;

import java.lang.reflect.Type;
import java.nio.channels.UnresolvedAddressException;
import java.util.concurrent.TimeUnit;

@Component
public class StompAlarmSessionHandler implements StompSessionHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(StompAlarmSessionHandler.class);

    private static final String ALARM_TOPIC = "/queue/{username}/alarms";
    private static final long WAIT_SECONDS_UNTIL_RETRY = 5;

    private final String username;
    private final String webSocketBaseUrl;
    private final WebSocketStompClient webSocketStompClient;
    private final AuthTokenService authTokenService;


    public StompAlarmSessionHandler(@Value("${spring.security.oauth2.client.registration.cognito.client-name}") String username,
                                    @Value("${testo.ws.url}") String webSocketBaseUrl,
                                    WebSocketStompClient webSocketStompClient,
                                    AuthTokenService authTokenService) {
        this.username = username;
        this.webSocketStompClient = webSocketStompClient;
        this.webSocketBaseUrl = webSocketBaseUrl;
        this.authTokenService = authTokenService;
    }

    @EventListener(classes = {ApplicationReadyEvent.class})
    public void handleApplicationReadyEvent() {
        LOGGER.info("Connect on ready event");
        connectWebSocket();
    }

    @Override
    public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
        // Expand the {username} placeholder once and subscribe to the resulting destination.
        String alarmTopic = UriComponentsBuilder.fromUriString(ALARM_TOPIC).build(username).toString();
        session.subscribe(alarmTopic, this);
        LOGGER.info("Stomp alarm session subscribed to topic: {}", alarmTopic);
    }

    @Override
    public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload, Throwable exception) {
        LOGGER.error("handle exception, session: {}, command: {}, headers: {}, payload: {}", session, command, headers, payload, exception);
    }

    @Override
    public void handleTransportError(StompSession session, Throwable exception) {
        LOGGER.warn("handle transport error for sessionID: {} connected: {}", session.getSessionId(), session.isConnected());
        if (exception instanceof StompConversionException) {
            LOGGER.error("Stomp topic not reachable: {}", ALARM_TOPIC, exception);
        }
        if (!session.isConnected()) {
            reestablishConnection();
        } else {
            LOGGER.error("Unknown Exception {}", exception.getMessage(), exception);
        }
    }

    @Override
    public Type getPayloadType(StompHeaders headers) {
        return String.class;
    }

    @Override
    public void handleFrame(StompHeaders headers, Object payload) {
        LOGGER.info("handle frame: {}", payload);
    }

    private void connectWebSocket() {
        webSocketStompClient.connectAsync(webSocketBaseUrl, new WebSocketHttpHeaders(), stompHeaders(authTokenService.getAccessToken()), this);
    }

    private StompHeaders stompHeaders(String token) {
        StompHeaders stompHeaders = new StompHeaders();
        stompHeaders.set("Authorization", String.format("Bearer %s", token));
        return stompHeaders;
    }

    private void reestablishConnection() {
        try {
            TimeUnit.SECONDS.sleep(WAIT_SECONDS_UNTIL_RETRY);
        } catch (InterruptedException e) {
            LOGGER.error("Thread sleep error", e);
            Thread.currentThread().interrupt();
        }
        try {
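            LOGGER.info("Reconnect");
            // connectAsync returns immediately; afterConnected is called once the session is established.
            connectWebSocket();
            LOGGER.info("Reconnect attempt started");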
        } catch (Exception e) {
            if (e.getCause() instanceof UnresolvedAddressException) {
                LOGGER.error("Failed to reconnect");
            } else {
                LOGGER.error("Exception", e);
            }
        }
    }

}

Example Output

When successfully connected, you'll receive alarm messages in this format:

INFO [demo,,] 30104 --- [ient-SecureIO-2] c.e.d.w.StompAlarmSessionHandler:
 handle frame:
{
    "uuid":"11111111-xxxx-xxxx-xxxx-111111111111",
    "tenant_uuid":"11111111-xxxx-xxxx-xxxx-111111111111",
    "alarm_reason":"Alarm condition is violated",
    "alarm_status":"Alarm",
    "last_status_change_time":"2023-08-01T11:20:00Z",
    "alarm_condition_type":"Upper limit",
    "alarm_severity":"Alarm",
    "alarm_time":"2023-08-01T11:20:00Z",
    "alarm_time_local":"2023-08-01T13:20:00+02:00",
    "alarm_value":"25.2",
    "physical_unit":"°C",
    "physical_value":"Temperature",
    "physical_value_extension":"Air Temperature",
    "alarm_source_uuid":"11111111-xxxx-xxxx-xxxx-111111111111",
    "alarm_type":"measurement alarm",
    "processed_at":"2023-08-01T11:21:25Z"
}
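
In the JavaScript example, message.body holds this JSON as text, so a client will usually parse it before use. The following sketch shows one way to do that. The field names are taken from the sample above; the function name parseAlarm and the shape of the returned object are illustrative choices, and the code assumes that alarm_value is always a numeric string when it is present.

```javascript
// Converts the JSON text of an alarm message into a small object.
// Field names come from the sample message; the returned shape is illustrative.
function parseAlarm(body) {
  const raw = JSON.parse(body);
  return {
    id: raw.uuid,
    sourceId: raw.alarm_source_uuid,
    status: raw.alarm_status,
    type: raw.alarm_type,
    // alarm_value is a string in the sample; convert it when present (assumption).
    value: raw.alarm_value === undefined ? null : Number(raw.alarm_value),
    unit: raw.physical_unit,
    raisedAt: new Date(raw.alarm_time),
  };
}

// A shortened copy of the sample message, used only to exercise the function.
const sampleBody = JSON.stringify({
  uuid: "11111111-xxxx-xxxx-xxxx-111111111111",
  alarm_status: "Alarm",
  alarm_time: "2023-08-01T11:20:00Z",
  alarm_value: "25.2",
  physical_unit: "°C",
  alarm_source_uuid: "11111111-xxxx-xxxx-xxxx-111111111111",
  alarm_type: "measurement alarm",
});

const alarm = parseAlarm(sampleBody);
console.log(`${alarm.status}: ${alarm.value} ${alarm.unit} at ${alarm.raisedAt.toISOString()}`);
// Alarm: 25.2 °C at 2023-08-01T11:20:00.000Z
```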

Message Types

The Realtime API provides two types of messages:

1. Alarm Messages

Alarm messages are sent when conditions configured in your alarm settings are triggered. The alarm_type field indicates the type of alarm:

  • measurement_alarm: Triggered when a measurement exceeds configured thresholds
  • sensor_system_alarm: Indicates issues with a sensor
  • device_system_alarm: Indicates issues with a device
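
A client can use the alarm_type field to route each alarm to a matching handler. The sketch below is illustrative: routeAlarm, normalizeAlarmType, and the handler messages are hypothetical names and values. The sample log output earlier in this tutorial shows the value "measurement alarm" with a space, while the list above uses underscores, so the sketch normalizes both spellings; that normalization is an assumption, and the AsyncAPI document remains the authority on the exact values.

```javascript
// The three alarm_type values listed above.
const KNOWN_ALARM_TYPES = ["measurement_alarm", "sensor_system_alarm", "device_system_alarm"];

// Assumption: lowercase the value and replace whitespace with underscores so that
// "measurement alarm" and "measurement_alarm" are treated alike.
function normalizeAlarmType(alarmType) {
  return String(alarmType ?? "").trim().toLowerCase().replace(/\s+/g, "_");
}

// Calls handlers[type] for a known type, otherwise handlers.unknown.
function routeAlarm(alarm, handlers) {
  const type = normalizeAlarmType(alarm.alarm_type);
  const handler = KNOWN_ALARM_TYPES.includes(type) ? handlers[type] : undefined;
  return (handler ?? handlers.unknown)(alarm);
}

// Illustrative handlers that only build a description string.
const handlers = {
  measurement_alarm: (a) => `threshold violated at source ${a.alarm_source_uuid}`,
  sensor_system_alarm: (a) => `sensor issue at source ${a.alarm_source_uuid}`,
  device_system_alarm: (a) => `device issue at source ${a.alarm_source_uuid}`,
  unknown: (a) => `unhandled alarm type: ${a.alarm_type}`,
};

console.log(routeAlarm({ alarm_type: "measurement alarm", alarm_source_uuid: "abc" }, handlers));
// threshold violated at source abc
```

Keeping an explicit unknown handler means that a value not covered by the list is reported rather than silently dropped.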

The alarm_source_uuid field contains the UUID of the sensor or device that triggered the alarm.

2. Measurement Messages

Measurement messages contain real-time measurement data from your devices and sensors. These messages include:

  • Unique identifiers for the measurement, device, and sensor
  • Measurement value and timestamp
  • Physical property name and unit
  • Additional metadata

Complete Message Structure

For the detailed structure of all message types, please refer to the API's AsyncAPI document in the API Reference section.