Using Spark Streaming for keyword detection

Develop a Spark Streaming application by using Java


Many companies store and analyze data with a distributed file system like Apache Hadoop. Using streaming analytics with offline Hadoop allows you to store enormous amounts of big data and analyze that data in real time. This article shows you an example of how to use Spark Streaming to enable real-time keyword detection.

Spark Streaming is an extension of the Spark API that enables scalable, fault-tolerant processing of live data streams. Spark Streaming provides a variety of adapters that let application developers read and write data from various sources, including the Hadoop Distributed File System (HDFS), Kafka, Twitter, and more.
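For example, reading from Kafka instead of a raw socket only changes how the input DStream is created. The following is a minimal sketch of that idea, not part of the sample built in this article: the ZooKeeper address zkhost:2181, the consumer group keyword-detect-group, and the topic name keywords are hypothetical placeholders, a JavaStreamingContext named ssc (like the one created later in this article) is assumed to be in scope, and the spark-streaming-kafka_2.10 artifact must be added to the dependencies listed later.

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

// One receiver thread for the hypothetical "keywords" topic
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put("keywords", 1);

// Key/value pairs read from Kafka; the values could feed the same keyword logic
JavaPairReceiverInputDStream<String, String> kafkaLines =
    KafkaUtils.createStream(ssc, "zkhost:2181", "keyword-detect-group", topicMap);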

Prerequisites

  • Software prerequisites: IBM InfoSphere® BigInsights version 4.0 or higher and Apache Maven.
  • Skill prerequisites: Intermediate Java™ development skills and introductory knowledge of Hadoop and Spark.

Solution overview

Spark Streaming applications consist of one or more interconnected discretized streams (DStreams). Each DStream consists of a series of resilient distributed data sets (RDDs), which are an abstraction of immutable distributed data sets. Spark supports different application development languages, including Java, Scala, and Python. For this article, we'll use the Java language to show you a step-by-step approach to develop keyword detection applications.

Figure 1 shows a high-level view of the keyword detection application.

Figure 1. Diagram of keyword detection application

Explanation of the components in Figure 1

  • SocketTextStream allows you to bind and listen to messages on a transmission control protocol (TCP) socket. The output of SocketTextStream is fed into a custom stream that uses the current keyword list to find a matching token.
  • TextFileStream is used for monitoring a Hadoop directory. Whenever it detects a new file, it reads the file and converts it into DStreams. The values that are read by TextFileStream are used to update the internal keyword list with custom logic.
  • Keyword detection logic uses the updated keyword list, so the diagram uses dotted lines to indicate this relationship.

Implementation details of each of these components

Every Spark Streaming application begins with a streaming context, as shown in the following snippet. The context constructor requires a duration parameter that defines the batch interval.

JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));

socketTextStream binds to the specified host interface and port number and generates DStreams from the data it receives.

JavaReceiverInputDStream<String> lines = ssc.socketTextStream(hostname, port, StorageLevels.MEMORY_AND_DISK_SER);

textFileStream monitors a Hadoop directory and reads the keyword dictionary file from it as text. The file is processed, and the keywords are added to the internal list.

JavaDStream<String> filelines = ssc.textFileStream("/tmp/Streamtest");
JavaDStream<String> updatedKeyWords = filelines.flatMap(new FlatMapFunction<String, String>() {
  @Override
  public Iterable<String> call(String x) {
    final Pattern SPACE = Pattern.compile(" ");
    String[] vec = SPACE.split(x);
    List<String> ls = Arrays.asList(vec);
    return ls;
  }
});


updatedKeyWords.foreachRDD(new Function<JavaRDD<String>, Void>() {
  public Void call(JavaRDD<String> rdd) {
    rdd.foreach(new VoidFunction<String>() {
      @Override
      public void call(String x) {
        if (x != null)
          keywords.add(x);
      }
    });
    return null;
  }
});

The DStream that is read from socketTextStream is compared against the keyword list, as shown in the following code. The result is displayed on the console by the wordPresent.print(); call.

JavaDStream<Boolean> wordPresent = lines.map(new Function<String, Boolean>() {
  @Override
  public Boolean call(String x) {
    return keywords.contains(x);
  }
});
wordPresent.print();

The following listing shows the complete code for the sample used in this article.

package org.apache.spark.examples.streaming;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public final class KeywordDetect {
  private static final Pattern SPACE = Pattern.compile(" ");
  public static List<String> keywords = new ArrayList<String>();

  public static void main(String[] args) {

    if (args.length < 2) {
      System.err.println("Usage: KeywordDetect <hostname> <port>");
      System.exit(1);
    }

    SparkConf sparkConf = new SparkConf().setAppName("KeywordDetect");

    // Streaming context with a 5-second batch interval
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));

    // Monitor the Hadoop directory that holds the keyword dictionary files
    JavaDStream<String> filelines = ssc.textFileStream("/tmp/Streamtest");

    // Receive raw text lines from the TCP socket
    JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
        args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);

    keywords.add("initial"); // Initialize keyword list

    // Split each line of the dictionary file into individual keywords
    JavaDStream<String> updatedKeyWords = filelines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) {
        String[] vec = SPACE.split(x);
        List<String> ls = Arrays.asList(vec);
        return ls;
      }
    });

    updatedKeyWords.foreachRDD(new Function<JavaRDD<String>, Void>() {
      public Void call(JavaRDD<String> rdd) {
        rdd.foreach(new VoidFunction<String>() {
          @Override
          public void call(String x) {
            if (x != null)
              keywords.add(x); // Add newly read tokens to keyword list
          }
        });
        return null;
      }
    });

    // Compare each token received from the socket against the keyword list
    JavaDStream<Boolean> wordPresent = lines.map(new Function<String, Boolean>() {
      @Override
      public Boolean call(String x) {
        return keywords.contains(x);
      }
    });

    // Identity mapping of the input tokens (not used further in this example)
    JavaDStream<String> inputWords = lines.map(new Function<String, String>() {
      @Override
      public String call(String x) {
        return x;
      }
    });

    wordPresent.print();

    ssc.start();
    ssc.awaitTermination();
  }
}

Compiling and starting the program

For the example in this article, we use Maven to build and package the application. If you are using Maven, make sure that you add the appropriate dependencies in pom.xml. The main dependencies are the spark-streaming and spark-core libraries.

The following code shows a snippet of pom dependencies that are used in our application:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>1.2.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.2.1</version>
</dependency>
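
Assuming a standard Maven project layout, a command like the following packages the application into a jar; the name target/KeyWord-1.0.jar used in the next step follows from the artifactId and version defined in pom.xml.

mvn clean package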

After the application is compiled and the jar file is created, use the following command to submit the application to the Spark scheduler:

spark-submit --class "org.apache.spark.examples.streaming.KeywordDetect" --master local[4] target/KeyWord-1.0.jar rvm.svl.ibm.com 9121

Since a Spark instance is running on a single host with four cores, we use the local[4] value for the --master argument. Our application accepts two parameters: host name and port.

The application assumes that there is a server process running on port 9121 and publishing data. To simulate a server in a test environment, we use the nc (netcat) Linux command: nc -l 9121.

The nc command binds to port 9121. Any input that we type in the terminal is forwarded to all the clients that are listening on port 9121.

After everything is set up correctly, the submitted job starts and listens on port 9121. You should see the following confirmation messages on the terminal:

15/09/06 01:43:31 INFO dstream.SocketReceiver: Connecting to rvm.svl.ibm.com:9121
15/09/06 01:43:31 INFO dstream.SocketReceiver: Connected to rvm.svl.ibm.com:9121

Now, let's update the internal dictionary that is used by the program. The textFileStream code shown earlier listens for new files in the Hadoop directory /tmp/Streamtest. If the directory does not exist yet, create it first and then upload the keyword file by using the commands shown below:

hadoop fs -mkdir /tmp/Streamtest
hadoop fs -put keywords /tmp/Streamtest

When a new file is detected, it is read and the subsequent RDD operations are executed:

15/09/06 01:54:25 INFO dstream.FileInputDStream: New files at time 1441529665000 ms: hdfs://rvm.svl.ibm.com:8020/tmp/Streamtest/keyword
15/09/06 01:54:25 INFO storage.MemoryStore: ensureFreeSpace(272214) called with curMem=109298, maxMem=278302556

One of the keywords is "risk." Now we submit that keyword through nc, as shown in the following listing:

[root@rvm Desktop]# nc -l 9121
risk

Spark then detects the keyword and flags it as true on the console:

-------------------------------------------
Time: 1441529995000 ms
-------------------------------------------
true

Future enhancements

This application can be further enhanced to process complete strings instead of individual tokens.

The keyword detection status can also be written to a file or sent to a port of a UI-rendering service.
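
The following is a minimal sketch of both ideas, not part of the original sample. It assumes it is placed inside the KeywordDetect program shown above, so that lines, keywords, and SPACE are in scope, and the output path prefix /tmp/KeywordHits is an arbitrary example.

// Tokenize each complete line received from the socket and flag it
// if any of its tokens matches the keyword list
JavaDStream<Boolean> lineHasKeyword = lines.map(new Function<String, Boolean>() {
  @Override
  public Boolean call(String line) {
    for (String token : SPACE.split(line)) {
      if (keywords.contains(token)) {
        return true;
      }
    }
    return false;
  }
});

// Persist the detection status to HDFS instead of only printing it;
// each batch is written as a directory named with the given prefix and suffix
lineHasKeyword.dstream().saveAsTextFiles("/tmp/KeywordHits", "txt");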

Exception conditions

If you get a "connection refused" error, it could be because:

  • HDFS and Yet Another Resource Negotiator (YARN) are not running
  • The server process is not running on the source port. In our example, the source port is 9121.

Conclusion

This article demonstrates how to use Spark Streaming to build a real-time application and highlights the building blocks of a Spark Streaming application. Use this information as a starting point to create more complex applications with Spark Streaming.

