Many devices and applications send messages to enterprise systems, and a common challenge is aggregating (collecting) messages that arrive from different sources. A source could be a sensor, a user interface provided to an administrator, or a web interface. A consumer of the messages, say a web service, might need a combination of these values to perform a task. There are several ways to solve this collection problem; here, let's look at using WebSphere Message Broker (WMB).
Use case or an example
We'll take an example from the energy and utilities industry. Consider a temperature sensor and a humidity sensor: two different devices, each sending the value it reads. Let's say we have a web service that calculates the heat index, which is a combination of these two values. This requires the temperature and humidity of the same location to be collected together. At the enterprise level, the data, mostly in the form of messages, will be arriving from various sources and locations at the same time. The flows can't wait for all the values to arrive from one location before processing messages from the next.
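To make the use case concrete, here is a minimal sketch of what such a heat index service might compute once it has both readings for one location. It assumes the Rothfusz regression published by the US National Weather Service, with temperature in degrees Fahrenheit and relative humidity in percent. The function name is hypothetical, and the regression is only intended for roughly 80 °F and above; the extra adjustments the NWS applies at the extremes are left out.

```python
def heat_index_f(temp_f: float, rel_humidity_pct: float) -> float:
    """Approximate heat index in deg F (Rothfusz regression, assumed here)."""
    t, rh = temp_f, rel_humidity_pct
    return (
        -42.379
        + 2.04901523 * t
        + 10.14333127 * rh
        - 0.22475541 * t * rh
        - 0.00683783 * t * t
        - 0.05481717 * rh * rh
        + 0.00122874 * t * t * rh
        + 0.00085282 * t * rh * rh
        - 0.00000199 * t * t * rh * rh
    )


# Both inputs must come from the same location, which is why the two
# sensor messages have to be collected together before this call.
print(round(heat_index_f(90.0, 70.0), 1))
```

The formula is meaningless if the temperature comes from one site and the humidity from another, which is the reason the messages need to be grouped by location first.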
A short answer to the problem is to use the collector node of WebSphere Message Broker.
The temperature sensor could be sending its values to a computer over some wired or wireless interface and protocol; it could even be MQTT. If data arrives on some other protocol, it can be converted to an MQTT message. On the server side we need a WebSphere MQ installation with WebSphere MQ Telemetry installed. It needs to be configured to listen on a port (say 1883) for messages from MQTT clients. The configuration is described in the WebSphere MQ Telemetry documentation. The values for temperature and humidity arrive as two different messages, and we'll use the WebSphere Message Broker collector node to collect and group them.
The message format and topics
In this example, let's use the client ID, topic, and message body of the MQTT message. The topic can follow a convenient format that contains the sensor type and the client identifier. For example, "device/temp/DV1" can represent a temperature sensor with the client identifier DV1. MQTT topic strings are case-sensitive, so the publishers and the broker configuration must use the same case. It's important to choose the right topic string when designing solutions, as most routing decisions in WebSphere Message Broker can be based on it. The value measured by the sensor will be embedded in the message body.
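The topic convention can be sketched in a few lines of Python. This is only an illustration, assuming the lowercase `device/<sensor type>/<client ID>` layout; `SensorTopic`, `build_topic` and `parse_topic` are hypothetical helper names, not part of any MQTT or WebSphere API.

```python
from typing import NamedTuple


class SensorTopic(NamedTuple):
    sensor_type: str  # e.g. "temp" or "humidity"
    client_id: str    # e.g. "DV1"


def build_topic(sensor_type: str, client_id: str) -> str:
    """Compose the topic string a device would publish on."""
    return f"device/{sensor_type}/{client_id}"


def parse_topic(topic: str) -> SensorTopic:
    """Split a topic back into sensor type and client ID."""
    parts = topic.split("/")
    # Expect exactly three non-empty levels starting with "device".
    if len(parts) != 3 or parts[0] != "device" or not all(parts):
        raise ValueError(f"unexpected topic format: {topic!r}")
    return SensorTopic(sensor_type=parts[1], client_id=parts[2])


print(parse_topic(build_topic("temp", "DV1")))
```

Keeping the client ID as the last topic level means that both sensors of one location can share an identifier while still publishing on separate topics.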
Configuration on the WebSphere MQ
WebSphere MQ can convert the incoming MQTT message to a JMS message that can be consumed by a JMSInput node of WebSphere Message Broker. It can also be consumed using an MQInput node. If using the JMSInput node, we need to define the correct topic string in the JNDI bindings.
Go to "JMS Administered Objects" in WebSphere MQ Explorer and create an initial context. Also create a connection factory, say "cf", that can be used in the JMSInput node.
Now, under "Destinations" in the initial context, create two topics, TEMP.TOPIC and HUMIDITY.TOPIC, with the topic strings "device/temp/+" and "device/humidity/+" respectively. The "+" is the single-level wildcard, so each topic matches every client ID of that sensor type. Let's receive the messages on these two different topics.
Note down the location of the bindings file so that it can be used in the JMSInput node in the WebSphere Message Broker tooling.
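To illustrate how the topic strings "device/temp/+" and "device/humidity/+" separate the two kinds of messages, here is a small sketch of single-level wildcard matching. It is a simplified model written for this article, not WebSphere MQ code: it handles only "+" (not "#"), and the `ROUTES` table and function names are hypothetical.

```python
ROUTES = {
    "TEMP.TOPIC": "device/temp/+",
    "HUMIDITY.TOPIC": "device/humidity/+",
}


def matches(topic_filter: str, topic: str) -> bool:
    """True if topic matches the filter, where "+" stands for exactly one level."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    if len(filter_levels) != len(topic_levels):
        return False
    # Comparison is case-sensitive, as topic strings are.
    return all(f == "+" or f == t for f, t in zip(filter_levels, topic_levels))


def destinations_for(topic: str) -> list:
    """Names of the configured destinations whose filter matches the topic."""
    return [name for name, flt in ROUTES.items() if matches(flt, topic)]


print(destinations_for("device/temp/DV1"))
print(destinations_for("device/humidity/DV7"))
```

A topic with a different case or an extra level matches neither filter, so such messages would not reach either destination.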
Configuration on the WebSphere Message Broker
The JMSInput node can be used to read the incoming messages. Let's set some properties on the message. In this example, let's say the message body carries the value. Since the MQTT messages are converted to JMS messages here, the value will arrive as a BLOB message.
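-- SelectSubField is assumed to be a user-defined helper that returns the Nth '/'-separated field.
DECLARE RECEIVED_CLIENT_ID CHARACTER;
SET RECEIVED_CLIENT_ID = SelectSubField(InputRoot.JMSTransport.Transport_Folders.Header_Values.JMSDestination,'/',5);
SET OutputRoot.JMSTransport.Transport_Folders.Application_Properties.RECV_CLIENT_ID = RECEIVED_CLIENT_ID;
-- Decode the BLOB payload as UTF-8 (CCSID 1208) text.
SET OutputRoot.JMSTransport.Transport_Folders.Application_Properties.RECV_VALUE = CAST(InputRoot.BLOB.BLOB AS CHAR CCSID 1208);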
The received client ID is important in the next step, when we use the collector node. As you can see, the value arrives as a BLOB and can be parsed as shown above.
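The field index 5 in the snippet above is easier to follow with a worked example. The sketch below mimics the extraction in Python, under two assumptions that are not confirmed by the text: that JMSDestination has the form "topic://device/temp/DV1", and that SelectSubField counts fields from 1. The function here is an illustrative stand-in, not the helper used in the flow.

```python
def select_sub_field(text: str, delimiter: str, index: int) -> str:
    """Return the index-th (1-based) delimiter-separated field of text."""
    fields = text.split(delimiter)
    if not 1 <= index <= len(fields):
        raise IndexError(f"no field {index} in {text!r}")
    return fields[index - 1]


# Assumed shape of the JMSDestination header value.
destination = "topic://device/temp/DV1"
# Fields: "topic:", "", "device", "temp", "DV1"  -> the client ID is field 5.
client_id = select_sub_field(destination, "/", 5)

# The body arrives as raw bytes; CCSID 1208 corresponds to UTF-8.
payload = b"31.5"
value = payload.decode("utf-8")

print(client_id, value)
```

If the destination string has a different prefix in your environment, the field index has to change accordingly.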
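WebSphere Message Broker also enables conversion from a JMS message to an MQ message and vice versa using the JMSMQTransform and MQJMSTransform nodes. Use a JMSMQTransform node on each path so that the JMS application properties set above are carried in the MQRFH2 usr folder, then connect the two paths to the collector node.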
The collector node needs a minimum of two input terminals in this case; here they are named temp and humidity, and these names later identify each message inside the collection. The collection definition plays an important role. The correlation path needs to be set to "$Root/MQRFH2/usr/RECV_CLIENT_ID". This means the collector node groups messages that carry the same client ID but arrive on different terminals, so it will wait for both the temperature value and the humidity value before forming a collection out of them. The timeout needs to be configured appropriately so that incomplete collections don't pile up if one of the input devices goes down and stops publishing messages.
The collector node uses two queues, "SYSTEM.BROKER.EDA.COLLECTIONS" and "SYSTEM.BROKER.EDA.EVENTS". While testing your flow, you might need to clear these queues if messages start accumulating there. An appropriate timeout and failure handling need to be designed before deploying a flow that contains collector nodes.
The output of the collector node can be connected to a compute node, where we can read the values of the collection like this:
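The correlation and timeout behaviour described above can be modelled outside the broker with a short Python sketch. This is a deliberately simplified model and not how the collector node is implemented: the `Collector` class and its methods are hypothetical, time is passed in explicitly to keep the example deterministic, and a repeated reading on the same terminal simply overwrites the earlier one.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

REQUIRED_TERMINALS = ("temp", "humidity")


@dataclass
class PendingCollection:
    started_at: float
    values: Dict[str, str] = field(default_factory=dict)


class Collector:
    def __init__(self, timeout_seconds: float) -> None:
        self.timeout_seconds = timeout_seconds
        self._pending: Dict[str, PendingCollection] = {}

    def add(self, terminal: str, client_id: str, value: str,
            now: float) -> Optional[Dict[str, str]]:
        """Add one message; return the collection once both values are present."""
        if terminal not in REQUIRED_TERMINALS:
            raise ValueError(f"unknown terminal: {terminal!r}")
        # The client ID plays the role of the correlation value.
        pending = self._pending.setdefault(client_id, PendingCollection(now))
        pending.values[terminal] = value
        if all(t in pending.values for t in REQUIRED_TERMINALS):
            del self._pending[client_id]
            return dict(pending.values)
        return None

    def expire(self, now: float) -> List[Tuple[str, Dict[str, str]]]:
        """Remove and return incomplete collections that reached the timeout."""
        expired = [(cid, p.values) for cid, p in self._pending.items()
                   if now - p.started_at >= self.timeout_seconds]
        for cid, _ in expired:
            del self._pending[cid]
        return expired


collector = Collector(timeout_seconds=30)
collector.add("temp", "DV1", "31.5", now=0)
print(collector.add("humidity", "DV1", "70", now=2))
```

Messages from other client IDs can arrive in between without blocking anything, and the `expire` step shows why a timeout matters: without it, a device that never sends its second value would leave its partial collection behind indefinitely.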
DECLARE TEMPERATURE CHARACTER InputRoot.Collection.temp.MQRFH2.usr.RECV_VALUE;
DECLARE HUMIDITY CHARACTER InputRoot.Collection.humidity.MQRFH2.usr.RECV_VALUE;
You can read more about the collector node in the following links: