Namespace com.ibm.streamsx.iot.watson

Connectivity with IBM Watson IoT Platform.

IBM Watson IoT Platform

The IBM Watson IoT Platform service lets your IBM Streams applications communicate with and consume data collected by your connected devices, sensors, and gateways.

Recommended Deployment

Use of publish-subscribe is recommended so that analytic applications can be developed and deployed independently of the connectivity to IBM Watson IoT Platform.

This is achieved by submitting the com.ibm.streamsx.iot.watson.apps::IotPlatform application to IBM Streams. This is the only job that connects to IBM Watson IoT Platform. It publishes streams that contain device events, commands and status updates.

Analytic applications use EventsSubscribe, CommandsSubscribe and StatusesSubscribe to subscribe to streams to be analyzed.

Analytic applications use CommandPublish to send device commands to individual devices.

The benefits of this approach are:
  • Only a single application (IotPlatform) needs to know the authorization tokens.
  • Analytic applications can be added and removed at any time without disrupting existing running applications.

Applications may also connect to IBM Watson IoT Platform directly using operators DeviceEvents, DeviceCommands, DeviceStatuses and SendCommandToDevice. In this case the application must supply the authorization tokens to connect to IBM Watson IoT Platform, typically through submission time parameters.

Connectivity

Connectivity to IBM Watson IoT Platform is through MQTT.

By default, operators and applications in this toolkit use encrypted connections through port 8883 with TLSv1.2. The operator parameter or submission time parameter encrypted can be set to false to use unencrypted connections through port 1883.
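The port and protocol choice described above can be summarized in a few lines. This is a minimal Python sketch for illustration only, not the toolkit's implementation: the function name mqtt_connection_settings and the shape of the returned dictionary are assumptions; only the port numbers, the TLS version and the meaning of the encrypted flag come from the text.

```python
import ssl


def mqtt_connection_settings(encrypted: bool = True) -> dict:
    """Return the port and TLS version implied by the 'encrypted' setting.

    Hypothetical helper: mirrors the documented defaults, not toolkit code.
    """
    if encrypted:
        # Default: encrypted connection on port 8883 using TLSv1.2.
        return {"port": 8883, "tls_version": ssl.TLSVersion.TLSv1_2}
    # encrypted=false: plain MQTT on port 1883, no TLS.
    return {"port": 1883, "tls_version": None}
```

Unencrypted connections send authorization tokens in the clear, so encrypted=false is usually only appropriate for local testing.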

The Watson IoT Platform scalable application model is used when operators interacting with Watson IoT Platform are in a parallel region of width greater than one. This means MQTT messages representing device events, commands and statuses are load balanced across the multiple channels created by the parallel region.

In addition, if a connection breaks or the processing element (PE) fails, messages are routed by Watson IoT Platform to the remaining active connections. When Streams reconnects or restarts the PE, the new connection is automatically included in the load balancing and starts receiving messages.

com.ibm.streamsx.iot.watson.apps::IotPlatform optionally supports the scalable application mode through submission time values that allow a scale factor to be set independently for device events, commands and statuses.

Device Events

Devices publish device events that an SPL application can subscribe to using EventsSubscribe or DeviceEvents. The default is to subscribe to all event identifiers from all device types and ids. The operator is parameterized to allow only subscribing to a specific event id (which is really the type of the event), a specific device type and/or a specific device using its id.
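The filtering described above can be pictured as an MQTT topic filter in which every unspecified value becomes a single-level wildcard. The Python sketch below assumes the operators map onto the Watson IoT Platform application topic layout iot-2/type/.../id/.../evt/.../fmt/...; that mapping, the function name event_topic_filter and its parameter names are assumptions made for illustration, not details stated in this documentation.

```python
from typing import Optional


def event_topic_filter(type_id: Optional[str] = None,
                       device_id: Optional[str] = None,
                       event_id: Optional[str] = None,
                       fmt: str = "json") -> str:
    """Build an MQTT topic filter for device events (hypothetical helper).

    Any value left as None is replaced by the MQTT single-level
    wildcard '+', which corresponds to "all" for that position.
    """
    return "iot-2/type/{}/id/{}/evt/{}/fmt/{}".format(
        type_id or "+", device_id or "+", event_id or "+", fmt)
```

For example, event_topic_filter() matches every event from every device, while event_topic_filter(type_id="sensor", event_id="temp") narrows the subscription to one device type and one event id (both values are made-up examples).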

The event format is assumed to be JSON, and the resulting stream is of type DeviceEvent. The jsonString attribute contains the event payload as serialized JSON. The event payload is a JSON object that follows the event format specification, so the application event data is the value of the d key.
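To make the payload layout concrete, the following Python sketch pulls the application event data out of a serialized payload such as the one carried in jsonString. It is an illustration under stated assumptions, not toolkit code: the function name extract_event_data and the sample field names (temp, humidity) are hypothetical.

```python
import json


def extract_event_data(json_string: str):
    """Return the application event data stored under the 'd' key.

    Hypothetical helper. Returns None when the payload has no 'd' key
    and raises ValueError when the payload is not a JSON object.
    """
    payload = json.loads(json_string)
    if not isinstance(payload, dict):
        raise ValueError("event payload must be a JSON object")
    return payload.get("d")


# Example payload with made-up sensor readings.
sample = '{"d": {"temp": 21.5, "humidity": 40}}'
data = extract_event_data(sample)
```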

The application event data can be extracted into an SPL tuple using com.ibm.streamsx.iot::DeviceEventExtractData. This operator takes an input stream of type DeviceEvent and produces a stream of type DeviceEventTs augmented with a dataType tuple type specified by the application. Since the application event data might be specific to the eventId, or even to the combination of typeId and eventId, an application may need to use a Split operator between the DeviceEvents invocation and multiple com.ibm.streamsx.iot::DeviceEventExtractData invocations. An example of its use is in the com.ibm.streamsx.iot.watson.quickstart::Quickstart application.

Device Status

Device status events, such as connect or disconnect, are subscribed to using StatusesSubscribe or DeviceStatuses which produce a stream of type DeviceStatus.

Device Commands

Devices can be sent commands. An SPL application can subscribe to commands being sent to devices using CommandsSubscribe or DeviceCommands. The default is to subscribe to all command identifiers for all device types and ids. The operator is parameterized to allow only subscribing to a specific command id (which is really the type of the command), a specific device type and/or a specific device using its id (DeviceCommands only).

An SPL application can send a command to a specific device using CommandPublish or SendCommandToDevice with the input stream type of DeviceCmd. The jsonString attribute contains the application specific portion of the command represented as serialized JSON.

Applications can use CommandTupleToPayload to convert an SPL tuple representation of the command payload to JSON.

Connectivity using MQTT

A Streams application using this toolkit connects to IBM Watson IoT Platform as an MQTT application.

Each invocation of an operator in this toolkit (e.g. DeviceEvents, DeviceStatuses) makes its own MQTT connection using a client identifier of a:org:appKey_appId. A Streams application must ensure that each operator invocation uses a client identifier that is unique for its org. Reuse of client identifiers, even across multiple applications, is not allowed and will result in unexpected disconnection of clients. The default appId ensures that the client identifiers will be unique for an instance if there is only a single invocation of each operator kind in an application. In this case the MQTT client identifier is a:org:appKey_jobId_xxx where xxx is specific to each operator kind.
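The client identifier layout above can be sketched as simple string construction, together with a check that no identifier is reused. This Python sketch is illustrative only: the helper names and the per-operator suffixes ("evt", "cmd") are hypothetical stand-ins for the xxx placeholder, whose actual values are not given here.

```python
def mqtt_client_id(org: str, app_key: str, app_id: str) -> str:
    """Compose a client identifier of the form a:org:appKey_appId."""
    return f"a:{org}:{app_key}_{app_id}"


def default_app_id(job_id: int, operator_suffix: str) -> str:
    """Default appId shape jobId_xxx; the suffix values are assumptions."""
    return f"{job_id}_{operator_suffix}"


def all_unique(client_ids) -> bool:
    """True when no client identifier appears more than once."""
    ids = list(client_ids)
    return len(ids) == len(set(ids))


# One invocation of each operator kind in job 7: identifiers are distinct.
ids = [mqtt_client_id("myorg", "a-key", default_app_id(7, s))
       for s in ("evt", "cmd")]
```

Two invocations of the same operator kind in one job would produce the same default identifier, which is why an explicit appId is needed in that case.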

By default the MQTT message hub host is org.messaging.internetofthings.ibmcloud.com, where org is the organization identifier. The domain parameter can be used with operators to set the host to org.messaging.domain.
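The host naming rule can be written as a one-line function. This is a minimal Python sketch; the function name mqtt_host and the sample organization and domain values are hypothetical, while the host pattern itself comes from the text above.

```python
DEFAULT_DOMAIN = "internetofthings.ibmcloud.com"


def mqtt_host(org: str, domain: str = DEFAULT_DOMAIN) -> str:
    """Return the message hub host in the form org.messaging.domain."""
    return f"{org}.messaging.{domain}"
```

For example, mqtt_host("abc123") yields abc123.messaging.internetofthings.ibmcloud.com, and passing domain="example.test" yields abc123.messaging.example.test.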

JSON Message Format

This toolkit supports using JSON as the message format.

Operators

  • AllDevices: Composite that produces streams for all devices in an organization.
  • DeviceCommands: Subscribe to device commands for an IBM Watson IoT Platform organization.
  • DeviceEvents: Subscribe to device events for an IBM Watson IoT Platform organization.
  • DeviceStatuses: Subscribe to device status events for an IBM Watson IoT Platform organization.
  • PublishDeviceCommands: Publishes all device commands for an organization.
  • PublishDeviceEvents: Publishes all device events for an organization.
  • PublishDeviceStatuses: Publishes all device statuses for an organization.
  • SendCommandToDevice: Send commands to devices registered for an IBM Watson IoT Platform organization.
  • SubscribeDeviceCommands: Subscribes to device commands to be sent to devices for an organization.