IBM InfoSphere Streams Version 4.1.1

Stream processing

As the name implies, SPL is a language for processing data streams.

But the HelloWorld example from the previous section hardly qualifies as stream processing, since that program had only a single stream carrying a single tuple. This section introduces a more idiomatic example that processes streams of a priori unknown length by using a graph of operator invocations that run with pipeline parallelism. The program reads a file, prefixes each line with a line number, and writes the result to another file. It does so with the following stream graph:

Figure 1. Stream graph of the NumberedCat program: FileSource produces the stream Lines, Functor consumes Lines and produces the stream Numbered, and FileSink consumes Numbered.

A stream is a (possibly infinite) sequence of tuples; in the example, Lines and Numbered are streams. A tuple is a data item on a stream. In the example, the stream Lines transports one tuple for each line in the input file. An operator is a reusable stream transformer: each operator invocation transforms some input streams into some output streams. The place where a stream connects to an operator is called a port. Many operators have one input port and one output port (like Functor in the example), but operators can also have zero input ports (FileSource), zero output ports (FileSink), or multiple input or output ports (as later examples show).
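Because the figure itself is not reproduced here, the following Python sketch writes the NumberedCat graph out as data and classifies each invocation by its port counts. It is only a model of the concepts above: the Invocation class and the role and edges functions are hypothetical names, and none of this is part of SPL or any InfoSphere Streams API.

```python
from dataclasses import dataclass, field


@dataclass
class Invocation:
    """One operator invocation in a stream graph (hypothetical model, not an SPL API)."""
    name: str                                          # invocation name
    operator: str                                      # operator being invoked
    inputs: list[str] = field(default_factory=list)    # streams consumed on input ports
    outputs: list[str] = field(default_factory=list)   # streams produced on output ports


# The graph of Figure 1, written out as data.
NUMBERED_CAT = [
    Invocation("Lines", "FileSource", inputs=[], outputs=["Lines"]),
    Invocation("Numbered", "Functor", inputs=["Lines"], outputs=["Numbered"]),
    Invocation("Sink", "FileSink", inputs=["Numbered"], outputs=[]),
]


def role(inv: Invocation) -> str:
    """Classify an invocation by whether it has input and output ports."""
    if not inv.inputs:
        return "source"        # zero input ports, like FileSource
    if not inv.outputs:
        return "sink"          # zero output ports, like FileSink
    return "transformer"       # inputs and outputs, like Functor


def edges(graph: list[Invocation]) -> list[tuple[str, str, str]]:
    """Return (producing operator, stream, consuming operator) for each connection."""
    producer = {s: inv.operator for inv in graph for s in inv.outputs}
    return [(producer[s], s, inv.operator) for inv in graph for s in inv.inputs]


if __name__ == "__main__":
    for inv in NUMBERED_CAT:
        print(inv.operator, role(inv))
    for src, stream, dst in edges(NUMBERED_CAT):
        print(f"{src} --{stream}--> {dst}")
```

Running it prints the three roles (source, transformer, sink), followed by the two connections FileSource --Lines--> Functor and Functor --Numbered--> FileSink.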

But back to the line-numbering program. It is called NumberedCat as a homage to the UNIX cat utility that, given the right command-line options, performs the same task. Here is the code:

 composite NumberedCat {
  graph
    stream<rstring contents> Lines = FileSource() {
      param  format        : line;
             file          : getSubmissionTimeValue("file");
    }
    stream<rstring contents> Numbered = Functor(Lines) {
      logic  state         : mutable int32 i = 0;
             onTuple Lines : i++;
      output Numbered      : contents = (rstring)i + " " + contents;
    }
    () as Sink = FileSink(Numbered) {
      param  file          : "result.txt";
             format        : line;
    }
 }

As in the previous example, there is a composite operator definition with a graph clause that contains operator invocations. The FileSource invocation, which defines the stream Lines, reads one line at a time (param format : line) from a file that is specified at submission time (param file : getSubmissionTimeValue("file")); how to supply the file name at submission time is shown later. The Functor invocation, which defines the stream Numbered, maintains a state variable mutable int32 i = 0, which it increments each time a tuple arrives (onTuple Lines : i++). SPL variables are immutable by default, so without the mutable modifier the compiler would reject the increment i++. The output clause output Numbered : contents = (rstring)i + " " + contents assigns the contents attribute of the output stream by casting the line number i to a string ((rstring)i) and concatenating it, followed by a space, with the contents attribute of the input stream. As the example shows, in an output clause the left side of each assignment is an attribute of the output stream, whereas attribute names on the right side refer to the input streams. Finally, the FileSink invocation, named Sink, writes the results to a file named result.txt.
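To make the data flow concrete, here is a rough Python analogue of the three invocations, written as generators that pass one tuple (a dict with a contents key) at a time. It is a sketch under stated assumptions, not how the Streams run time works: the function names are hypothetical, and generators model the one-tuple-at-a-time flow of a stream of unknown length but not pipeline parallelism, in which the operators run concurrently. It also assumes, consistent with the behavior of cat -n, that the onTuple statement runs before the output assignment, so numbering starts at 1.

```python
from typing import Iterable, Iterator


def file_source(lines: Iterable[str]) -> Iterator[dict]:
    """Stand-in for FileSource with format : line, emitting one tuple per line."""
    for line in lines:
        yield {"contents": line.rstrip("\n")}


def functor_number(tuples: Iterable[dict]) -> Iterator[dict]:
    """Stand-in for the Functor; i persists across tuples like 'mutable int32 i = 0'."""
    i = 0                                   # state: initialized once, not per tuple
    for t in tuples:
        i += 1                              # onTuple Lines : i++ (assumed to run first)
        yield {"contents": f"{i} {t['contents']}"}   # output Numbered : contents = ...


def file_sink(tuples: Iterable[dict]) -> list[str]:
    """Stand-in for FileSink with format : line; collects lines instead of writing a file."""
    return [t["contents"] for t in tuples]


if __name__ == "__main__":
    text = ["The Unix utility \"cat\" is so called\n",
            "because it can con\"cat\"enate files.\n"]
    for line in file_sink(functor_number(file_source(text))):
        print(line)
```

Because each stage pulls one tuple at a time from the previous one, the same code works whether the input has two lines or an unbounded number of them.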

Try out the following procedure. Create a directory called NumberedCat with a subdirectory NumberedCat/data. Put the example program in the file NumberedCat/NumberedCat.spl. From the NumberedCat directory, compile it to a stand-alone executable with sc -T --data-directory data -M NumberedCat. Put the following text in the file NumberedCat/data/catFood.txt:

The Unix utility "cat" is so called
because it can con"cat"enate files.
Our program behaves like "cat -n",
listing one file and numbering lines.

When the program runs, you must supply the input file name as a submission-time value. The FileSource operator resolves a relative file name against the data directory that was given at compile time (here, NumberedCat/data). Therefore, run the program from the NumberedCat directory with ./output/bin/standalone file="catFood.txt". Then look in the NumberedCat/data directory: if everything worked, the program created a file called result.txt (FileSink resolves its relative file name against the same data directory) that contains the numbered lines of catFood.txt.
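Rather than inspecting result.txt by eye, you can check it with a small script. The following Python helper is hypothetical and not part of InfoSphere Streams; it assumes the output format described earlier, where each output line is the line number, a single space, and the original line, with numbering starting at 1.

```python
from pathlib import Path


def expected_numbered(lines: list[str]) -> list[str]:
    """Lines as NumberedCat should write them: '<n> <original line>', n starting at 1."""
    return [f"{n} {line}" for n, line in enumerate(lines, start=1)]


def check_result(data_dir: str) -> bool:
    """Return True if result.txt is the numbered version of catFood.txt in data_dir."""
    data = Path(data_dir)
    source = data.joinpath("catFood.txt").read_text().splitlines()
    result = data.joinpath("result.txt").read_text().splitlines()
    return result == expected_numbered(source)
```

For example, calling check_result("data") from the NumberedCat directory should return True after a successful run.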

So far, the examples have run in stand-alone mode, which is common during testing and debugging. In production with InfoSphere® Streams, however, you run within an InfoSphere Streams instance. To do so, first compile without the -T (--standalone-application) option, then create a domain and an InfoSphere Streams instance, and submit the job to that instance.
Note: This tutorial uses embedded ZooKeeper. For production, use an external ZooKeeper ensemble, which also allows you to run on a cluster.
Try the following sequence of commands from the NumberedCat directory. Enter your user name and password when prompted:
sc --data-directory data -M NumberedCat                            # compile
streamtool mkdomain -d streamsdomain --embeddedzk                  # make a domain
streamtool startdomain -d streamsdomain --embeddedzk               # start the domain
streamtool genkey -d streamsdomain --embeddedzk                    # generate keys
streamtool mkinstance -i streams -d streamsdomain --embeddedzk     # make an instance
streamtool startinstance -i streams -d streamsdomain --embeddedzk  # start the runtime instance
streamtool submitjob -i streams -d streamsdomain --embeddedzk -P file=catFood.txt output/NumberedCat.sab  # submit the job
streamtool lsjobs -i streams -d streamsdomain --embeddedzk         # list running jobs
# wait until data/result.txt contains the numbered lines of data/catFood.txt
streamtool canceljob -i streams -d streamsdomain --embeddedzk 0    # cancel the job
streamtool stopinstance -i streams -d streamsdomain --embeddedzk   # stop the runtime instance
streamtool rminstance -i streams -d streamsdomain --embeddedzk     # remove the runtime instance
streamtool stopdomain -d streamsdomain --embeddedzk                # stop the domain
streamtool rmdomain -d streamsdomain --embeddedzk                  # remove the domain

If everything went well, these commands produced the same result as running the program stand-alone. If anything went wrong, consult your system administrator, or diagnose the problem yourself by using the streamtool getlog or streamtool viewlog commands. As mentioned before, the best way to learn a language is to write and run programs in it, so now is a good time to make sure that your setup lets you do that. Note how the streamtool submitjob command accepts submission-time values with the -P option and uses the application bundle file (output/NumberedCat.sab) to determine which operators to run.

This section illustrated SPL as a streaming language and gave you a taste of how to run programs on an instance of the InfoSphere Streams distributed run time. You saw three operators from the SPL Standard Toolkit: FileSource, Functor, and FileSink. To learn more about working with the distributed run time, run streamtool man, which documents submitjob and the other streamtool commands in detail.