Managing IoT devices with Kafka and MQTT

IBM Bluemix offers two complementary services by which applications can process events published by thousands of devices in real time:

  • The Internet of Things (IoT) Foundation service implements the functionality of an MQTT broker while offering a simple web administration interface. MQTT (MQ Telemetry Transport) is a lightweight publish/subscribe protocol supporting the efficient connection of devices to a server (broker). Applications can retrieve device data and coordinate activities performed on devices by sending command messages.
  • The Message Hub service is powered by Apache Kafka, a messaging system designed for high scalability in terms of data volumes and concurrent access by multiple applications that need to process the arriving data in real time.

Readers interested in more details can access the Bluemix documentation of IoT Foundation and Message Hub. The work described in this post was greatly helped by the article “Explore MQTT and the Internet of Things service on IBM Bluemix” that provided extremely useful background and examples for administering the Bluemix IoT service and for creating the client code by which devices and applications can interact with the MQTT broker.

Overview

This post illustrates the implementation of a realistic mobility scenario where an application (mobility manager) coordinates the movement of many mobile devices in real time. The mobility manager subscribes to a Kafka topic from which it continuously retrieves incoming events produced by a configured set of devices. The application controls each device by publishing a Kafka message that is ultimately routed to the MQTT topic associated with that device. The mobility application produces commands of the following types:

  • poll instructs a device to publish its past movements up to a specified time
  • update requires a device to change its target destination and/or its speed.

The devices simulate vehicles (mobiles) interacting with an MQTT broker for communicating events and for receiving commands from the central manager. These devices produce events of the following types:

  • connect communicates a device’s connection to the broker, hence the ability to receive commands
  • step communicates the space-time coordinates of a device’s last movement toward its current target.

The above simple protocol is sufficient to demonstrate the exploitation of the open communication channel connecting devices to a central manager although managing devices capable of complex autonomous decisions might require a larger set of communication primitives.
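The two command types and two event types described above could be modelled along the following lines. This is only a sketch: the post does not specify a wire format, so the semicolon-separated text encoding, the field order and the class name MobilityMessages are assumptions made for illustration.

```java
import java.util.Locale;

/** Hypothetical model of the messages exchanged between mobiles and the mobility manager. */
final class MobilityMessages {

    /** Commands sent by the mobility manager to a device. */
    enum CommandType { POLL, UPDATE }

    /** Events published by a device. */
    enum EventType { CONNECT, STEP }

    private MobilityMessages() { }

    /** Encodes a step event as "step;deviceId;x;y;timestampMillis" (assumed format). */
    static String encodeStep(String deviceId, double x, double y, long timestampMillis) {
        return String.format(Locale.ROOT, "step;%s;%.6f;%.6f;%d", deviceId, x, y, timestampMillis);
    }

    /** Encodes a poll command asking for movements up to the given time (assumed format). */
    static String encodePoll(String deviceId, long upToMillis) {
        return String.format(Locale.ROOT, "poll;%s;%d", deviceId, upToMillis);
    }

    /** Reads the event type from the first field of an encoded event. */
    static EventType eventTypeOf(String payload) {
        String name = payload.split(";", 2)[0];
        return EventType.valueOf(name.toUpperCase(Locale.ROOT));
    }
}
```

A manager receiving a payload would first call eventTypeOf to decide whether it is looking at a connect or a step event, then decode the remaining fields accordingly.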

At this point there is clearly a case for the MQTT-KAFKA BRIDGE connecting the IoT Foundation service (MQTT broker) and the Message Hub (Kafka) service. The role of this component in the end-to-end flow is depicted in the following diagram:

Flow diagram

The following sections provide details on each of the four sharp-cornered blocks in the above diagram.

MOBILES (DEVICES)

The MOBILES block represents a stand-alone Java process that simulates a configurable number of devices. These devices must already be registered with the IoT Foundation service and their authentication tokens saved in a file. In IoT Foundation, I defined the javaDevice device type and several devices of that type with device IDs javadevice01, javadevice02, and so on. I saved the authentication data in a file containing one line per device, each line including the device ID and its authentication token. This file is bundled inside the MOBILES runnable JAR. The code leverages the MQTT Java client provided by the Eclipse Paho project. The client JAR file can be downloaded here: org.eclipse.paho.client.mqttv3_1.0.2.jar.
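Reading such an authentication file could look like the sketch below. The post only says that each line holds a device ID and its token, so the whitespace separator, the skipping of blank and '#' lines, and the class name DeviceCredentials are assumptions.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical parser for the per-device authentication file. */
final class DeviceCredentials {

    private DeviceCredentials() { }

    /**
     * Parses lines of the assumed form "deviceId token".
     * Blank lines and lines starting with '#' are skipped.
     * Returns a map from device ID to token, in file order.
     */
    static Map<String, String> parse(List<String> lines) {
        Map<String, String> tokensByDeviceId = new LinkedHashMap<>();
        int lineNumber = 0;
        for (String line : lines) {
            lineNumber++;
            String trimmed = line.trim();
            if (trimmed.isEmpty() || trimmed.startsWith("#")) {
                continue;
            }
            String[] parts = trimmed.split("\\s+", 2);
            if (parts.length != 2) {
                // Report the position only, so that a token never ends up in a log
                throw new IllegalArgumentException("Malformed credentials at line " + lineNumber);
            }
            tokensByDeviceId.put(parts[0], parts[1]);
        }
        return tokensByDeviceId;
    }
}
```

Since the file lives inside the runnable JAR, the lines would typically be read from a classpath resource before being handed to parse.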

One instance of the Paho MQTT client is created on start-up for each configured device. Connecting each client to the Bluemix MQTT broker requires providing values for a few critical parameters in the appropriate format, as summarized in the following table:

Name        Value
client ID   “d:$organization:$deviceType:$deviceID”
userName    “use-token-auth”
password    Device authentication token

The organization part of the client ID can be retrieved dynamically at run time, as it appears in the VCAP_SERVICES environment variable entry associated with the IoT service instance bound to the running application. The other two parts (deviceType and deviceID) must be obtained elsewhere (in our case a value for deviceID is obtained from the authentication file, while the device type name is a built-in constant in our code). A device authentication token can be generated when a new device is registered to IoT using the service administration interface. The generated token is not kept in the service, so it must be saved for later use during authentication.
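As a rough illustration of how the client ID might be assembled, the sketch below extracts the organization from the VCAP_SERVICES JSON and builds the device client ID in the format shown in the table. It assumes that the IoT service credentials contain a string field named "org"; the regular expression is a shortcut for the example, and a real JSON parser would be the safer choice. The class name IotClientIds is hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper that builds IoT Foundation client IDs. */
final class IotClientIds {

    // Assumption: the IoT service credentials in VCAP_SERVICES carry an "org" string field.
    private static final Pattern ORG_FIELD = Pattern.compile("\"org\"\\s*:\\s*\"([^\"]+)\"");

    private IotClientIds() { }

    /** Returns the first "org" value found in the given VCAP_SERVICES JSON text. */
    static String organizationFrom(String vcapServicesJson) {
        Matcher matcher = ORG_FIELD.matcher(vcapServicesJson);
        if (!matcher.find()) {
            throw new IllegalStateException("No \"org\" field found in VCAP_SERVICES");
        }
        return matcher.group(1);
    }

    /** Builds a device client ID of the form d:organization:deviceType:deviceID. */
    static String deviceClientId(String organization, String deviceType, String deviceId) {
        return "d:" + organization + ":" + deviceType + ":" + deviceId;
    }
}
```

At run time the JSON text would come from System.getenv("VCAP_SERVICES"). The resulting client ID, together with the user name "use-token-auth" and the device token, would then be passed to the Paho client, for example through MqttConnectOptions.setUserName and MqttConnectOptions.setPassword.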

Java runnable JAR in Bluemix

We chose to package the MOBILES application as a single runnable JAR that includes other JAR files. This is easily done in Eclipse using the Export function on a Java project that depends on other Java projects and external JAR files, as was our case. The resulting application is a pure worker that only interacts with the external world through the MQTT broker, hence it needs no routes to be created in Bluemix. The application consists of a pure J2SE client (no server components) running a never-ending process, hence it can run on the java_buildpack buildpack. The application can be created in Bluemix and then updated by executing the following Cloud Foundry command:

cf push mobiles -b java_buildpack -m 256M -p iic.mqtt.mobiles.jar --no-route --no-start

Once the application becomes visible in your Bluemix space you can bind services to it. In this case we must bind an instance of the IoT service to mobiles.

MQTT-KAFKA BRIDGE

Like MOBILES, the MQTT-KAFKA bridge also consists of a pure J2SE worker, in this case playing the client role with respect to both the MQTT and the KAFKA servers. During its initialization phase the bridge component:

  • subscribes to the MQTT topics receiving events from the configured set of devices, and
  • subscribes to the KAFKA topic receiving commands produced by one or more device management applications.

This time, the bridge component must identify and authenticate itself to the MQTT broker as an application client. Again, this requires providing values for a few critical parameters in the appropriate format, as summarized in the following table:

Name        Value
client ID   “a:$organization:$applicationID”
userName    $apiKey
password    $apiToken

As for devices, the organization part of the client ID can be retrieved dynamically at run time from the VCAP_SERVICES environment variable, while an arbitrary unique name (e.g. application01) can be set for applicationID. In this case the apiKey and apiToken authentication strings must be any key-token pair generated by the Bluemix IoT service management interface:

Snapshot with 3 API keys

After initialization, execution proceeds in several concurrent threads:

  • on the MQTT side the MQTT callback processor associated to each device receives events and forwards them to the Kafka events topic, while
  • on the Kafka side one thread periodically pulls incoming messages from the Kafka commands topic and forwards the corresponding payload (device command) to the MQTT topic associated to its target device.
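The routing performed by these two flows comes down to translating between MQTT topic names and device IDs. The sketch below shows one way to do that. It assumes the application-side topic layout used by IoT Foundation (iot-2/type/.../id/.../evt/.../fmt/... for events and the matching cmd form for commands) and a json format segment; both should be checked against the service documentation. The class name BridgeTopics is hypothetical.

```java
/** Hypothetical topic helper for the MQTT side of the bridge. */
final class BridgeTopics {

    private BridgeTopics() { }

    /** Wildcard subscription covering every event of every device of the given type. */
    static String eventSubscription(String deviceType) {
        return "iot-2/type/" + deviceType + "/id/+/evt/+/fmt/json";
    }

    /** Topic on which a command for one specific device is published. */
    static String commandTopic(String deviceType, String deviceId, String command) {
        return "iot-2/type/" + deviceType + "/id/" + deviceId + "/cmd/" + command + "/fmt/json";
    }

    /** Extracts the device ID from the topic of a received event. */
    static String deviceIdOf(String eventTopic) {
        // Expected levels: iot-2 / type / <type> / id / <id> / evt / <event> / fmt / <format>
        String[] levels = eventTopic.split("/");
        boolean wellFormed = levels.length == 9
                && "iot-2".equals(levels[0])
                && "id".equals(levels[3])
                && "evt".equals(levels[5]);
        if (!wellFormed) {
            throw new IllegalArgumentException("Not a device event topic: " + eventTopic);
        }
        return levels[4];
    }
}
```

On the Kafka side, the device ID returned by deviceIdOf could serve as the record key, which would keep the events of one device in the same partition and therefore in order. This is a design suggestion rather than something the post states.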

Like MOBILES, the MQTT-KAFKA BRIDGE component is packaged as a monolithic runnable JAR. The application can be created in Bluemix and then updated by executing the following Cloud Foundry command:

cf push mqtt-kafka-bridge -b java_buildpack -m 512M -p iic.mqtt.kafka.bridge.jar --no-route --no-start

Once the application becomes visible in your Bluemix space, you can bind services to it. In this case we must bind an instance of the IoT service and an instance of the Message Hub service to mqtt-kafka-bridge, since the bridge is a client of both.

MOBILITY MANAGER

The mobility manager is an application running in the WebSphere Liberty Bluemix runtime. The JEE web side of the application uses web sockets to let clients visualize the paths of mobiles over a geographical map. The application also implements access to the Message Hub Bluemix service via the native Kafka Java client, as well as the mobility logic that drives the coordinated movement of mobiles. As in the bridge component, receiving messages from a Kafka topic requires continuous polling by a dedicated thread. Spawning threads in a JEE container is seen by many as a capital sin; however, in this proof of concept we wanted to avoid the complexity of EJB timers. Therefore, we put the mobility manager logic and the associated Kafka client in an object implementing the Java Runnable interface and let it be started by a static method when the first web socket connection is opened.
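The "start once, then poll forever" pattern described here can be sketched as follows. To keep the example self-contained, a BlockingQueue stands in for the Kafka consumer and a Consumer callback stands in for the mobility logic; the class name MobilityManagerWorker and the 100 ms poll timeout are assumptions, not the author's code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Hypothetical polling worker, started at most once per JVM. */
final class MobilityManagerWorker implements Runnable {

    private static final AtomicBoolean STARTED = new AtomicBoolean(false);

    private final BlockingQueue<String> source;   // stand-in for the Kafka consumer
    private final Consumer<String> handler;       // stand-in for the mobility logic
    private volatile boolean running = true;

    MobilityManagerWorker(BlockingQueue<String> source, Consumer<String> handler) {
        this.source = source;
        this.handler = handler;
    }

    /** Starts the worker thread on the first call only; returns true if this call started it. */
    static boolean startOnce(MobilityManagerWorker worker) {
        if (!STARTED.compareAndSet(false, true)) {
            return false;
        }
        Thread thread = new Thread(worker, "mobility-manager");
        thread.setDaemon(true);
        thread.start();
        return true;
    }

    /** Asks the polling loop to end after its current iteration. */
    void stop() {
        running = false;
    }

    @Override
    public void run() {
        while (running) {
            try {
                // Wait briefly for the next event, then hand it to the mobility logic
                String event = source.poll(100, TimeUnit.MILLISECONDS);
                if (event != null) {
                    handler.accept(event);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```

A web socket endpoint would call startOnce from its open handler; every call after the first returns false without creating another thread.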

Authentication via the Message Hub JAAS login module

Authentication of the Kafka Java client to the Message Hub Bluemix service is based on a custom JAAS login module as explained here. This process assumes that the credentials are known and available in a file whose path must be declared by the java.security.auth.login.config Java property. However, when running on Bluemix these data are preferably obtained from the VCAP_SERVICES variable at run time, as they must always reflect the credentials of the currently bound service instance. The following code may be run before invoking the Kafka client connection to achieve this goal: it fills a temporary file with credentials obtained at run time and sets the file path in the java.security.auth.login.config Java property.

    // Imports needed at the top of the enclosing source file
    import java.io.File;
    import java.io.IOException;
    import java.io.PrintWriter;

    private static final String JAAS_TEMPLATE =
        "KafkaClient { com.ibm.messagehub.login.MessageHubLoginModule required serviceName=\"kafka\" username=\"%1$s\" password=\"%2$s\"; };";
    private static boolean JAAS_CONFIGURATION_DONE = false;

    // Note: _serviceInstance is not used inside this method
    private static void configureJAAS(String _serviceInstance, String _username, String _password) {

        // 1. Create a temporary file to hold the login credentials
        String content = String.format(JAAS_TEMPLATE, _username, _password);
        File jaasConf;
        try {
            jaasConf = File.createTempFile("jaas", ".conf");
            jaasConf.deleteOnExit();
            // try-with-resources closes the writer even if writing fails
            try (PrintWriter writer = new PrintWriter(jaasConf)) {
                writer.println(content);
            }
        } catch (IOException e) {
            // Without the file there is nothing to point the property at,
            // so leave the configuration flag unset and give up
            e.printStackTrace();
            return;
        }

        // 2. Set java.security.auth.login.config property
        System.setProperty("java.security.auth.login.config", jaasConf.getAbsolutePath());

        // 3. Turn off further configuration attempts
        JAAS_CONFIGURATION_DONE = true;
    }

The above file-based JAAS configuration applies to J2SE Java programs, and it also works on the Bluemix WebSphere Liberty profile server. It does not work on a stand-alone installation of WebSphere Liberty, where the data consumed by a JAAS login module must be configured in the Liberty server.xml file. The Bluemix Liberty behavior is intentional, and it nicely fits the requirement of not including service credentials inside an application.

MOBILITY MANAGER UI

The following video illustrates the mobility manager UI. This component leverages OpenLayers software on the browser to display the position of mobiles over a geographical map:

The communication between the mobility manager and the browser takes place over web sockets: each new step event received from Kafka by the server application causes the latter to communicate the device ID and the step coordinates to all browser sessions currently open.
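The fan-out to browser sessions could be organized as in the sketch below. Each open web socket session is represented here by a plain Consumer of text messages, so that the example runs without a JEE container; in the real application the callback would wrap something like Session.getBasicRemote().sendText(...). The JSON field names and the class name StepBroadcaster are assumptions.

```java
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical broadcaster that pushes step events to every open browser session. */
final class StepBroadcaster {

    // Thread-safe set: sessions open and close while the Kafka thread is broadcasting
    private final Set<Consumer<String>> sessions = ConcurrentHashMap.newKeySet();

    void onOpen(Consumer<String> session) {
        sessions.add(session);
    }

    void onClose(Consumer<String> session) {
        sessions.remove(session);
    }

    /** Sends the device ID and step coordinates to all open sessions; returns how many received it. */
    int broadcastStep(String deviceId, double longitude, double latitude) {
        String message = String.format(Locale.ROOT,
                "{\"deviceId\":\"%s\",\"lon\":%.6f,\"lat\":%.6f}", deviceId, longitude, latitude);
        int delivered = 0;
        for (Consumer<String> session : sessions) {
            session.accept(message);
            delivered++;
        }
        return delivered;
    }
}
```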

Conclusion

This article demonstrates the combined use of two cloud services offered by the IBM Bluemix platform, namely the Internet of Things (IoT) Foundation and Message Hub. Under the hood, these services are powered by MQTT (IoT) and Apache Kafka (Message Hub). The proposed scenario includes a mobility management application connected to Kafka and mobile devices connected to the MQTT broker.

The mobility manager processes GPS location data published by the mobile devices, producing commands that coordinate their future movements. The scenario clearly makes the case for a generic component sitting between the MQTT broker and the Kafka server, automatically forwarding messages published on one end to the other end. This proof of concept demonstrates the feasibility of a generic bridge which can be implemented using standard Java client libraries for both MQTT and Kafka.

8 Comments

chintamani

Very useful document. Thanks a lot for this.
If I want to try this where can I get the mqtt-kafka bridge code or the jar file

Reply

Haryanto Rimbun

Hi,

I am trying to make my code working to try out to connect device using mqtt to bluemix. I am not so sure what I am missing. the connection does a time out. Do you know who can I contact to help me on this matter? Thanks

Reply

    Haryanto

    Thanks for the advice. It is working. Need to do some tweak on the TCP and SSL address.

    Reply

jparm

Why kafka? why not mqtt only?

Reply

    Rajeev

    @jdparm, Kakfa for scalable and distributed architecture.

    Reply

Anas

Very useful document. Where I can find the MQTT Kafka multi-directional code mentioned here?

Reply

Jon

Was wondering why you need a dedicated MQTT broker before the Kafka broker? Why not just have a proxy application that parses MQTT payload and publishes it directly on Kafka topics? Does having a dedicated MQTT broker help in any way?

Reply

LV

What is the need for a dedicated MQTT broker before Kafka? Why not just implement a pure MQTT to Kafka bridge that just parses incoming MQTT payloads and publishes on Kafka topics? Does a dedicated MQTT broker bring any added value in terms of QoS?

Reply