Predictive Cloud Computing for professional golf and tennis, Part 3

Big data in motion


During professional golf and tennis tournaments, a wide variety of data streams through cloud computing environments to provide fans with an immersive digital experience. The Predictive Cloud Computing (PCC) system applies analytics to this large-scale data—or “Big Data in Motion”—to extract predictors of future game play states and to show player popularity on the social leader board. The stream computing technologies involved include:

  • Natural language processing
  • IBM InfoSphere Streams
  • Unstructured Information Management Architecture (UIMA) Processing Engine ARchive (PEAR)
  • Twitter
  • Bluemix
  • IBM Content Analytics

Stream computing produces aggregate and real-time statistics for predictive cloud resource management and live social leadership scoreboards during each sporting event.

Stream computing architecture and data

Analytic processing of sporting and social data within PCC must occur in real time to provide live player popularity and trending patterns, and to enable the provisioning of cloud resources ahead of the current time horizon. To achieve high-volume data analytics with low latency, information is pushed through streams to operators, each of which performs an atomic function and places its result back onto the stream for further processing or output. IBM InfoSphere Streams provides its own Streams Processing Language (SPL) and also supports native C/C++ and Java bindings for custom operators. Predictive Cloud Computing uses Streams extensively, with a number of SPL and Java operators that analyze the real-time data.

InfoSphere Streams execution takes the form of a directed graph. At the beginning are one or more "source" operators, which either create data or receive it from an outside data source, such as local files, a remote database, a socket, HTTP protocols, or a custom source. Once consumed, the data flows through operators, which act as functors that manipulate the data in flight. Operators can reject data, aggregate it over a window of time, mix in more data, or transform it to a different format.

An example of this workflow in the Predictive Cloud project is our processing of Twitter data, depicted in Figure 1. Our source operator connects to the Twitter GNIP API, receiving JSON text from live tweets. The next operator parses this text into JSON and extracts only the relevant fields from the large amount of metadata associated with each tweet. From here, the data follows two paths. On one path, the tweets are windowed to aggregate them by arrival time, storing only the total count of tweets and the time stamp of the window. After being aggregated for 60 seconds, the count and time stamp are posted to Graphite through a UDP message sink. On the second path, the tweet data is sent to an HTTP post sink, which issues an HTTP POST request to a RESTful API to store the tweet information in a database.

While InfoSphere Streams provides a number of built-in operators, sinks, and sources for data manipulation, it cannot cover every use case. For example, in the Twitter workflow above, we lacked an operator to make HTTP post requests, so we wrote a Java operator that used the producer/consumer model to process incoming tweet data and outgoing requests. Incoming Twitter data was stored in a queue to ensure that slow HTTP requests did not block the flow of Streams execution. In a separate thread of execution, HTTP requests were made to send the data to our RESTful service. InfoSphere Streams provides a Java API toolkit, which custom operators extend. Custom operators are first-class citizens in InfoSphere Streams, behaving just like their native counterparts. They can be configured, have access to any parameters passed to them, and are sandboxed in their own JVM.
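
The article does not include the source of that operator, but a minimal sketch of the producer/consumer pattern it describes might look like the following. The class, parameter, and attribute names (HttpPostSink, url, tweet) are illustrative assumptions rather than the actual PCC code; only the Streams Java Operator API calls and standard JDK classes are real.

package com.ibm.ei.twitter;

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import com.ibm.streams.operator.AbstractOperator;
import com.ibm.streams.operator.OperatorContext;
import com.ibm.streams.operator.StreamingInput;
import com.ibm.streams.operator.Tuple;

/**
 * Hypothetical sketch of a producer/consumer HTTP post sink. Incoming tuples are
 * queued by the Streams thread (producer) and posted to a REST endpoint by a
 * background thread (consumer), so slow HTTP requests never block the stream.
 */
public class HttpPostSink extends AbstractOperator {

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(10000);
    private String endpoint; // RESTful service URL, supplied as an operator parameter

    @Override
    public synchronized void initialize(OperatorContext context) throws Exception {
        super.initialize(context);
        endpoint = context.getParameterValues("url").get(0);
        // Consumer thread: drain the queue and issue HTTP POST requests.
        Thread consumer = context.getThreadFactory().newThread(() -> {
            try {
                while (true) {
                    post(queue.take());
                }
            } catch (Exception e) {
                // a production operator would log and surface the failure
            }
        });
        consumer.setDaemon(true);
        consumer.start();
    }

    @Override
    public void process(StreamingInput<Tuple> stream, Tuple tuple) throws Exception {
        // Producer side: enqueue the JSON payload and return immediately.
        queue.put(tuple.getString("tweet"));
    }

    private void post(String json) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(endpoint).openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream os = conn.getOutputStream()) {
            os.write(json.getBytes(StandardCharsets.UTF_8));
        }
        conn.getResponseCode(); // read the response so the connection can be reused
        conn.disconnect();
    }
}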

Figure 1 depicts the five different stream computing jobs that connect to a data source and eventually provide output to a RESTful web service, to a disk, or to another job. In general, each processing element accepts a tuple that contains an object of results or a punctuation mark that denotes a type of signal, such as a window. Each job can contain one or more processing elements that are related through a directed graph as programmed within the Streams Processing Language.

Figure 1. The overall stream computing architecture for PCC
Chart showing stream computing architecture

Jobs 0, 1, 2, and 3 in Figure 1 support the social leadership board. Job 0 accepts input parameters that describe the HTTP service from which to pull tweets. For sporting events, secure HTTP that connects to Twitter's PowerTrack with a specified user and password is placed into the header of the stream source, which is defined through optional runtime parameters. If the parameters are not specified, default values are used, as depicted in Listing 1. The output of the operator is a stream named TwitterStream that carries tweets in the form of rstrings. The stream is exported with the GNIPStream identity so that it is identifiable by other consumer jobs such as Jobs 1, 2, and 3.

Listing 1. Job 0's Streams Processing Language Twitter connection
namespace application ;

use httputils.com.ibm.ssb.inet.http::* ;

//Reads the GNIP Twitter Stream and exports the raw text to other
//Stream instances on the same server instance
composite TwitterStreamExporter
{
    param
        expression<rstring> $protocol : getSubmissionTimeValue("protocol", "https") ;
        expression<rstring> $host : getSubmissionTimeValue("host", "<host>") ;
        expression<rstring> $baseurl : getSubmissionTimeValue("baseurl", "<URL ending>") ;
        expression<rstring> $userid : getSubmissionTimeValue("userid", "<userid>") ;
        expression<rstring> $password : getSubmissionTimeValue("password", "<password>") ;
    type

        TwStreamT = rstring gnip, rstring body, rstring actor, rstring id,
            rstring object, rstring postedTime ;
    graph

        stream<rstring tweet> TwitterStream as TwSHTTPReaderOp = HTTPGetStreamSource()
        {
            param
                protocol : $protocol;
                host : $host;
                baseurl : $baseurl;
                userid : $userid;
                password : $password;
        }

        () as ExportOperator = Export(TwitterStream)
        {
            param
                streamId : "GNIPStream";
        }
}

Job 1, as shown in Listing 2, imports the GNIPStream with the provided Import operator and names the stream TwitterStream. Next, a custom operator written in Java converts the JSON string to a tuple. This JSONToTuple operator is defined in an operator model XML file, shown in Listing 3. The location of required libraries is defined within the managedLibrary section. Any parameters that can be passed into the operator are described, as well as metrics. Metrics provide static states such as counters and gauges to monitor the activity of a processing engine; the JSONToTuple operator model contains only parameters. Finally, input and output ports are specified. In particular, the behavior regarding punctuation is important, as it determines whether new punctuations are produced, ignored, or passed through to the next processing engine.

The TwitterOperator accepts the TwitterTupleStream and aggregates the total number of tweets per player for each minute. The aggregated results are sent to a RESTful service for storage in DB2. The Graphite parameters are used to establish a UDP connection to a time series database. The total number of tweets is aggregated by an Aggregate operator over a 1-minute window. A custom SPL operator, implemented as a Functor, sets the target metric name for Graphite. The resulting stream, GraphiteLogProcessingRate, is written to a UDPSink.

Listing 2. Job 1's Streams Processing Language Twitter aggregation
namespace application ;

use JSONHelpers.com.ibm.ssb.parsers.json::JSONToTuple ;
use TwitterOperator.com.ibm.ei.twitter::TwitterOperator ;

composite TwitterStreamReader
{
    param
        expression<rstring> $playerSummaryURL : getSubmissionTimeValue("playerSummaryURL");
        expression<rstring> $tweetSummaryURL : getSubmissionTimeValue("tweetSummaryURL") ;
        expression<rstring> $siteTag : getSubmissionTimeValue("siteTag") ;
        expression<rstring> $siteTagPlayerDelimiter : getSubmissionTimeValue("siteTagPlayerDelimiter") ;
        expression<rstring> $graphiteHost : getSubmissionTimeValue("graphiteHost", "<host>");
        expression<int32>   $graphitePort : (int32)getSubmissionTimeValue("graphitePort", "<port>");
        expression<rstring> $graphiteMetricName : getSubmissionTimeValue("graphiteMetricName");
        expression<rstring> $siteName : getSubmissionTimeValue("siteName") ;
        expression<rstring> $siteYear : getSubmissionTimeValue("siteYear") ;
        expression<rstring> $plexId : getSubmissionTimeValue("plexId") ;
    type

        TwStreamT = rstring gnip, rstring body, rstring actor, rstring id,
            rstring object, rstring postedTime, rstring retweetCount ; // int64 id; 

    graph

        stream<rstring tweet> TwitterStream = Import()
        {
            param
                applicationName : "application::TwitterStreamExporter";
                streamId : "GNIPStream";
        }

        //convert json string to tuple
        stream<TwStreamT> TwitterTupleStream = JSONToTuple(TwitterStream)
        {
            param
                continueOnError : true ;
        }

        //Process json tuples
        () as TwitterSink = TwitterOperator(TwitterTupleStream)
        {
            param
                tweetSummaryURL : $tweetSummaryURL;
                playerSummaryURL : $playerSummaryURL;
                siteTag : $siteTag;
                siteTagPlayerDelimiter : $siteTagPlayerDelimiter;
                siteName : $siteName;
                siteYear : $siteYear;
                plexId : $plexId;
                graphiteHost : $graphiteHost;
                graphitePort : $graphitePort;
        }
        
        stream<int32 tweetsProcessed> TweetProcessingRate = Aggregate(TwitterStream)
        {
            window
                TwitterStream : tumbling, time(1.0);
            output
                TweetProcessingRate : tweetsProcessed = Count();
            config 
                restartable : true;
        }
        
        stream<rstring data> GraphiteLogProcessingRate = Functor(TweetProcessingRate)
        {
            output
                GraphiteLogProcessingRate: data = $graphiteMetricName + ":" + ((rstring)tweetsProcessed) + "|c";
        }
        
        () as UDPSink1 = UDPSink(GraphiteLogProcessingRate)
        {
            param
                address : $graphiteHost;
                port : (uint32)$graphitePort;
        }
}
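
The Functor in Listing 2 formats each count as a StatsD-style counter ("metric:value|c") before the UDPSink sends it to Graphite. Outside of SPL, the same message could be emitted with a few lines of Java; the following is a minimal sketch in which the host, port, and metric name are placeholders rather than PCC's actual configuration.

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

public class GraphiteCounter {
    public static void main(String[] args) throws Exception {
        // Placeholder host, port, and metric; in PCC these arrive as submission-time values.
        String host = "graphite.example.com";
        int port = 8125;
        int tweetsProcessed = 42;
        // The same StatsD-style counter format produced by the Functor in Listing 2.
        String message = "pcc.tweets.processed:" + tweetsProcessed + "|c";
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        try (DatagramSocket socket = new DatagramSocket()) {
            socket.send(new DatagramPacket(payload, payload.length,
                    InetAddress.getByName(host), port));
        }
    }
}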

The JSONToTuple operator is defined in an operator model XML file, as shown in Listing 3.

Listing 3. The JSONToTuple operator model
<operatorModel xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://www.ibm.com/xmlns/prod/streams/spl/operator" xmlns:cmn="http://www.ibm.com/xmlns/prod/streams/spl/common" xsi:schemaLocation="http://www.ibm.com/xmlns/prod/streams/spl/operator operatorModel.xsd">
  <javaOperatorModel>
    <context>
      <description></description>
      <executionSettings>
        <className>com.ibm.streams.JSONToTuple</className>
        <vmArgs/>
      </executionSettings>
      <libraryDependencies>
        <library>
          <cmn:description>Java operator class library</cmn:description>
          <cmn:managedLibrary>
            <cmn:libPath>../../lib/JSON4J.jar</cmn:libPath>
            <cmn:libPath>../../impl/java/bin</cmn:libPath>
          </cmn:managedLibrary>
        </library>
      </libraryDependencies>
    </context>
    <parameters>
      <parameter>
        <name>dataParamName</name>
        <description docHref="" sampleUri="">Name of the parameter carrying the JSON string. </description>
        <optional>true</optional>
        <type>rstring</type>
        <cardinality>1</cardinality>
      </parameter>
      <parameter>
        <name>continueOnError</name>
        <description>Default is false. If true, an empty tuple is output if a parsing error occurs.</description>
        <optional>true</optional>
        <type>boolean</type>
        <cardinality>1</cardinality>
      </parameter>
      <parameter>
        <name>addJSONString</name>
        <description>Adds the source JSON string to the specified attribute. Note that if that attribute was populated by parsing the json, then it will be overwritten.</description>
        <optional>true</optional>
        <type>rstring</type>
        <cardinality>1</cardinality>
      </parameter>
      <parameter>
        <name>copy</name>
        <description>Names of fields that need to be copied over to the output port. Note that these fields will NOT be populated by the JSON parsing.</description>
        <optional>true</optional>
        <type>rstring</type>
        <cardinality>-1</cardinality>
      </parameter>
      <parameter>
        <name>target</name>
        <description docHref="" sampleUri="">The operator will attempt to parse the json based on an attribute in the main tuple on output port 0. This enables auto-copying of all other common fields from input to output.</description>
        <optional>true</optional>
        <type>rstring</type>
        <cardinality>1</cardinality>
      </parameter>
      <parameter>
        <name>protectedPrefix</name>
        <description>Prefix added to attributes with the same name as protected SPL keywords. Default &quot;__pr_&quot;</description>
        <optional>true</optional>
        <type>rstring</type>
        <cardinality>1</cardinality>
      </parameter>
    </parameters>
    <inputPorts>
      <inputPortOpenSet>
        <windowingMode>NonWindowed</windowingMode>
        <windowPunctuationInputMode>Oblivious</windowPunctuationInputMode>
      </inputPortOpenSet>
    </inputPorts>
    <outputPorts>
      <outputPortOpenSet>
        <windowPunctuationOutputMode>Generating</windowPunctuationOutputMode>
        <windowPunctuationInputPort>-1</windowPunctuationInputPort>
      </outputPortOpenSet>
    </outputPorts>
  </javaOperatorModel>
</operatorModel>

Job 2 is optional and is used to write all tweets to disk. The FileSink operator is a standard stream computing API operator that determines where on disk to write tweets. Alternatively, an HDFSFileSink can be invoked to place tweets into the Hadoop Distributed File System (HDFS) if the Big Data Toolkit is installed and linked during the SPL compilation stage. The namespace of the HDFSFileSink is com.ibm.streams.bigdata.hdfs. The HDFS command-line tools must also be installed on the machine on which the operator will run. Finally, Job 3 imports the TwitterGNIPStream, measures the social sentiment per player, and sends aggregated results to a JSON file that is stored either on disk or in Object Storage.

Social sentiment leader board and SPL

Job 3, shown in Figure 2, performs the social sentiment scoring for players within a sporting tournament, correlated with tweets. The job contains 20 processing elements that use a combination of custom Java operators, custom SPL operators, and Streams API-provided operators. The TwitterStream exported by Job 0 is imported by Job 3 and converted from JSON to tuples. Natural language processing filters are applied to the tuple text before streaming system protections are invoked. To prevent wasted cycles and thus reduce risk to real-time processing, an expired parameter prevents old tweets from continuing downstream. If a tweet has passed all filters and expiration checks, parallel processing engines install and invoke UIMA PEAR files for social sentiment analysis. The analysis results are parsed into tuples and joined to player identifiers for a specific tournament. The positive-to-negative percentages for each player are accumulated over a specific window of time, such as 5 minutes. Finally, the tuples are converted to JSON and stored in Object Storage.

Figure 2. The processing element flow for the social sentiment stream's job
Figure showing processing element flow

The TwitterStream is converted to a TwitterTupleStream by a custom Java operator called JSONToTuple. The JSONToTuple operator is included within the composite SPL code with the statement use JSONHelpers.com.ibm.ssb.parsers.json::JSONToTuple, which references the operator model that in turn includes the operator JAR files. A stream called feed is created from the TwitterTupleStream and passed into another composite operator. Listing 4 depicts the input stream, feed, being passed into a separate composite SPL operator.

Listing 4. The input of a stream called feed into the FeedProcessing composite SPL flow
//GNIPRacketStream.spl
stream<ustring text, timestamp created_at> feed = GNIPStreamParser(TwitterTupleStream)        
        {
        }

        // --- Processing the feed        
        () as proc = FeedProcessing(feed){        
            param        
                inputTextField: "text";        
        }        

//FeedProcessing.spl
namespace com.ibm.incubator.rs ;

use JSONHelpers.com.ibm.ssb.parsers.json::TupleToJSON;
use ObjectStorage.com.ibm.store::ObjectStoreRetrieveResource;

composite FeedProcessing(input feed )
{
….

The feed stream is passed into a custom Java operator called TransitiveWindower, which adds punctuations to the output stream, transitiveWindower. The punctuation interval is defined by a runtime or default parameter of 5 minutes so that window demarcations are placed on the output stream. Each punctuation is later used for sentiment aggregation. A custom Java operator that filters tweets, called NonLatinLettersRateCalculator, is depicted in Listing 5. The resulting stream is input into a custom SPL operator that produces a stream with pass-through punctuations and tuples decorated with the current system time.

Listing 5. The NonLatinLettersRateCalculator custom Java operator code
package com.ibm.incubator.rs.operators;

import com.ibm.streams.operator.OutputTuple;
import com.ibm.streams.operator.StreamingInput;
import com.ibm.streams.operator.StreamingOutput;
import com.ibm.streams.operator.Tuple;

public class NonLatinLettersRateCalculator extends BaseOperator {

    public static final String TA_RATE = "rate";

    @Override
    public void process(StreamingInput<Tuple> stream, Tuple tuple) throws Exception {
        StreamingOutput<OutputTuple> out = getOutput(0);
        OutputTuple t = out.newTuple();

        String message = tuple.getString(com.ibm.incubator.rs.operators.GNIPStreamImporter.TA_TEXT);
        
        int letters = 0;
        int nonLatinLetters = 0;
        for(int i = 0; i < message.length(); i++) {
            char c = message.charAt(i);
            if(Character.isLetter(c)) {
                letters++;
                if(c > 128) {
                    nonLatinLetters++;
                }
            }
        }
        // Guard against division by zero when the tweet contains no letters.
        float rate = letters == 0 ? 0f : (float) nonLatinLetters / letters;
        
        t.assign(tuple);
        t.setFloat(TA_RATE, rate );
        out.submit(t);
    }
}
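
The TransitiveWindower operator described above is not listed in the article. A minimal sketch of how such an operator could be written with the Streams Java Operator API follows; the class name, the windowSeconds parameter, and the default interval are assumptions for illustration, not the PCC source.

package com.ibm.incubator.rs.operators;

import java.util.concurrent.TimeUnit;

import com.ibm.streams.operator.AbstractOperator;
import com.ibm.streams.operator.OperatorContext;
import com.ibm.streams.operator.OutputTuple;
import com.ibm.streams.operator.StreamingData.Punctuation;
import com.ibm.streams.operator.StreamingInput;
import com.ibm.streams.operator.StreamingOutput;
import com.ibm.streams.operator.Tuple;

/**
 * Hypothetical sketch of a windowing operator: tuples pass through unchanged,
 * and a WINDOW_MARKER punctuation is injected on the output stream at a fixed
 * interval (default 5 minutes) so that downstream operators can aggregate
 * sentiment per window.
 */
public class TransitiveWindowerSketch extends AbstractOperator {

    private long windowSeconds = 300; // default: 5-minute windows

    @Override
    public synchronized void initialize(OperatorContext context) throws Exception {
        super.initialize(context);
        if (context.getParameterNames().contains("windowSeconds")) {
            windowSeconds = Long.parseLong(context.getParameterValues("windowSeconds").get(0));
        }
        // Periodically place a window demarcation on the output stream.
        context.getScheduledExecutorService().scheduleAtFixedRate(() -> {
            try {
                getOutput(0).punctuate(Punctuation.WINDOW_MARKER);
            } catch (Exception e) {
                // a production operator would log and surface the failure
            }
        }, windowSeconds, windowSeconds, TimeUnit.SECONDS);
    }

    @Override
    public void process(StreamingInput<Tuple> stream, Tuple tuple) throws Exception {
        // Forward each tuple unchanged.
        StreamingOutput<OutputTuple> out = getOutput(0);
        OutputTuple outTuple = out.newTuple();
        outTuple.assign(tuple);
        out.submit(outTuple);
    }
}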

The Latin-filtered stream is input into an operator that examines how long the tweet has been on the network before social sentiment analysis. If the tweet has surpassed a parameterized threshold, it is filtered from the current stream. The output stream, called feed_non_delayed, is split into three streams that are input into a custom USOpenSentimentAnalyser, as shown in Listing 6.

Listing 6. A non-delayed Twitter stream is split into three streams for UIMA social sentiment analysis
// --- Splitting feed stream
        (stream<I> feed1 ; stream<I> feed2 ; stream<I> feed3) = Split(feed_non_delayed
            as I)
        {
            logic
                state : mutable int64 i = 0 ;
                onTuple I :
                {
                    i ++ ;
                }

            param
                index : [ i % 3l ] ;
        }

        // --- Sentiment analysis
        stream<AnalysisResult result> analysed1 = USOpenSentimentAnalyser(feed1 as I)
        {
            param
                pearFile : $packagedPearPath; 
                localWriteLocation : $pearDropPath ;
                installDir : "/tmp/pear_install" ;
                inputTextField : $inputTextField ;
                parallelization : 1 ;
                expiredSeconds : $expiredSeconds ;
                storeObjectName : $objectStorePearFile ;
                storeConfigFile : $objectStoreConfigPath ;
                storeContainer : $container ;
                storeVersion : $storeVersion ;
                refreshRateSeconds : $refreshRateSeconds ;
        }

        stream<AnalysisResult result> analysed2 = USOpenSentimentAnalyser(feed2 as I)
        {
            param
                pearFile : $packagedPearPath; 
                localWriteLocation : $pearDropPath ;
                installDir : "/tmp/pear_install" ;
                inputTextField : $inputTextField ;
                parallelization : 1 ;
                expiredSeconds : $expiredSeconds ;
                storeObjectName : $objectStorePearFile ;
                storeConfigFile : $objectStoreConfigPath ;
                storeContainer : $container ;
                storeVersion : $storeVersion ;
                refreshRateSeconds : $refreshRateSeconds ;
        }

        stream<AnalysisResult result> analysed3 = USOpenSentimentAnalyser(feed3 as I)
        {
            param
                pearFile : $packagedPearPath; 
                localWriteLocation : $pearDropPath ;
                installDir : "/tmp/pear_install" ;
                inputTextField : $inputTextField ;
                parallelization : 1 ;
                expiredSeconds : $expiredSeconds ;
                storeObjectName : $objectStorePearFile ;
                storeConfigFile : $objectStoreConfigPath ;
                storeContainer : $container ;
                storeVersion : $storeVersion ;
                refreshRateSeconds : $refreshRateSeconds ;
        }

The USOpenSentimentAnalyser installs a UIMA PEAR that is included within the Streams build or Streams application bundle (SAB) file, as shown in Listing 7, Listing 8, and Listing 9. At a configurable refresh rate, a remote PEAR is pulled from Object Storage and compared with the packaged PEAR's MD5 hash. If the hashes differ, the remote PEAR overrides the installed PEAR with a refresh. In this way, PEAR files can be updated remotely and in real time by the operator. PEAR files are locked while being modified, and each Java process uses the processing element ID to create unique directory structures so that each process is independent.

Listing 7. A UIMA PEAR file is refreshed and installed to support UIMA social sentiment analysis
private void refreshAnalyzer(String pearFile, String installFolder, int parall, boolean toLock) throws Exception {
        // Initializing UIMA analyzer
        if (analyzer != null) {
            analyzer.shutdown();
        }
        @SuppressWarnings("resource")
        FileChannel fileChannel = new RandomAccessFile(new File(pearFile), "rw").getChannel();
        @SuppressWarnings("resource")
        FileLock lock = toLock ? getFileLockWithAttempts(fileChannel, MAXIMUM_FILE_LOCK_ATTEMPTS) : null;

        analyzer = new USOpenSentimentsAnalysis();
        try {
            try {
                analyzer.init(new File(pearFile), FileUtils.getDirectory(installFolder), parall);
            } catch (RuntimeException e) {
                error(e, "Cannot initialize UIMA analyzer.");
                throw new Exception(e);
            }
        } finally {
            if (toLock) {
                lock.release();
                fileChannel.close();
            }
        }
    }
Listing 8. The UIMA Social Sentiment Analyzer initializes resources before processing of tuples
@Override
    public synchronized void initialize(OperatorContext context) throws Exception {
        super.initialize(context);

        processingEngineID = new Integer(context.getPE().getPEId().intValue()).toString();

        // Reading parameters
        String pearFilePath = getReqParam(P_PEARFILE);
        String installFolder = getReqParam(P_INSTALLDIR);
        String writeLocation = createUniqueResourceDrop(getReqParam(P_LOCAL_WRITE_LOCATION), processingEngineID);
        int parall = getParamAsInt(P_PARALLELIZATION, true, DEF_PARALLELIZATION);

        // Write the packaged file to local disk
        File writeLocationFile = new File(writeLocation);
        String writeLocationPath = writeLocationFile.getPath().substring(0,
                writeLocationFile.getPath().lastIndexOf(File.separator));
        if (!writeLocationFile.exists()) {
            if (new File(writeLocationPath).mkdirs()) {
                RESULT_LOG.trace("Created the following directories: " + writeLocationPath);
            }
            writeLocationFile.createNewFile();
        }
        try (@SuppressWarnings("resource")
        FileChannel fileChannel = new RandomAccessFile(writeLocationFile, "rw").getChannel();
                FileLock lock = getFileLockWithAttempts(fileChannel, MAXIMUM_FILE_LOCK_ATTEMPTS);) {
            try (final InputStream fis = Files.asByteSource(new File(pearFilePath)).openStream();) {
                ByteBuffer writeBuf = ByteBuffer.wrap(IOUtils.toByteArray(fis));
                fileChannel.truncate(0);
                while (writeBuf.hasRemaining()) {
                    fileChannel.write(writeBuf);
                }
            }
        }

        // Initializing UIMA analyzer
        refreshAnalyzer(writeLocation, installFolder, parall, true);

        inputTextField = getReqParam(P_INPUT_TEXT_FIELD);
        Integer seconds = getParamAsInt("expiredSeconds", true, 180);
        MDC.put("id", getOperatorContext().getName());
        getOperatorContext().getMetrics().getCustomMetric(M_EXPIRED_TIME).setValue(seconds);
        int refreshRateSeconds = getParamAsInt(P_REFRESH_RATE_SECONDS, true, DEFAULT_REFRESH_RATE_SECONDS);
        getOperatorContext().getMetrics().getCustomMetric(M_PEAR_REFRESH_TIME).setValue(refreshRateSeconds);
        delta = new Timestamp(seconds, 0);
    }
Listing 9. The logic of the UIMA Social Sentiment Analyzer within Java code
/** Input text processing. */
    @Override
    public synchronized void process(StreamingInput<Tuple> stream, Tuple tuple) throws Exception {
        long time = System.currentTimeMillis();
        Timestamp createdAt = tuple.getTimestamp(GNIPStreamImporter.TA_CREATED_AT);
        String text = tuple.getString(inputTextField);
        int refreshRateSeconds = getParamAsInt(P_REFRESH_RATE_SECONDS, true, DEFAULT_REFRESH_RATE_SECONDS);
        String localWriteLocation = createUniqueResourceDrop(getReqParam(P_LOCAL_WRITE_LOCATION), processingEngineID);
        String installFolder = getReqParam(P_INSTALLDIR);
        int parall = getParamAsInt(P_PARALLELIZATION, true, DEF_PARALLELIZATION);
        if (createdAt.before(lastPunctuation.subtract(delta))) {
            incMetricValue(M_SKIPPED);

            long skippedCount = getOperatorContext().getMetrics().getCustomMetric(M_SKIPPED).getValue();
            long curMaxDifference = getOperatorContext().getMetrics().getCustomMetric(M_MAX_TIME_DIFFERENCE).getValue();
            long curDifference = lastPunctuation.subtract(createdAt).getSeconds();

            difference += curDifference;
            if (curDifference > curMaxDifference) {
                getOperatorContext().getMetrics().getCustomMetric(M_MAX_TIME_DIFFERENCE).setValue(curDifference);
            }

            getOperatorContext().getMetrics().getCustomMetric(M_LAST_TIME_DIFFERENCE).setValue(curDifference);

            getOperatorContext().getMetrics().getCustomMetric(M_AVG_TIME_DIFFERENCE)
                    .setValue(difference / skippedCount);
            MDC.put("id", getOperatorContext().getName());
            RESULT_LOG.info(
                    "Skipping: \"{}\" \n TWITTER CREATION TIME: {} ms, WINDOW TIME: {} ms \n EXPIRED TIME: {} sec WITH DELTA: {}",
                    new Object[] { text, createdAt.getTime(), lastPunctuation.getTime(), curDifference, delta });
            return;
        }

        if ((lastPearRefresh.add(new Timestamp(refreshRateSeconds, 0)).before(Timestamp.currentTime()))) {
            RESULT_LOG.trace("Attempting to refresh the UIMA pear file.");
            if (refreshResourceFile(processingEngineID)) {
                refreshAnalyzer(localWriteLocation, installFolder, parall, true);
            }
            lastPearRefresh = Timestamp.currentTime();
        }
        // Performing analysis
        AnalysisResult result = new AnalysisResult();
        try {
            analyzer.analyze(text, result);
        } catch (Exception e) {
            error("Error while analysis: %s", e.toString());
            RESULT_LOG.info("Error while analysis: %s", e.toString());
            return;
        }
        MDC.put("id", getOperatorContext().getName());
        incMetricValue(M_ANALYZED);
        // RESULT_LOG.info("--------------------");// 20 '-'

        if (!result.isEmpty()) {
            incMetricValue(M_FOUND);
        }
        // Producing output
        int numOutputs = getOperatorContext().getNumberOfStreamingOutputs();
        for (int i = 0; i < numOutputs; i++) {
            StreamingOutput<OutputTuple> output = getOutput(i);
            OutputTuple outTuple = output.newTuple();
            if (!result.isEmpty()) {
                outTuple.setMap(TA_RESULT, result.getSentiments());
                output.submit(outTuple);
            }
        }
        time = System.currentTimeMillis() - time;
        getOperatorContext().getMetrics().getCustomMetric(M_ANALYSIS_TIME).incrementValue(time);
        long anTime = getOperatorContext().getMetrics().getCustomMetric(M_ANALYSIS_TIME).getValue();
        long anNumb = getOperatorContext().getMetrics().getCustomMetric(M_ANALYZED).getValue();
        getOperatorContext().getMetrics().getCustomMetric(M_AVG_TIME)
                .setValue((long) Math.floor(anTime / (double) anNumb + 0.5));
    }

Each of the analysis streams is unioned into an analysisResult stream. Sentiments are aggregated and held in temporary storage until a punctuation is encountered. When a punctuation tuple is discovered, an AggregatedSentimentsCalc is output into the PlayerCSVRetrieval custom Java operator. The PlayerCSVRetrieval operator compares the MD5 hash of the players.csv within the SAB file with that of the copy in Object Storage. An example snippet of the players.csv file can be viewed in Listing 10. If the two hashes differ, the Object Storage players.csv takes precedence. Each of the sentiment measures is correlated with a player identifier into a Streams map.

Listing 10. Example lines within the players.csv file that contain Association of Tennis Professionals (ATP) identifiers and names
atpa479,nicolas almagro
atpa596,pablo andujar
atpa678,kevin anderson
atpb678,michael berrer
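
The MD5 comparison that PlayerCSVRetrieval performs between the packaged players.csv and the Object Storage copy can be sketched with standard JDK classes. The following is a minimal, hypothetical example; the file paths are placeholders, and the real operator obtains the remote copy through the Object Storage APIs.

import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;

/**
 * Hypothetical sketch of the players.csv freshness check: compute the MD5 hash
 * of the copy packaged in the SAB file and of the copy downloaded from Object
 * Storage; if they differ, the Object Storage copy wins.
 */
public class PlayerCsvHashCheck {

    static String md5(Path file) throws Exception {
        MessageDigest digest = MessageDigest.getInstance("MD5");
        try (InputStream in = Files.newInputStream(file)) {
            byte[] buffer = new byte[8192];
            int read;
            while ((read = in.read(buffer)) != -1) {
                digest.update(buffer, 0, read);
            }
        }
        StringBuilder hex = new StringBuilder();
        for (byte b : digest.digest()) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }

    public static void main(String[] args) throws Exception {
        // Placeholder paths; the real operator resolves these from the SAB and Object Storage.
        Path packaged = Paths.get("data/players.csv");
        Path remote = Paths.get("/tmp/players.remote.csv");
        boolean refresh = !md5(packaged).equals(md5(remote));
        System.out.println(refresh ? "Object Storage copy takes precedence" : "Packaged copy is current");
    }
}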

A custom SPL operator next accepts the aggregatedSentiments stream, the PlayerCSV map, and tuples that signal whether the player map should be cleared, as shown in Listing 11.

Listing 11. SPL example of creating one output stream from three input streams
    stream<rstring ID,ustring playerName, int32 totMentWind, float64 posSentWindPercent,
            int32 totMentTour, float64 posSentTourPercent> playerIDEnrich = Custom(CleanPlayerMap;PlayerCSV;aggregatedSentiments) {

        logic state : {mutable map<rstring,rstring> PlayerMap; }

        onTuple CleanPlayerMap : {
            clearM(PlayerMap);
        }

        onTuple PlayerCSV : {
            insertM(PlayerMap, PlayerCSV.name, PlayerCSV.id);
        }
        
        
        onTuple aggregatedSentiments : {
            //lookup and submit
            rstring name = (rstring)aggregatedSentiments.playerName;
            printStringLn("Name: " +name) ;
            if (name in PlayerMap){
                 mutable rstring id = PlayerMap[name];            
                submit({ID=id,playerName=aggregatedSentiments.playerName,totMentWind=aggregatedSentiments.totMentWind,posSentWindPercent=aggregatedSentiments.posSentWindPercent,totMentTour=aggregatedSentiments.totMentTour,posSentTourPercent=aggregatedSentiments.posSentTourPercent},playerIDEnrich);
            }
            else{
                submit({ID="",playerName=aggregatedSentiments.playerName,totMentWind=aggregatedSentiments.totMentWind,posSentWindPercent=aggregatedSentiments.posSentWindPercent,totMentTour=aggregatedSentiments.totMentTour,posSentTourPercent=aggregatedSentiments.posSentTourPercent},playerIDEnrich);
                //appTrc(Sys.info,"No match in map for player name: " + name);
                appTrc(Trace.info,"No match in map for player name: " + name);
            }
        }
         onPunct aggregatedSentiments : {
                 submit(Sys.WindowMarker,playerIDEnrich);
             }
        
        }

The playerIDEnrich stream is converted to JSON tuples and submitted onto a JSON stream. The JSONStream is input into a composite, as shown in Listing 12. The final resulting JSON data is written out to Object Storage by a custom Java operator called ObjectStorePersist, which uses Swift APIs.

Listing 12. The JSONStream is input into a composite SPL flow that stores the text into Object Storage
stream<rstring jsonData> JSONStream = TupleToJSON(JSONConverter as I){}
                                
        () as proc = ObjectStorePersistence(JSONStream as J) {
        param 
            //the tuple field that has json data
            storeText : "jsonData";
        }
namespace com.ibm.incubator.rs ;

use ObjectStorage.com.ibm.store::ObjectStorePersist ;

composite ObjectStorePersistence (input feed){

param
    // Field in tuples from input feed, containing json text to process
    expression<rstring> $storeText ;
    expression<rstring> $objectStoreConfigPath : getThisToolkitDir() + "/ObjectStorage/properties/"+getSubmissionTimeValue("storeConfigFile","objectStore.dallas.properties") ;
    expression<rstring> $container : getSubmissionTimeValue("storeContainer","wwsm") ;
    expression<rstring> $objectName : getSubmissionTimeValue("storeObjectName","SocialSentiment.json") ;
    expression<rstring> $storeVersion : getSubmissionTimeValue("storeVersion","v1") ;
graph

 () as TupleStore = ObjectStorePersist(feed) {
    param 
        storeConfigFile : $objectStoreConfigPath;
        storeContainer : $container;
        storeObjectName : $objectName;
        jsonData: $storeText;
        storeVersion: $storeVersion;
}
}

Listing 13 shows the output social sentiment in JSON format.

Listing 13. A sample of the resulting social sentiment JSON file
{
  "players" : [ {
    "total" : 33841,
    "recentPositive" : 0.0,
    "name" : "yi fan xu",
    "id" : "wta312280",
    "totalPositive" : 80.0,
    "recent" : 10
  }, {
    "total" : 3790,
    "recentPositive" : 0.0,
    "name" : "jeremy chardy",
    "id" : "atpca12",
    "totalPositive" : 97.0,
    "recent" : 0
  }, {
    "total" : 85,
    "recentPositive" : 0.0,
    "name" : "malek jaziri",
    "id" : "atpj267",
    "totalPositive" : 100.0,
    "recent" : 0
  }, {
    "total" : 93,
    "recentPositive" : 0.0,
    "name" : "magda linette",
    "id" : "wta315130",
    "totalPositive" : 100.0,
    "recent" : 0
  }, {
    "total" : 364,
    "recentPositive" : 0.0,
    "name" : "ekaterina makarova",
    "id" : "wta311604",
    "totalPositive" : 88.0,
    "recent" : 0
}],
  "lastUpdate" : 1447272246065
}

One consumer of the output is the social leadership board shown in Figure 3.

Figure 3. A consumer of the social sentiment JSON file produces a social leader board
Figure showing social leader board

IBM Content Analytics

A broad base of dictionaries is required to perform social sentiment analysis of tweets about specific players. A UIMA pipeline configuration, shown in Figure 4, is created within IBM Content Analytics and consists of document language selection, lexical analysis, parsing rules, and any clean-up. For the sports sentiment measures, we selected English as the base language. The lexical analysis stage defines break rules for textual tokenization so that tokens are tagged with a part of speech. Custom dictionaries that are domain-specific are defined as lexical dictionaries.

Figure 4. IBM Content Analytics UIMA pipeline configuration for social sentiment analysis
Screen capture showing UIMA pipeline configuration

For each sporting event, we created two dictionaries associated with the English language. Players are imported into a local database called en-TennisPlayers. Several different surface forms of each name are attached to a normal form and associated with a part of speech, in this case a noun. As the player dictionary is compiled, a UIMA type system description is generated that specifies the player name annotation details, as shown in Listing 14. Each token that matches a surface form of a sporting player's name is annotated with DictTennisPlayers. Most of the second dictionary consists of adjective definitions, which label tokens as positive or negative. For example, the word "OK" is classified as a positive adjective with an additional inflection of O.K. An additional feature called triggerType is added to the tokens; a token can have multiple features. Two additional built-in dictionaries are included to enhance part-of-speech tagging and the segmentation of words into tokens.

Listing 14. A UIMA-type system for a tennis player dictionary
<?xml version="1.0" encoding="utf-8"?>
    <typeSystemDescription xmlns="http://uima.apache.org/resourceSpecifier">
  <name>en-XX-TennisPlayers-ts</name>
  <description>en-XX-TennisPlayers</description>
  <vendor></vendor>
  <version></version>
  <types>
    <typeDescription>
      <name>com.ibm.usopen.DictTennisPlayers</name>
      <description>en-XX-TennisPlayers</description>
      <supertypeName>uima.tt.DictionaryEntryAnnotation</supertypeName>
    </typeDescription>
    </types>
</typeSystemDescription>

The stream of features and annotations is input into the parsing rules stage, where two rule-based semantic interpretations are defined:

  • An opinion semantic analysis provides several rules that produce a NegativeIndicator, a PositiveIndicator, or a NeutralIndicator. An atomic positive rule annotator, for example, matches either a DictTrigger with a triggerType of positive, or a negated negative trigger: a token with the value n't, followed by zero to two tokens, and concluded with a DictTrigger whose triggerType is negative.
  • An aggregate rule draws relationships between NegativeIndicators and PositiveIndicators. For example, two PositiveIndicators followed by one NegativeIndicator is overall positive. However, if a single PositiveIndicator follows a single NegativeIndicator, the overall response is neutral.

Next, the accumulation of annotations over sentences or tweets is passed to the final sentiment annotator called TennisPlayerSentiment. Four rules look for varying orders of previous annotations. The rules produce TennisPlayerPositiveSentiment or TennisPlayerNegativeSentiment.

  • DictTennisPlayer[player token] followed by zero to seven other tokens followed by PositiveIndicator[lemma word] results in a TennisPlayerPositiveSentiment.
  • PositiveIndicator[lemma word] followed by zero to three tokens followed by DictTennisPlayer[player token] results in a TennisPlayerPositiveSentiment.
  • DictTennisPlayer[player token] followed by zero to seven other tokens followed by NegativeIndicator[lemma word] results in a TennisPlayerNegativeSentiment.
  • NegativeIndicator[lemma word] followed by zero to three tokens followed by DictTennisPlayer[player token] results in a TennisPlayerNegativeSentiment.

The UIMA pipeline, along with all of the lexical dictionaries and parsing rules, is exported into a UIMA PEAR file. The PEAR file is installed onto the IBM InfoSphere Streams machine through the UIMA Java API. Tuples passed through Streams are consumed by a custom UIMA operator, where the tweet text is wrapped into a UIMA Common Analysis Structure (CAS) for input into the installed UIMA PEAR file. The output annotations, TennisPlayerNegativeSentiment or TennisPlayerPositiveSentiment, are extracted from the CAS and placed back into a Streams tuple for downstream processing.
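
The UIMA side of that exchange can be illustrated with the Apache UIMA Java API. The following is a minimal sketch, not the PCC operator itself: it installs a PEAR, wraps a piece of tweet text in a CAS, runs the analysis engine, and reads back the sentiment annotations. The file paths and sample text are placeholders.

import java.io.File;

import org.apache.uima.UIMAFramework;
import org.apache.uima.analysis_engine.AnalysisEngine;
import org.apache.uima.cas.CAS;
import org.apache.uima.cas.text.AnnotationFS;
import org.apache.uima.pear.tools.PackageBrowser;
import org.apache.uima.pear.tools.PackageInstaller;
import org.apache.uima.resource.ResourceSpecifier;
import org.apache.uima.util.XMLInputSource;

/**
 * Hypothetical sketch of the PEAR install-and-analyze cycle: install the
 * packaged PEAR, build an analysis engine from its descriptor, wrap tweet
 * text in a CAS, and read back the sentiment annotations.
 */
public class PearSentimentSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder paths; in PCC these come from operator parameters.
        File installDir = new File("/tmp/pear_install");
        File pearFile = new File("sentiment.pear");

        PackageBrowser pkg = PackageInstaller.installPackage(installDir, pearFile, true);
        XMLInputSource descriptor = new XMLInputSource(pkg.getComponentPearDescPath());
        ResourceSpecifier specifier = UIMAFramework.getXMLParser().parseResourceSpecifier(descriptor);
        AnalysisEngine engine = UIMAFramework.produceAnalysisEngine(specifier);

        CAS cas = engine.newCAS();
        cas.setDocumentText("What a match from Ekaterina Makarova today!");
        engine.process(cas);

        // Walk all annotations and report the sentiment types of interest.
        for (AnnotationFS annotation : cas.getAnnotationIndex()) {
            String type = annotation.getType().getName();
            if (type.endsWith("TennisPlayerPositiveSentiment")
                    || type.endsWith("TennisPlayerNegativeSentiment")) {
                System.out.println(type + ": " + annotation.getCoveredText());
            }
        }
        cas.release();
    }
}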

Server access log aggregation

The real-time logs stream provides the Predictive Cloud with hit rates for the sites we are analyzing, which are necessary to adjust forecasts for unexpected trends. A Python script monitors all IBM HTTP Server access logs and sends messages to RabbitMQ. On the consumer side, Figure 5 depicts the Log Aggregator job that pulls messages from RabbitMQ. Listing 15 shows a custom RabbitSource operator that creates a stream, RabbitStream, of rstrings. Before any tuples can be consumed or produced by the RabbitMQ custom operator, the method allPortsReady() is called, which uses the RabbitMQ API to connect to the supplied message queues. Keystores and truststores must be set up beforehand to support secure message transmission between RabbitMQ and Job 4. The RabbitStream is available to all processing engines defined within SPL.

One consumer of the RabbitStream aggregates log lines every minute and provides a count of how many lines were processed as well as the time. Another custom Java operator converts the time element of the output stream, LogLineStream, into a tuple. The output JSON content, which contains the time and the total number of server accesses, is sent to a RESTful service for storage in DB2. In parallel, the RabbitStream is input into a custom SPL functor that parses the time stamp of the messages and then aggregates the tuples into a GraphiteLogLine stream. A UDPSink sends the total number of lines that were processed to Graphite, a time series database that accepts UDP requests.

Figure 5. The overall processing element flow of the Log Aggregator stream computing job
Chart showing overall processing element flow of streams job
Listing 15. Java code for a custom streams operator that connects to RabbitMQ
public class RabbitSource extends AbstractOperator implements Operator {
    private final static Logger logger = Logger.getLogger(RabbitSource.class.getName());
    private Channel channel;
    private ExecutorService executor;
    private String queueName;
    private int port;
    private boolean durable = true;
    private boolean shutdown = false;
    private boolean subscription = true;
    private final LinkedList<String> lastMessages = new LinkedList<String>();

    @Override
    public void allPortsReady() throws Exception {
        connect();
    }

    public void connect() throws Exception {
        logger.info("Starting RabbitMQ Consumer");
        if (subscription) {
            logger.info("Creating Subscription Consumer");
            channel.basicQos(1000);
            channel.basicConsume(queueName, false, new SubscriptionConsumer());
        } else {
            Thread t = getOperatorContext().getThreadFactory().newThread(new PollingConsumer());
            t.start();
            logger.info("Creating Polling Consumer");
        }
        logger.info("Started RabbitMQ Consumer Successfully");    
} ….
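
The snippet above elides the RabbitMQ connection setup. A minimal sketch of how the channel might be created with the RabbitMQ Java client follows; the host, credentials, and queue name are placeholders, and the TLS call assumes the keystore and truststore configuration mentioned earlier.

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Hypothetical sketch of the connection setup elided from Listing 15: build a
 * TLS-protected connection, open a channel, and declare the durable queue that
 * the access-log producer publishes to.
 */
public class RabbitConnectionSketch {
    public static Channel openChannel() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("rabbitmq.example.com");
        factory.setPort(5671);
        factory.setUsername("pcc");
        factory.setPassword("secret");
        factory.useSslProtocol(); // production would wire in the keystore/truststore set up beforehand
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("access-logs", true /* durable */, false, false, null);
        return channel;
    }
}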

Bluemix architecture and deployment

The social leadership board for WWSM consumes social sentiment JSON that is produced from tweets by natural language processing algorithms and streaming technologies. The platform that computes the social sentiments for sporting players is distributed across a hybrid cloud that includes Bluemix, SoftLayer, and the Events Infrastructure. The service is continuously available with two active sites and one standby site. The two active sites for both Bluemix and SoftLayer are Dallas and London. The standby site for SoftLayer is in Melbourne, while parts of the standby Bluemix site are in Sydney and London. The monitoring and deployment aspects that support the social sentiment service run within a geographically dispersed private cloud.

Figure 6. A continuously available social sentiment service on a private cloud, Bluemix, and SoftLayer
Chart showing continuous availability social sentiment service

Figure 6 shows the technologies that run within the cloud platforms of Bluemix, SoftLayer, and a private cloud. The Events Infrastructure and Content Team have their own private cloud platforms to manage and use the content from the social sentiment service application. A monitoring Java archive (JAR) program is executed within a Java 1.8 virtual machine. The monitoring application will detect three types of problems:

  • If the social sentiment has not been updated within a configurable number of minutes, an alert is thrown (a sketch of this check follows the list).
  • If RESTful interfaces for Bluemix Streams or SoftLayer Object Storage fail, an alert is thrown.
  • If Bluemix Streams is not operating as expected, an alert is thrown.
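
As an illustration of the first check, the following minimal sketch reads the sentiment JSON from Listing 13, extracts its lastUpdate timestamp, and raises an alert when the file is older than a configurable limit. The file name and threshold are placeholders, not the actual monitoring JAR's configuration.

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical sketch of the staleness check: read the sentiment JSON produced
 * by the Streams job, extract its lastUpdate epoch timestamp, and raise an
 * alert when it is older than the configured limit.
 */
public class SentimentFreshnessCheck {
    private static final Pattern LAST_UPDATE = Pattern.compile("\"lastUpdate\"\\s*:\\s*(\\d+)");

    public static void main(String[] args) throws Exception {
        long maxAgeMinutes = 15; // configurable threshold
        String json = new String(Files.readAllBytes(Paths.get("SocialSentiment.json")), StandardCharsets.UTF_8);
        Matcher matcher = LAST_UPDATE.matcher(json);
        if (!matcher.find()) {
            throw new IllegalStateException("lastUpdate field not found; alert");
        }
        long ageMillis = System.currentTimeMillis() - Long.parseLong(matcher.group(1));
        if (ageMillis > TimeUnit.MINUTES.toMillis(maxAgeMinutes)) {
            System.err.println("ALERT: social sentiment is stale by "
                    + TimeUnit.MILLISECONDS.toMinutes(ageMillis) + " minutes");
        }
    }
}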

Streams application bundles are built by a combination of Ant and Maven, shown in Listing 16 and Listing 17, for deployment onto an appropriate Bluemix region, as described by Figure 6. The Streams job called "social1" connects to Twitter and pulls tweets into the streaming application (Job 0 in Listing 1). A specific SAB file is built and deployed for social1. The Streams job called "social2" applies Latin filters, UIMA PEAR files for natural language interpretation, window aggregation, and work parallelization. It interfaces with SoftLayer Object Storage in a way that is similar to Job 3 in Figure 2. A second SAB file is built and deployed to Bluemix Streams. The two jobs, social1 and social2, connect to each other through a Streams Processing Language graph. During an event, the active sites in Dallas and London must be continuously available with no planned or unplanned outages. A standby Bluemix site in Australia and London can be activated if one of the other sites is scheduled for a planned outage.

Listing 16. A Maven build file for the creation of the "social2" job, which invokes Ant through a Maven plugin
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.ibm.incubator.rs</groupId>
    <artifactId>RacketStream</artifactId>
    <version>2.0.0</version>
    <name>RacketStream</name>
    <description>Project for RacketStream</description>
    <packaging>pom</packaging>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>

        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-antrun-plugin</artifactId>
                <version>1.8</version>
                <executions>
                    <execution>
                        <id>compile-spl</id>
                        <phase>install</phase>
                        <configuration>
                            <tasks>
                                <property name="compile_classpath" refid="maven.compile.classpath" />
                                <property name="outputDir" value="${project.build.outputDirectory}" />
                                <property name="sourceDir" value="${project.build.sourceDirectory}" />
                                <ant antfile="${basedir}/spl-compile.xml" target="all" />
                            </tasks>
                        </configuration>
                        <goals>
                            <goal>run</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <modules>
        <module>RacketStreamJava/</module>
    </modules>
</project>
Listing 17. An Ant build file that supports the creation of the "social2" job
<?xml version="1.0" encoding="UTF-8"?>
<!-- Compiles the main SPL composite with the sc Ant task and
   packages the output, data, toolkits, and operator JAR files
   into a tar.gz for deployment.
 -->
<project name="RacketStream" default="compile">

    <property environment="env"/>
    <property name="streams.install" value="${env.STREAMS_INSTALL}"/>
    <taskdef name="splcompile" classname="com.ibm.ssb.ant.sc.ScTask">
        <classpath>
            <pathelement location="ant/com.ibm.ssb.ant.jar"/>
        </classpath>
    </taskdef>

    <property name="mainComposite" value="application::LogAggregator"/>

    <!-- Remove the output directory created by the SPL compile -->
    <target name="clean">
        <delete dir="${basedir}/output" quiet="true"/>
        <delete dir="${basedir}/packaged" quiet="true"/>
        <delete file="${basedir}/${ant.project.name}.tar.gz" quiet="true"/>
        <delete file="${basedir}/${ant.project.name}.tar" quiet="true"/>
    </target>

    <!-- Compile the application, runs sc -->
    <target name="compile">
        <splcompile mainComposite="com.ibm.incubator.rs::GNIPRacketStream" />
    </target>

    <!-- Package the compiled application -->
    <target name="package">
        <mkdir dir="${basedir}/packaged"/>
        <copy todir="${basedir}/packaged/application">
            <fileset dir="${basedir}/output"/>
        </copy>
        
        <copy todir="${basedir}/packaged/data">
            <fileset dir="${basedir}/data"/>
        </copy>

        <package_operator operator-name="RacketStreamJava" package-name="com.ibm.incubator.rs"/>
        
        <mkdir dir="${basedir}/packaged/JSONHelpers"/>
        <copy todir="${basedir}/packaged/JSONHelpers">
            <fileset dir="${basedir}/JSONHelpers"/>
        </copy>
        
        <mkdir dir="${basedir}/packaged/ObjectStorage"/>
        <copy todir="${basedir}/packaged/ObjectStorage">
             <fileset dir="${basedir}/ObjectStorage"/>
        </copy>
        
        <copy todir="${basedir}/packaged/">
            <fileset dir="${basedir}/" includes="*.properties"/>
        </copy>
        
        <tar destfile="${basedir}/${ant.project.name}.tar" basedir="packaged/"/>
        <gzip destfile="${basedir}/${ant.project.name}.tar.gz" src="${basedir}/${ant.project.name}.tar"/>
    </target>
    
    <macrodef name="package_operator">
        <attribute name="operator-name" />
        <attribute name="package-name" />
        
        <sequential>
            <mkdir dir="${basedir}/packaged/@{operator-name}"/>
            <copy todir="${basedir}/packaged/@{package-name}">
                <fileset dir="${basedir}/@{package-name}"/>
            </copy>
            <copy todir="${basedir}/packaged/@{operator-name}/target">
                <fileset dir="${basedir}/@{operator-name}/target" includes="**/*.jar"/>
            </copy>
            <copy file="${basedir}/toolkit.xml" tofile="${basedir}/packaged/toolkit.xml"/>
            <copy file="${basedir}/info.xml" tofile="${basedir}/packaged/info.xml"/>
        </sequential>
    </macrodef>
    
    <target name="all" depends="clean,compile,package"></target>
</project>

The SoftLayer cloud platform provides a data storage function called Object Storage. The geographical locations of SoftLayer Object Storage were selected to match the Bluemix Streams geographical locations for performance, accessibility, and availability considerations. The PEAR file that contains the logic, dictionaries, and sentiment definitions shown in Figure 4 is uploaded to Object Storage for periodic download by the Bluemix Streams application. A players comma-separated values (CSV) file that contains sporting players' names and identification codes is also downloaded by the Bluemix Streams application at a configurable periodicity. The output of the Bluemix Streams application is a JSON file that is uploaded to a SoftLayer Object Storage container.

An Object Storage JAR file runs within the content team's private cloud to pull down the updated sentiment.json file, which is parsed by a social leadership board application to generate the view shown in Figure 3. A connection properties file contains the passwords, URLs, containers, user IDs, and so on needed to connect to Object Storage. An optional encryption JAR file can be run to encrypt all passwords that are stored on disk.

Figure 7. A functional flow of the hybrid cloud that supports social sentiment computing
Chart showing function flow of cloud

The Events Infrastructure private cloud configures Global Server Load Balancers (GSLB) to distribute the load across all SoftLayer Object Storage regions. Any access to Object Storage, including monitoring, goes through the GSLBs. The GSLBs also detect HTTP error codes and can reroute traffic as required. This function further supports the continuously available service.

Conclusion

In this tutorial, we have provided an overview of the stream computing architecture deployed for IBM's presentation of professional golf and tennis tournaments. We discussed the use of IBM InfoSphere Streams and our Streams Processing Language customizations supported by the Java language. We also showed the implementation of a UIMA PEAR file within a Streams processing element and how it provides annotations for sporting social leadership boards. Several concrete code examples were presented for developers.

In part 4 of this series, we will discuss the processing of data at rest with technologies such as Hadoop, IBM InfoSphere BigInsights, and a web crawler.

