

Detect complex events in a real-time data stream

Build an event-detection app easily with IBM Bluemix and Streaming Analytics


This article was written using the Bluemix classic interface. Given the rapid evolution of technology, some steps and illustrations may have changed.

Do you want to perform complex event detection on information from real-time data sources and act quickly when events are found? It's easier than you might think when you use IBM® Bluemix® and the Streaming Analytics service. To show how easy, I built a starter app that:

  • Uses the Streaming Analytics service within a Bluemix Node.js app
  • Ingests a stream of data into Streaming Analytics
  • Recognizes patterns and detects events in the data stream
  • Sends the results of the analysis to the Bluemix app

This tutorial explains how to obtain, run, and extend this starter app, called EventDetection. Here's a graphical overview of the solution components:

Overview diagram of the event-detection solution

The EventDetection app is implemented via the SDK for Node.js runtime. The app provides a simple web UI to display status and results of the analysis. The Node.js app is bound to an instance of the Streaming Analytics service. The app controls the service via the Streaming Analytics REST API.

The analytics are performed by an IBM Streams application that implements the event detection against a stream of weather data. The Node.js app deploys the Streams application by submitting a Streams application bundle to the Streaming Analytics instance that the app is bound to.

The data being analyzed is weather data available on the public Internet from the National Oceanic and Atmospheric Administration (NOAA). After the Streams application is deployed into the Streaming Analytics instance, it continuously ingests and analyzes the weather data until the Streams application is stopped.

When the Streams application detects events, they're sent to the Node.js app for display on the app's web UI.

You can modify the starter application's source code to customize it or extend it in any of several interesting ways.

What you'll need to build your application

Run the app | Get the code

Step 1. Create a Bluemix app and bind the Streaming Analytics service

  1. Log in to Bluemix. In the dashboard, click the CREATE APP button. Choose WEB when asked which kind of app to create.
  2. Select SDK for Node.js from the available runtimes in the catalog and click CONTINUE. Enter a name for your app and click FINISH. Wait for your app to finish staging.
  3. Return to the Dashboard and click the application name. On the application overview page, click the ADD A SERVICE OR API button.
  4. Click the Streaming Analytics service in the catalog's Data and Analytics category and click CREATE to bind it to your application. Restage the app when prompted.

Step 2. Obtain your own copy of the source code

The code for the EventDetection application is stored in the streamscloud | EventDetection project on Bluemix DevOps Services. To obtain your own copy, click the Get the code button on this page (at the end of the What you'll need to build your application section) and either clone the Git repository or fork the code to your own Bluemix DevOps Services project. If you're unfamiliar with either of those methods, download a ZIP file of the source code:

  1. Click the download icon:
    Screenshot of the download icon in the author's Bluemix DevOps Services project
  2. Save the ZIP file locally and extract its contents.
  3. Rename the directory that contains the extracted files to match the name of the app you created in Step 1.

Step 3. Deploy your app and view it

Now you'll deploy the starter application as-is, without any code changes, and then explore the UI. Later in the tutorial, you'll have a chance to customize the application and redeploy it.

  1. From your OS command line, change to the directory that contains your extracted application.
  2. Connect to the Bluemix instance of Cloud Foundry:

    cf api https://api.ng.bluemix.net

    cf login

  3. Deploy your app, replacing yourapp with the name you chose in Step 1:

    cf push yourapp

  4. In Bluemix, click your application's route to open the app. You see a basic web page with the title "Welcome to the Event Detection Sample Application!":
    Screenshot of the running EventDetection app

    The Application Flow section lists the steps that are being performed by the application and their status.

    The Event Types section defines the types of events the application is detecting.

    The Application Results section displays the events as they are detected. It also displays the highest and lowest temperature currently reported.

Step 4. Explore the running Streams application

  1. Back in Bluemix, open the service dashboard for your Streaming Analytics service. You can reach the dashboard in several ways, including by clicking the Streaming Analytics icon in your application:
    Screenshot of the Streaming Analytics service icon

    The Streaming Analytics dashboard offers tasks to control your instance and links to relevant information.

    You can also use the dashboard to launch the Streaming Analytics console, which shows information about your Streaming Analytics instance.
  2. Click the LAUNCH button on the dashboard to display the Streaming Analytics console. In this screenshot, the console shows one running job, which is the Streams application that performs the complex event detection:
    Screenshot of the Streaming Analytics console
  3. Maximize the Streams Graph pane in the upper right of the console to show the flow graph of the Streams application. Use your browser's zoom controls to get a view of the entire graph. The graph shows the live status of your Streams application as it runs. You can hover over the operators in the graph or the connections between them to get more-detailed information. In the following screenshot, hovering over a connection in the graph displays information such as the number of tuples that have flowed between the first two operators in the graph:
    Screenshot of a connection in the Streams Graph

Step 5. Review the Node.js code

The EventDetection app is a complete yet simple application that requires no customization to run. To understand the app, examine its code:

  1. Open the app.js file to view the application logic. The code in app.js is organized around six major steps:
    1. Extract the environment information required to use the Streaming Analytics REST API.
    2. Check if the Streams instance is running and start the instance if necessary via the Streaming Analytics REST API.
    3. If the instance was already running, check if a Streams event-detection job is already running. If a job is running, cancel it.
    4. Deploy a Streams Application Bundle to the Streaming Analytics service by using the Streaming Analytics REST API. The bundle contains a Streams application that analyzes weather data and performs event detection.
    5. Process events detected by the Streams application, and display them on this web page.
    6. Cancel the job corresponding to the Streams application after 3,000 events are processed.
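Step 1 of that flow reads information that Cloud Foundry places in environment variables: the service credentials in VCAP_SERVICES and the app's own route in VCAP_APPLICATION. The following is a minimal sketch, not necessarily the code in app.js. It assumes that the service is listed under the label `streaming-analytics`, that its credentials include `rest_host`, `jobs_path`, `userid`, and `password` fields, and that the route Streams should post to is the first entry of `application_uris` served over plain HTTP. The function names are hypothetical.

```javascript
// Hypothetical helpers (not taken from app.js) that read Cloud Foundry environment data.
// Assumption: the service is listed under the 'streaming-analytics' label and its
// credentials carry rest_host, jobs_path, userid, and password fields.
function getStreamingAnalyticsProps(vcapServicesJson) {
  const services = JSON.parse(vcapServicesJson || '{}');
  const instances = services['streaming-analytics'];
  if (!instances || instances.length === 0) {
    throw new Error('No Streaming Analytics service is bound to this app');
  }
  const creds = instances[0].credentials;
  // HTTP Basic authentication header value built from the service credentials
  const authbuf = 'Basic ' +
    Buffer.from(creds.userid + ':' + creds.password).toString('base64');
  return { rest_host: creds.rest_host, jobs_path: creds.jobs_path, authbuf: authbuf };
}

// Assumption: the first entry of application_uris is the route that Streams should
// post events to, and plain HTTP is acceptable for that route.
function getAppRoute(vcapApplicationJson) {
  const app = JSON.parse(vcapApplicationJson || '{}');
  const uris = app.application_uris || [];
  return uris.length > 0 ? 'http://' + uris[0] : null;
}

// Usage inside the running app:
// const sa_props = getStreamingAnalyticsProps(process.env.VCAP_SERVICES);
// const app_uri = getAppRoute(process.env.VCAP_APPLICATION);
```

Failing fast when no service is bound makes a missing binding (Step 1 of this tutorial) obvious in the app's logs instead of surfacing later as a failed REST call.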

Skim the code to identify where these steps are performed. You'll take a more detailed look at a couple of the steps in the remainder of this section.

  1. Examine the code for step 4 of the flow above, which deploys the Streams Application Bundle to the Streaming Analytics service:
          ...
          // -----New form-----
          var form = new FormData();
    
          // -----File part-----
          form.append('file', fs.createReadStream('EventDetection.sab'), {
            contentType: 'application/octet-stream'
          });
    
          // -----JSON Part-----
    
          var jsonObject = JSON.stringify({
            "jobName" :  "EventDetectionSample",
            "submissionParameters" :
            {
                "route" : app_uri,
            },
          });
          console.info('JSON object part: ' + jsonObject);
    
          var buffer = Buffer.from(jsonObject);  // new Buffer() is deprecated in Node.js
          form.append('my_buffer', buffer, {
            contentType: 'application/json',
            // The line below is not an actual file.  The name with the .json
            // extension is needed for the data in the buffer to be recognized
            // as json.
            "filename": "jparams.json"
          });
    
          // -----POST Params-----
          var uri_string = sa_props.jobs_path + '?bundle_id=EventDetection.sab';
    
          // -----SUBMIT POST-----
          var jsonPostRes = {};
    
          form.submit({
            protocol: 'https:',
            host: sa_props.rest_host,
            path: uri_string,
            headers: {'Authorization' : authbuf}
          }, function(err, res) {
             ...

    The preceding code performs an HTTP POST to the Streaming Analytics REST API, using the form-data package to submit a multipart form:
    • One part of the form is a file called EventDetection.sab. A .sab file is a Streams Application Bundle, which is the product of the compilation of a Streams application in a Streams development environment. EventDetection.sab contains the Streams application that performs the event detection.
    • The other part is a JSON object with additional information for deploying the bundle, such as the name to give the job that will run in the Streaming Analytics instance and the submission-time parameters for this application. The Streams application has only one submission-time parameter: the route to your Node.js app, which the Streams application uses to send events back to the Node.js app as they are detected.
  2. Now examine the code for step 5, which processes the events that are detected by the Streams application. This code appears near the start of the app.js file rather than in the sequence of steps, because it's called asynchronously whenever the Streams application sends an event. The Streams application sends each event to the Node.js app with an HTTP POST. The following code parses the JSON payload of the event and takes the appropriate action:
    // POST handler for the events being sent back from the Streams application
    app.post('/', function(req, res){
        status_step[4] = "Processing Events";
    
        if (!cancelling) {
          console.info("In POST function");
          var jsonString = req.body.jsonString;
          console.info("POST message is: " + jsonString);
          var payload = JSON.parse(jsonString);
        
          if (payload.eventType === 'MaxMin Temp') {
            // Max or min temperature change
            maxmin = payload;
          }
          else {
            // Regular event
            eventCount++;
            console.info("Event total = " + eventCount);
    
            // Add event to the array used by the web user interface
            events.push(new Event(eventCount, payload));
         
            // Cancel the Streams job if we've reached the event target
            if (eventCount === eventTarget) {
              cancelling = true;
              console.info("EVENT TARGET REACHED...");
              console.info("STREAMS JOB WILL BE CANCELLED.");
              finalCancel(jobNumber.toString());
            }
          }
        }
        res.send();
    });
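The bookkeeping in that POST handler can be pulled out of Express so that it is easy to unit-test. The following is a hypothetical refactoring sketch, not code from app.js: `handleEvent` and the shape of `state` are assumptions that mirror the handler's variables (`cancelling`, `eventCount`, `eventTarget`, `events`, `maxmin`), and events are stored as plain objects because the `Event` constructor's definition isn't shown.

```javascript
// Hypothetical refactoring of the POST handler's bookkeeping into a pure function.
// `state` mirrors the handler's variables (cancelling, eventCount, eventTarget,
// events, maxmin); the function returns a new state object and never mutates `state`.
function handleEvent(state, jsonString) {
  if (state.cancelling) {
    return state;                           // ignore events once cancellation has begun
  }
  const payload = JSON.parse(jsonString);
  if (payload.eventType === 'MaxMin Temp') {
    // New highest or lowest temperature: replace the stored max/min payload
    return Object.assign({}, state, { maxmin: payload });
  }
  // Regular event: count it, record it, and flag cancellation at the target
  const eventCount = state.eventCount + 1;
  return Object.assign({}, state, {
    eventCount: eventCount,
    events: state.events.concat([{ id: eventCount, payload: payload }]),
    cancelling: eventCount === state.eventTarget,  // true: caller should cancel the job
  });
}
```

In the route handler, you would assign the returned value back to the shared state and call finalCancel only when `cancelling` changes from false to true, which keeps the HTTP plumbing separate from the event logic.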

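To see roughly what the form-data package sends in the deployment request from step 4, you can assemble a comparable multipart/form-data body by hand. This is only a sketch: form-data generates its own boundary and may differ in header details. The part names (`file`, `my_buffer`), file names, and content types come from the deployment snippet; the function name `buildSubmissionBody` is hypothetical.

```javascript
// Hypothetical sketch: build a multipart/form-data body with the same two parts as the
// deployment request (the .sab bundle and the JSON submission parameters).
function buildSubmissionBody(boundary, sabBytes, params) {
  const CRLF = '\r\n';
  // Headers that open one part of the form
  const partHeader = (name, filename, contentType) =>
    '--' + boundary + CRLF +
    'Content-Disposition: form-data; name="' + name + '"; filename="' + filename + '"' + CRLF +
    'Content-Type: ' + contentType + CRLF + CRLF;

  return Buffer.concat([
    Buffer.from(partHeader('file', 'EventDetection.sab', 'application/octet-stream')),
    sabBytes,                                          // raw bundle bytes
    Buffer.from(CRLF + partHeader('my_buffer', 'jparams.json', 'application/json')),
    Buffer.from(JSON.stringify(params)),               // jobName and submissionParameters
    Buffer.from(CRLF + '--' + boundary + '--' + CRLF), // closing delimiter
  ]);
}

// The request must announce the same boundary in its Content-Type header:
// 'Content-Type': 'multipart/form-data; boundary=' + boundary
```

Each part carries its own Content-Type, which is why the JSON part is given a .json file name in the original snippet: the service can then treat that part as JSON rather than as another file.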
Step 6. Review the Streams application code

The Streams application is complete and requires no customization to run. The source code that you downloaded (or cloned or forked) contains the application's source code as well as its prebuilt .sab file. To understand the Streams application, examine its code:

  1. Open the EventDetection.spl file (located in the project's spl subdirectory). The source code for the application is written in SPL, a language oriented to data streams and operators that act upon them.
  2. Skim the code to locate the operator declarations and compare them to the flow graph that you saw in the Streaming Analytics console. You'll take a more detailed look at a couple of the operators in the remainder of this section.
  3. Examine the code for one of the operators that detects a complex event. The code snippet below shows an operator called MatchRegex, which is used to detect patterns on a series of data tuples in a stream. The code comments describe the nature of the M-shape pattern that the operator will detect:
      //
      // The first complex event is called "M-shape".  It triggers when the graph of the
      // temperature for a weather station forms an M shape over a period of time.
      //
      // Detecting M shape patterns in weather data is not that useful, but recognizing an M shape in
      // financial trading is valuable and is referred to as a "double-top" stock pattern.
      //
      // See http://hirzels.com/martin/papers/debs12-cep.pdf for more information on this complex event
      // detection method, the double-top pattern and other patterns.
      //
      stream<WeatherSummary weatherValues, rstring event> TempMEvent = MatchRegex(WeatherSummary)
      {
        param
          pattern     : ". rise+ drop+ rise+ drop* deep";
          partitionBy : stationCode;
          predicates  : {
            rise = tempInF >  First(tempInF) && tempInF >= Last(tempInF),
            drop = tempInF >= First(tempInF) && tempInF <  Last(tempInF),
            deep = tempInF <  First(tempInF) && tempInF <  Last(tempInF) };
        output
          TempMEvent : weatherValues=WeatherSummary, event="M-Shape Temp";
      }

    The operator declaration defines the pattern to detect in regular-expression syntax, together with a set of predicates (rise, drop, and deep) that are also defined in the declaration. The operator looks for an M shape in the temperatures reported by each weather station. This MatchRegex operator consumes the WeatherSummary stream defined earlier in the SPL code and produces a stream called TempMEvent. Because of the partitionBy parameter, the operator groups readings by weather station code (stationCode) and maintains separate matching state for each station.

  4. Examine a sequence of two operators used to send events back to the Node.js app:
      //
      // Send events to the application user interface by converting them to json and HTTPPost-ing
      // to the Node.js app
      //
      stream <rstring jsonString> JSONOutput = com.ibm.streamsx.json::TupleToJSON(OutputEvents)
      {
      }
      
      () as HttpEvents = HTTPPost(JSONOutput) {
        param
          headerContentType : "application/json";
          url : ((rstring) getSubmissionTimeValue("route"));
      }

    The first operator in the preceding snippet converts a tuple in a stream into a JSON string. This operator consumes a stream called OutputEvents defined earlier in the SPL code and produces a stream called JSONOutput. The next operator, called HTTPPost, consumes the JSONOutput stream and sends the JSON string to the route for the Node.js app via an HTTP POST.
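To experiment with the M-shape pattern outside Streams, you can emulate the MatchRegex operator shown earlier in this step on one station's temperatures. The following JavaScript is a simplified offline sketch, not how MatchRegex is implemented. It assumes, as the predicate definitions suggest, that First(tempInF) is the temperature of the first reading in the candidate match and Last(tempInF) is the temperature of the previously matched reading. The function names `classify` and `findMShape` are hypothetical.

```javascript
// Simplified offline emulation of the M-shape pattern for ONE weather station.
// Assumption: First = first reading of the candidate match, Last = previous reading.
// Each reading gets one letter; an ordinary JS regex then plays the role of
// the SPL pattern ". rise+ drop+ rise+ drop* deep".
function classify(temps, start) {
  let letters = 's';                                   // '.' in SPL: any first tuple
  for (let i = start + 1; i < temps.length; i++) {
    const t = temps[i], first = temps[start], last = temps[i - 1];
    if (t > first && t >= last) letters += 'r';        // rise
    else if (t >= first && t < last) letters += 'd';   // drop
    else if (t < first && t < last) letters += 'e';    // deep
    else letters += 'x';                               // matches no predicate
  }
  return letters;
}

// Returns the index range of the first M-shape found, or null if there is none.
function findMShape(temps) {
  for (let start = 0; start < temps.length; start++) {
    const m = /^sr+d+r+d*e/.exec(classify(temps, start));
    if (m) return { start: start, end: start + m[0].length - 1 };
  }
  return null;
}
```

For example, `findMShape([50, 52, 55, 53, 51, 54, 56, 52, 48])` returns `{ start: 0, end: 8 }`: two rises, two drops, two rises, a drop, and a final reading below the starting temperature. The three predicates are mutually exclusive, so each reading gets at most one letter for a given start, which is what lets a plain regex stand in for the operator in this sketch.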

Step 7. Customize or extend the app

Now that you're familiar with the starter app, you can modify the application's source code to customize it or extend it in any of several interesting ways:

  • To make the app run longer, modify the Node.js code in app.js to change the value of the eventTarget variable (the event count at which the POST handler cancels the Streams job) from 3000 to a higher number.
  • To define a new complex event for the Streams application to detect:
    1. Update the SPL code to add another MatchRegex operator to the flow to detect a pattern that you want to look for.
    2. Update the operators after your new MatchRegex operator in the SPL code so that your new event type gets sent back to the Node.js application.

To modify the app:

  1. Plan your modifications.
  2. Change the Node.js and/or SPL source code to reflect your desired customizations.
  3. If you have modified the SPL code, you must recompile it in a Streams development environment and replace the .sab file that you downloaded with this updated version.
  4. Deploy (push) the modified application to Bluemix.

Conclusion

Complex event detection against a real-time data stream is simple to perform in Bluemix with the Streaming Analytics service. The application that you worked through in this tutorial will get you started. You can switch to the data streams you want to analyze, define the events you want to detect, and act on those events to accomplish your goals.


Zone=Big data and analytics, Cloud computing
ArticleID=1024443
ArticleTitle=Detect complex events in a real-time data stream
publish-date=12212015