IBM InfoSphere Streams is a powerful and scalable solution for processing a continuous flow of millions of data tuples per second. The stream engine provides a robust, highly available, and scalable runtime that executes your analytic logic on hundreds of nodes and CPU cores. This gives you the freedom to focus on making use of your data, instead of having to spend time on the how and where.
In a typical field of operation, InfoSphere Streams can help you process incoming data on the fly and react to new events in less than a second, instead of processing the data after the fact with a delay of several minutes or even hours. Figure 1 shows a system architecture in which Streams helped process 6 billion call data records per day from a cellphone network.
Figure 1. Streams helping detect problems in a cellphone network
The time to process and load those records into the BI and accounting systems was reduced from 12 hours to a few seconds, which now allows a real-time view of customer activity and network quality.
Although Streams was originally designed to process data on the fly, it can also be used for simulation purposes. The following sections describe how to set up a data flow in which data is fed back into the processing engine after it has been processed. There are a few challenges to address that might not be obvious; they are explained in detail below.
A stream is a continuous data flow from a source to a target. The data is in the form of tuples, which comprise a fixed set of attributes. A stream can be compared with the contents of a database table. In a database, data is stored in rows and columns. The columns are the individually defined elements that make up a data record (or row). In Streams, these data records are called tuples and the columns are called attributes.
Four terms are important for understanding the concept of stream processing: sink/source adapters, operators, the Streams programming language (SPL), and the Streams runtime environment.
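To make the tuple and attribute terminology concrete, here is a minimal Python sketch. It is an analogy only, not SPL, and the `CallRecord` type and its attribute names are hypothetical. A real stream is unbounded; the generator below yields just two tuples so that it can be inspected.

```python
from typing import Iterator, NamedTuple


class CallRecord(NamedTuple):
    """One tuple of the stream; the attributes are illustrative assumptions."""
    caller: str
    callee: str
    duration_s: int


def call_stream() -> Iterator[CallRecord]:
    """Stand-in for a stream: hands out tuples one at a time."""
    yield CallRecord("A", "B", 42)
    yield CallRecord("C", "A", 7)


records = list(call_stream())
```

Every tuple carries the same fixed set of attributes, just as every row of a database table has the same columns.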
Figure 2. Streams environment
The origin of a stream is called a source, while the target is called a sink. A source or sink can be a file, a database, a TCP/IP socket, or any custom-made Java™ or C++ program that generates or saves tuples.
Operators process stream tuples: they filter, aggregate, merge, transform, or perform much more complex mathematical functions such as fast Fourier transforms. They take one or more streams as input, process the tuples and attributes, and produce one or more streams as output.
A Streams application is then a collection of such operators connected by streams. The application is written in SPL, which, like SQL, is a declarative programming language. Unlike more common programming languages such as C, C++, or the Java programming language, a declarative language does not describe how something should be done but what the expected results should look like. The compiler translates this description into program code that performs the operations necessary to produce the expected results.
During execution of a Streams application, the runtime takes care of reliably executing the sink and source adapters and the operators, and distributes them over the available processing cores and nodes to achieve optimal throughput.
A Streams application can be used to perform analytics on the stream while the tuples are coming in. Information is used as it is produced, so you have an almost real-time view of it and do not need to wait for the next processing window.
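The idea of operators connected by streams can be sketched with Python generators. This is only an analogy for the data flow: it is ordinary imperative Python rather than declarative SPL, and all function names and sample values are hypothetical.

```python
from typing import Callable, Iterable, Iterator, List


def source(values: Iterable[float]) -> Iterator[float]:
    """Source adapter stand-in: emits tuples into the flow."""
    for value in values:
        yield value


def filter_op(stream: Iterable[float], keep: Callable[[float], bool]) -> Iterator[float]:
    """Operator: passes on only the tuples that satisfy the predicate."""
    for item in stream:
        if keep(item):
            yield item


def transform_op(stream: Iterable[float], fn: Callable[[float], float]) -> Iterator[float]:
    """Operator: rewrites each tuple."""
    for item in stream:
        yield fn(item)


def sink(stream: Iterable[float]) -> List[float]:
    """Sink adapter stand-in: collects what reaches the end of the flow."""
    return list(stream)


# Wire the operators together: speeds in km/h, keep those above 100,
# convert them to m/s, and collect the result.
speeds_kmh = source([80, 130, 95, 160])
fast_only = filter_op(speeds_kmh, lambda v: v > 100)
in_m_per_s = transform_op(fast_only, lambda v: round(v / 3.6, 1))
result = sink(in_m_per_s)
```

Each stage consumes one stream and produces another, so stages can be recombined freely. In Streams, distributing such stages over cores and nodes is the job of the runtime, which this single-process sketch does not attempt.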
Running a simulation with Streams
In a recent concept study, a traffic simulation was developed to simulate the effect of changes to a highway network (construction sites, moving construction sites, accidents, additional lanes, speed limits) on throughput and on the length of traffic jams. The Streams environment was chosen because of its highly scalable parallel execution runtime, which fits perfectly with the Nagel-Schreckenberg model for traffic simulations.
This model simulates the movement of each car, truck, or motorcycle. One simulation round represents 1 second in real time. The road is divided into sectors, each 7.5 meters long, and each vehicle will be moved according to the traffic rules and speed, and is allowed to accelerate or slow down.
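One simulation round can be sketched in Python as follows. The article does not list the movement rules, so this sketch uses the commonly published Nagel-Schreckenberg rules (accelerate, brake to the gap ahead, slow down at random, move). The maximum speed, the slowdown probability, and the treatment of the segment end as an open exit are assumptions made for illustration, not the study's actual parameters.

```python
import random
from typing import List, Optional, Tuple

CELL_LENGTH_M = 7.5  # from the text: each sector is 7.5 meters long
V_MAX = 5            # assumption: 5 cells per round is 37.5 m/s, about 135 km/h
P_SLOWDOWN = 0.3     # assumption: probability of a random slowdown

Road = List[Optional[int]]  # road[i] is None (empty) or the speed of the vehicle in cell i


def step(road: Road, rng: random.Random,
         v_max: int = V_MAX, p_slowdown: float = P_SLOWDOWN) -> Tuple[Road, List[int]]:
    """Advance one segment by one round (1 second of real time).

    Returns the new road and the speeds of the vehicles that left the segment.
    """
    n = len(road)
    new_road: Road = [None] * n
    leaving: List[int] = []
    for i, v in enumerate(road):
        if v is None:
            continue
        # Count the empty cells between this vehicle and the next one.
        gap, j = 0, i + 1
        while j < n and road[j] is None:
            gap += 1
            j += 1
        if j >= n:
            gap = v_max  # assumption: nothing ahead, so the exit is open
        v = min(v + 1, v_max)                    # 1. accelerate
        v = min(v, gap)                          # 2. brake to avoid a collision
        if v > 0 and rng.random() < p_slowdown:  # 3. random slowdown
            v -= 1
        if i + v < n:                            # 4. move
            new_road[i + v] = v
        else:
            leaving.append(v)                    # to be handed to the next segment
    return new_road, leaving


new_road, leaving = step([0, None, None, 2, None, None], random.Random(0), p_slowdown=0.0)
```

All gaps are computed from the state of the previous round, so the vehicles can be updated independently of one another, which is what makes the model a good fit for parallel execution.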
For simplicity and manageability, a whole network of streets can't be simulated as one big entity. Rather, each segment of a street between intersections, slip roads, or crossings is managed as a separate Nagel-Schreckenberg model, and the segments are then connected with each other. Vehicles leaving one segment and entering another have to be handed over to the corresponding segment at the end of each simulation cycle.
Figure 3. Connected Nagel-Schreckenberg segments
Why rounds have to be synchronized
Each street is simulated by one Streams application. The Streams environment distributes the computing tasks to the available computer and processor resources. It is critical that every Streams application finishes the current round before a new one is started. Otherwise, applications calculating shorter streets would process more rounds than others, which would cause shorter streets to place many more vehicles on the streets that follow them. This is not a problem as long as each street has only one following street. As soon as there are entry/exit ramps and highway intersections, as in real life, the differing round rates would cause problems.
Entering and leaving road segments
Each Streams application can use pre-defined or custom-made sink and source adapters to let tuples enter or leave the application. In our case, a source adapter waits for vehicles coming in from previous road segments, and a sink adapter hands vehicles that leave the road segment over to the connecting road segments. The standard source and sink operators can read or write data in various formats from and to files, databases, or a TCP/IP socket, but they cannot synchronize simulation cycles across all Streams applications. We therefore use TCP/IP sink and source adapters and implemented a separate server that synchronizes them.
Implementing the synchronization
At the beginning of each round, all vehicles are inserted into the Streams application by a source adapter listening on a TCP/IP socket. To synchronize the round rate for all Streams applications, each application needs to halt processing input data until all applications have finished calculating the previous round. The Streams sort operator can be used for this. It sorts tuples in a so-called window and releases all sorted tuples at once. In our case, the source adapter places a special vehicle called a punctor on the road segment to indicate that all vehicles have been received. A punctor operator waits for such a punctor vehicle and generates a window marker, which signals to the sort operator that all vehicles for this simulation cycle have been received and can be released at once. Each vehicle is then moved according to the rules of the Nagel-Schreckenberg model, as the following figure shows.
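The interplay of the punctor vehicle and the sort window can be sketched in plain Python. This is not the Streams sort or punctor operator, only an illustration of the buffering behavior described above; the `RoundBuffer` class, the `PUNCTOR_ID` marker, and the `(vehicle_id, cell)` layout are hypothetical.

```python
from typing import Callable, List, Optional, Tuple

Vehicle = Tuple[str, int]  # (vehicle_id, cell); an illustrative layout
PUNCTOR_ID = "PUNCTOR"     # hypothetical marker identifying the punctor vehicle


class RoundBuffer:
    """Holds incoming vehicles until the punctor arrives, then releases them all at once."""

    def __init__(self, sort_key: Callable[[Vehicle], int]) -> None:
        self._sort_key = sort_key
        self._window: List[Vehicle] = []

    def push(self, vehicle: Vehicle) -> Optional[List[Vehicle]]:
        """Return None while the round is still filling, or the sorted round on the punctor."""
        vehicle_id, _cell = vehicle
        if vehicle_id != PUNCTOR_ID:
            self._window.append(vehicle)
            return None
        released = sorted(self._window, key=self._sort_key)
        self._window = []  # start collecting the next round
        return released


buffer = RoundBuffer(sort_key=lambda vehicle: vehicle[1])
first = buffer.push(("car-2", 9))
second = buffer.push(("car-1", 3))
released = buffer.push((PUNCTOR_ID, -1))
```

Nothing is processed until the marker arrives, so a segment cannot start a round with only some of its vehicles.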
Figure 4. Flow between Streams applications and Java threads
At the end of processing, the stream is split in two parts: all leaving vehicles, including the punctor vehicle itself, are sent by the TCP sink to the synchronizer, and vehicles that stay are routed directly back to the punctor operator.
The synchronizer and the Streams applications communicate via TCP sockets. The Streams applications use a TCP/IP sink operator to write vehicle data to a socket as a comma-separated value (CSV) string. The synchronizer has been implemented as a separate Java TCP/IP server, which receives vehicle data from the socket. It keeps track of all started Streams applications and knows which ones have already sent their punctor vehicles. Once all Streams applications have completed their simulation cycles, a signal is sent to the vehicle generator, which then sends all the vehicles that left a road segment to the source adapter of the road segment they are now entering.
We have shown that InfoSphere Streams is not limited to real-time processing of your data tuples but can also be used as a parallel processing infrastructure for simulations. Its underlying runtime takes care of high availability, scheduling, and parallel execution, allowing you to concentrate on making use of your data. The Streams runtime comes with a pre-defined set of operators and functions that cover many processing requirements. In cases where they are not sufficient, you can add custom operators and functions.
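The bookkeeping of the synchronizer can be sketched as follows. The article's synchronizer is a Java TCP/IP server; this Python sketch leaves out the sockets and shows only the round barrier. The CSV layout `vehicle_id,target_segment,speed`, the `PUNCTOR` marker, and all names are assumptions, since the article does not specify the record format.

```python
from typing import Dict, Iterable, List, Optional, Set, Tuple

PUNCTOR_ID = "PUNCTOR"  # hypothetical marker identifying the punctor vehicle


class Synchronizer:
    """Collects leaving vehicles and releases them once every segment has sent its punctor."""

    def __init__(self, segments: Iterable[str]) -> None:
        self._segments: Set[str] = set(segments)
        self._done: Set[str] = set()
        self._pending: Dict[str, List[Tuple[str, int]]] = {}

    def receive(self, sender: str, csv_line: str) -> Optional[Dict[str, List[Tuple[str, int]]]]:
        """Handle one CSV line; return the vehicles per target segment when the round is complete."""
        if sender not in self._segments:
            raise ValueError(f"unknown segment: {sender}")
        # Assumed layout: vehicle_id,target_segment,speed
        vehicle_id, target, speed = csv_line.strip().split(",")
        if vehicle_id == PUNCTOR_ID:
            self._done.add(sender)
        else:
            self._pending.setdefault(target, []).append((vehicle_id, int(speed)))
        if self._done != self._segments:
            return None  # at least one segment is still calculating
        released, self._pending, self._done = self._pending, {}, set()
        return released


sync = Synchronizer(["A", "B"])
step1 = sync.receive("A", "car-1,B,3")
step2 = sync.receive("A", "PUNCTOR,,")
step3 = sync.receive("B", "PUNCTOR,,")
```

Returning the collected vehicles only with the last punctor plays the role of the signal to the vehicle generator: no segment receives vehicles for the next round before all segments have finished the current one.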