Using InfoSphere Streams with memcached and Redis

A large-scale in-memory Distributed Process Store

InfoSphere® Streams is a powerful middleware product that provides a platform for developing streaming applications and executing them in a fast, distributed manner. In a stream processing system, there is often a need to externalize application-related state information and share it with other distributed application components. Distributed data sharing is possible within the context of an InfoSphere Streams application through the Distributed Process Store (dps) toolkit, which provides a key-value store abstraction. The shared state can be stored in memcached or Redis — two popular open source distributed state management systems.


Senthil Nathan (sen@us.ibm.com), Senior Technical Staff Member, IBM TJ Watson Research Center

Senthil Nathan is a senior technical staff member at the IBM Thomas J. Watson Research Center in Yorktown Heights, New York. He has 28 years of experience in building software for different technology areas, including SOA, Web services, Java Enterprise Edition, PHP, Web 2.0, and Ajax. At the time of writing this article, his full job focus is on the InfoSphere Streams product and its commercial applications.



Bugra Gedik, Dr. (bgedik@cs.bilkent.edu.tr), Assistant Professor, Bilkent University, Ankara

Dr. Bugra Gedik is currently a faculty member in the Computer Engineering Department, Bilkent University in Ankara, Turkey, and a consultant for Korvus Bilisim R&D in Ankara, Turkey. Prior to that, he worked as a research staff member at the IBM Thomas J. Watson Research Center. His research interests are in distributed data-intensive systems with a particular focus on stream computing. In the past, he served as chief architect for InfoSphere Streams. He is the co-inventor of the SPL and the SPADE stream processing languages. He was named an IBM Master Inventor and is the recipient of an IBM Corporate Award for his pioneering work in the System S project.



22 October 2013

Introduction

InfoSphere Streams is a high-performance real-time event and stream processing middleware product. It can ingest high volumes of data from a variety of data sources and process that data at very high speeds. It achieves this through a combination of an easy-to-use application development language called Streams Processing Language (SPL) and an optimized, distributed runtime platform. This middleware also provides a flexible application development framework to integrate custom code written in C++ and the Java™ language into InfoSphere Streams applications. InfoSphere Streams is adept at passing data tuples efficiently among a distributed collection of analytic components. In addition, it's possible to share data among such InfoSphere Streams analytic operators running on different processes and different machines. Such distributed data sharing is made possible by using a back-end in-memory store.

Download InfoSphere Streams

InfoSphere Streams is a high-performance computing platform that allows user-developed applications to rapidly ingest, analyze, and correlate information as it arrives from thousands of real-time sources. It offers a highly scalable and powerful analytics platform that can handle incredibly high data throughput rates of up to millions of events or messages per second.

Download InfoSphere Streams now.

This article introduces the Distributed Process Store (dps) — an SPL toolkit that uses native SPL functions to provide a key-value store abstraction for streaming applications. The dps toolkit currently supports two well-established open source in-memory store technologies — memcached and Redis — as back ends for managing the distributed state. Hereafter, we will use the term back-end in-memory store to refer to both memcached and Redis unless there is a need to distinguish between the two.

This article expects you to be familiar with InfoSphere Streams and its SPL programming model. For in-depth details about InfoSphere Streams and the back-end in-memory store, you can use the publicly available information in the Resources section.

InfoSphere Streams is a key component in IBM's big data product strategy. Many of IBM's current and prospective customers can take advantage of the ability to manage large-scale state by using the distributed data sharing facility for InfoSphere Streams as explained below. Other developers who are interested in this general topic area would also benefit from this article. It is targeted at readers whose technical focus is big data applications, including application designers, developers, and architects.

Before delving into specific details about the dps, it is important to be aware of one point. InfoSphere Streams applications may need data sharing in two cases:

  • Data sharing between fused operators in the same processing element (PE)
  • Data sharing between operators potentially in different PEs and even in different applications

InfoSphere Streams allows multiple operators to be fused into a single PE. In that situation, it is optimal and much faster to use an entirely different data sharing facility called the Process Store (ps). This particular approach to sharing data among the fused operators within a single PE is outside the scope of this article. Readers are advised to refer to a freely downloadable toolkit available on the developerWorks Streams Exchange website (see Resources).

When InfoSphere Streams PEs running on a single machine or multiple machines want to share data, it's appropriate to use the dps facility, which is the focus of this article.

Motivation: Why do we need a dps?

Typical stream processing applications are composed of operators that have local state and communicate with each other via streams. The local nature of the state means that operators do not have access to shared state. However, shared state is an important need in certain applications, where the data needs to be accessed by multiple PEs running on one or more machines. One example is to manage dynamic configuration data accessed by multiple operators. Another example is to access large-scale state that does not fit into the local memory of a single operator. Yet another example is to provide an external system access to the application data.

One way to emulate shared state within a streaming application is to employ a mesh stream connection topology to send updates to the state from any operator to any other operator sharing the state. However, not only does this approach result in an unwieldy topology, but it also makes synchronization difficult. Furthermore, this method does not scale in terms of the communication cost or the maximum state size that can be supported. A new abstraction is warranted to fulfill the need for access to shared state.

The dps is a solution that provides access to shared state using a key-value store abstraction modeled after SPL's map data type. The shared state can be accessed from anywhere inside an InfoSphere Streams application, whether that is SPL code, SPL functions, SPL native functions, C++ primitive operators, or Java primitive operators. That is the first and foremost design goal addressed by our dps implementation. It is all done through a set of easy-to-use native functions. Actual state information is stored separately in a distributed back-end in-memory store, which is transparent to the InfoSphere Streams application.

Background: SPL maps, native functions, memcached, Redis

We will briefly survey the main ingredients involved in the dps — namely SPL maps, SPL native functions, and a proven, commercially used open source back-end in-memory store, such as memcached or Redis.

An SPL map is a data structure, akin to a hash table (also known as an associative array), that maps keys to values. Any valid SPL type can serve as the key type or the value type. Data insertion, lookup, and deletion operations are constant time operations; in other words, O(1). SPL maps also support iteration, but there is no predefined iteration order. The dps provides the same functionality provided by an SPL map, but the data is not kept locally. Instead, it is kept remotely in a distributed back-end in-memory store. In fact, this is a major difference between the two data sharing facilities mentioned above (ps and dps). This difference brings the additional ability to access data from multiple operators within the streaming application, at the cost of a higher constant factor overhead for all basic operations.
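
For comparison, this is how a plain, local SPL map is used directly in SPL code (a minimal sketch; the variable names and values are illustrative):

```spl
// A plain SPL map: state that is local to the operator holding it.
mutable map<rstring, int32> prices = {};
prices["IBM"] = 185;                  // constant-time insert/update
if (has(prices, "IBM")) {             // constant-time membership test
    printStringLn("IBM -> " + (rstring)prices["IBM"]);  // constant-time lookup
}
```

The dps native functions described later mirror these operations, but against a remote, distributed store rather than local operator memory.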

The dps is an add-on facility provided as a set of native functions, which are similar to SPL functions in terms of their use, but different in terms of how they are defined. While SPL functions are defined in SPL source code using the SPL statements and expression language, native functions are defined in a C++ library and are registered with SPL by using function models. The distributed process store toolkit is implemented in this way; to use it, you employ the SPL native functions described later. For instance, since there is no distributed map type in the SPL language proper, one has to call a function (in this case, createStore) to create a dps store. This function returns a handle that represents the store. All future operations, including removing the store, are performed using this store handle. This is unlike conventional data types, which are lexically scoped, and more like dynamic allocation, which is not available in SPL by default.
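
The handle-based lifecycle described above might look like this in SPL (a hedged sketch; the store name is illustrative, and the dummy "" arguments register rstring keys and rstring values):

```spl
mutable uint64 err = 0ul;
// "Allocate" a distributed store; the returned handle identifies it from here on.
uint64 store = dpsCreateStore("MyConfigStore", "", "", err);
// ... perform all further operations through the store handle ...
// Explicitly "free" the store when it is no longer needed.
boolean removed = dpsRemoveStore(store, err);
```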

Finally, the dps can be configured to use either the memcached or the Redis in-memory middleware for managing the shared state remotely. Both use multiple machines to provide a distributed store in which binary values can be looked up using string keys, and both require a daemon to be run on each node that will manage the data in memory. Neither understands SPL types, and neither provides support for iteration over maps. The distributed store library implements a layer on top of memcached and Redis to integrate the notions of multiple stores, SPL types, iteration over maps, and distributed locks.

Let us say a few words about the two back-end in-memory stores supported by the dps implementation. For the dps to work, you need to have a memcached or a Redis server infrastructure up and running. memcached has been around since 2003, and it has been put to use in many high-profile commercial environments. It is a pure distributed cache capable of running on several dozen machines to provide a massive in-memory store. On the other hand, Redis is a nimble cousin of memcached that came into existence in 2009. Redis has a rich feature set:

  • It supports complex types (lists, sets, hashes, etc.).
  • It replicates with a single master and multiple slaves.
  • It provides persistence through periodic snapshots of the in-memory store contents to disk.
  • It has the Append Only File (AOF) feature that journals all the changes made to the store contents, making it possible to recover the entire store after a machine crash.
  • It provides clustering support for running Redis on multiple machines.

We can choose memcached or Redis; either will serve us well.

If needed, one can write additional code and extend the dps toolkit to support RDBMS back ends, such as IBM DB2®, IBM Informix®, or MySQL. This topic belongs in a separate discussion, and we will not get into it here. Such dps extensions can have performance consequences associated with SQL processing CPU overhead and hard disk-based storage media access latencies.

Conceptual overview: A coming together of InfoSphere Streams and dps

Figure 1. The dps from a high level
Image shows dps conceptual overview

Figure 1 illustrates how two high-performance paradigms (one created by IBM, and the other created by the open source community) come together to deliver a powerful one-two punch for distributed data sharing. It shows the three layers of the dps architecture. In the top layer, distributed streaming applications run with their own interconnected PEs, shown in green. In the middle layer, a logical view of the dps is shown. In the bottom layer, the distributed physical infrastructure for a back-end in-memory store is shown. Each PE in green has access to a dps logical container, shown in yellow. Every PE also has full access to all the store APIs. The logical view layer abstracts these four important aspects:

  1. Serialization required before storing the user data in the store
  2. Deserialization required after fetching the data from the store and just before returning it to the user
  3. In-memory database abstraction for any kind of back-end store product to be fitted with the dps
  4. Platform dependent implementation necessary to talk to a specific back-end in-memory store (memcached or Redis, for example)

In the physical view layer, the actual work involved is simply a matter of installing and configuring a back-end in-memory store on one or more machines. As shown above, any PE can safely access the store to save and retrieve contents, regardless of which other PE originally created the store with its initial contents (if any). All that a PE needs to access a store is a valid store name or a store handle. This three-layer dps implementation is unobtrusive, and it is always available for use by already-running InfoSphere Streams applications, as well as by any new applications that will be built and started in the future. In addition, this design is very flexible and lends itself to a high degree of scaling at all three layers.

Figure 2. The dps from the inside
Image shows dps inside view

Figure 2 gives us a peek inside an actual dps. An instance of the dps accommodates one or more user-created stores. Each store can hold an unlimited number of data items stored as key-value pairs, using any SPL type as a key and any SPL type as a value. To let users create an arbitrary number of stores, a given dps instance contains a store factory that can manufacture stores on demand. Similarly, any store can be accessed concurrently by any number of PEs running on different machines. Such access must be done safely, so that concurrent PEs do not clobber each other's intended store operations. Safety can be ensured by using the distributed locking feature that we will see in detail later. Hence, a given dps instance also provides a lock factory from which users can create their own sharable locks for the purpose of locking a store during a critical private operation and then releasing the lock once that critical store operation is completed.

Having covered a conceptual overview of the dps, we will get into the implementation specifics for all three dps layers in the following sections. We will do a bottom-up walk-through, explaining first what needs to be done in the physical view layer. Then we will cover what is being done in the logical view layer. After that, we will give a glimpse of several code snippets that exercise all the dps APIs available in the streaming applications layer.

Installing and configuring a back-end in-memory store

Before you can do anything with the dps toolkit, you should have either the memcached or the Redis server infrastructure up and running. Both memcached and Redis are easy to install, either on a single machine or on multiple machines. Follow the instructions below for installing memcached or Redis, both of which we used for testing this toolkit in our InfoSphere Streams lab. Please note that memcached and Redis are open source offerings and carry a BSD license. First, decide which machines (one or more) to use for running the memcached or Redis servers. The more the merrier.

The dps toolkit can be compiled and used on any of the following Linux® versions, together with InfoSphere Streams and one of the following back-end in-memory store servers. Follow the installation procedure for the back-end in-memory store product you have selected (memcached or Redis):

  • RedHat Enterprise Linux 6.4 (or an equivalent CentOS version) + gcc V4.4.7 20120313 (Red Hat 4.4.7-3)
  • RedHat Enterprise Linux 5.9 (or an equivalent CentOS version) + gcc V4.1.2 20080704 (Red Hat 4.1.2-54)
  • InfoSphere Streams V3.0 or above
  • memcached server V1.4.15 or above
  • Redis server V2.8.0-rc4 or above

Installing memcached

  1. Install the libevent package as described below:
    1. Log in as root and install one at a time on each of your machines assigned for the back-end in-memory store
    2. Download the latest stable tar.gz file
    3. Unzip the downloaded tar.gz file
    4. Change to the newly unzipped directory
    5. Run this command: ./configure
    6. Run make
    7. Run this optional command: make verify
    8. Run make install
    9. Log out as root and log in as the regular Linux user
    10. In the regular user's .bashrc file, add this in a single line:
      export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH
    11. From a Linux terminal window, run this command: source ~/.bashrc
  2. Install the memcached server by following these steps:
    1. Ensure that you are logged in as root and install on each of your back-end in-memory store machines
    2. Run wget http://memcached.org/latest
    3. Run tar -xvzf memcached-1.x.x.tar.gz
    4. Run cd memcached-1.x.x
    5. Run ./configure --prefix=/usr/local/memcached
    6. Run make
    7. Run this optional command: make test
    8. Run make install
    9. Log out as root

If you have successfully installed memcached, you can log in as a regular Linux user and start memcached on one or more servers as shown below.

  1. On every memcached server machine, type this command in a single line:
    export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH
  2. Run memcached with at least 512 MB of memory cache:
    /usr/local/memcached/bin/memcached -m 512 -d -v -M
  3. For real-world applications, you may want to start it with several gigabytes of memory for the -m option. The -d option will daemonize and run the memcached server in the background, the -v option will do logging with a minimal verbosity, and the -M option will tell memcached not to delete existing data items when memory is full and to simply return an error.
  4. If you have the netcat utility installed on your Linux machine, you can do a quick test from a memcached client machine:
    echo "stats settings" | nc memcached server name goes here 11211
  5. Whenever you want to stop memcached, you can simply run killall memcached.

Installing Redis

Note: For Redis, root permission is not required. A regular Linux user can install and start Redis.

  1. Run this command:
    wget http://download.redis.io/releases/redis-2.8.0-rc4.tar.gz
  2. Run tar -xvzf redis-2.8.0-rc4.tar.gz
  3. Run these commands:
    cd redis-2.8.0-rc4
    unset ARCH
  4. Run make

That's it. The compiled binaries are now available in the src subdirectory for you to run the Redis server.

If you have successfully installed Redis, you can start it on one or more servers, as shown below:

  1. Configure Redis with at least these options:
    • In the redis-XXXX/redis.conf file, change the daemonize parameter from no to yes to make Redis run in the background.
    • Decide whether to set appendonly to yes to protect your store data from machine crashes.
    • Choose whether to configure other optional configurations such as master/slave, redis cluster, and so on, or skip these options for now.
  2. After the above-mentioned configuration is done, from the redis-XXXX/src directory, run ./redis-server ../redis.conf.
  3. You can quickly verify whether your Redis installation works by doing these checks:
    • From the redis-XXXX/src directory, run redis-cli
    • Once you are inside the Redis client shell, type these commands:
      redis> set foo bar
      OK
      redis> get foo
      "bar"
      redis> del foo
      (integer) 1
      redis> exit
  4. Whenever you want to stop Redis, you can simply run killall redis-server.
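
The configuration changes from step 1 can be summarized as a minimal redis.conf fragment (only the settings discussed here; everything else stays at its defaults):

```conf
# Run the Redis server in the background as a daemon.
daemonize yes
# Optional: journal every write (AOF) so the store contents survive a machine crash.
appendonly yes
```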

Now that the physical view layer is ready, let's move one layer up and look inside the logical view layer. This layer provides the necessary technical interface between the InfoSphere Streams applications and the back-end in-memory store servers. This middle layer is all about the dps library APIs that work with the other two layers in the top and the bottom of Figure 1.

Library API: dps functions

As explained, the main goal of the dps facility is to give InfoSphere Streams developers an easy way to share application data among PEs running on a single machine or multiple machines. There is a library of dps native functions that can be called from anywhere within SPL code, SPL functions, SPL native functions, C++ primitive operators, and Java primitive operators. For working examples that call the dps APIs from SPL native functions and from C++/Java primitive operators, look inside the 80 beginner examples (see Resources). Behind the scenes, these dps native functions call into a large body of C++ code that implements the logic required for the dps operations. Major parts of the dps functions weave together the data serialization/deserialization logic and the wrapper logic for the back-end in-memory store APIs. A detailed description of the various dps functions follows:

  • uint64 dpsCreateStore(rstring name, T1 dummyKey, T2 dummyValue, mutable uint64 err)— When you want to create a new store, you can use this API. You have to pass the store name as the first method argument, then a mutable uint64 variable as the last argument. In between them, you must pass a dummy key and a dummy value to register the SPL types that will make up the keys and values for this store. For example, to create a store to hold int64-based keys and rstring-based values, you have to create a dummy int64 variable, a dummy rstring variable, then pass them to this API. As a good practice, a store should have uniform key and value SPL data types, rather than having a mixed bag of varying key and value data types. Such a discipline is critical for the other store operations explained below. When a store is successfully created, this API will return a nonzero uint64 value as a store handle. You have to use that handle for any further operation on this new store. If this API fails to create a new store, it will return a handle with a value of 0. When there is an error while creating the store, this API will indicate the error code via the err method argument.
  • uint64 dpsCreateOrGetStore(rstring name, T1 dummyKey, T2 dummyValue, mutable uint64 err)— This API is the same as the above with an additional feature. Unlike the previous API, if a store with a given name already exists, this API will not return an error. Instead, it will return the handle of that existing store. However, this API will return an error when the creation of a store fails. In general, this API is preferred over the previous API. Use this API most of the time.
  • uint64 dpsFindStore(rstring name, mutable uint64 err)— This API can be used to find a store by its name. If a store with a given name exists, it will return the handle for that store. Otherwise, it will return a value of 0 and the err method argument will be set to a non-zero error code.
  • boolean dpsRemoveStore(uint64 store, mutable uint64 err)— This API will remove the entire store using the given store handle. Any existing data items present in the store will be removed, and all the metadata will be deleted. No trace will be left behind to prove that such a store existed. It will return true on a successful removal of the store. It will return false if any error occurs while removing the store, and a non-zero error code set on the err method argument will convey the reason for the failure. A well-behaved application should always take responsibility for removing the stores it no longer needs. Such a discipline helps keep the back-end in-memory store in a clean state with no orphaned data items.
  • boolean dpsPut(uint64 store, T1 key, T2 item, mutable uint64 err)— This API will create a new data item or update an existing data item in a store. You have to pass the store handle, a key for the data item (a string or any valid SPL data type), and an actual value for that data item. It will return true on the successful creation or update of the data item in the store. Otherwise, it will return false and a failure reason code will be set on the err method argument. Please note that you can pass any valid SPL type as a key and any valid SPL type as the data item value. As an example, your key could be an SPL list type and your data item value could be a nested tuple.
  • boolean dpsGet(uint64 store, T1 key, mutable T2 item, mutable uint64 err)— This API will obtain the value of the data item for a given data item key (a string or any valid SPL data type) in a given store identified by its store handle. On a successful fetch from the in-memory store, it will assign the data item value to the caller specified mutable variable (T2) and return true. In case of any error, it will return false and set a failure reason code to the err method argument. This API allows you to pass a key name of any SPL type. Similarly, you can receive the data item value of any valid SPL type.
  • boolean dpsRemove(uint64 store, T1 key, mutable uint64 err)— You can use this API to remove a data item from a store. This is done by passing the store handle and the key for the data item to be removed. A key can be made of any valid SPL type. This API will remove the data item belonging to the key provided by the caller and return true if this operation was successful. Otherwise, it will assign a failure reason code to the err method argument and return false.
  • boolean dpsHas(uint64 store, T1 key, mutable uint64 err)— This API allows you to find out if a given data item exists in a store. The caller should pass the store handle and the key for the data item. It will return true if the data item is present in the store; otherwise, it will return false. In case of any error during this operation, the err method argument will be set to a non-zero failure reason code.
  • void dpsClear(uint64 store, mutable uint64 err)— This API can be used to clear the entire contents of the store identified by the given store handle and, as a result, leave that store completely empty. It has no return value. In case of any errors during this operation, the err method argument will be set to a non-zero failure reason code.
  • uint64 dpsSize(uint64 store, mutable uint64 err)— To get the total number of data items present in a store, this API can be called with a store handle. It will return the size of that store. In case of any error during this operation, a non-zero failure reason code will be assigned to the err method argument.

    The next three APIs are meant to be used for iterating over a store. This feature allows you to sweep through the entire store and get every data item in that store. Every data item will be fetched as a key-value pair. Please note that once a store is created, it can contain a mixed group of data items with any type of key and any type of value. However, for the iteration to work correctly, it is expected that you have a store that contains uniformly typed keys and values — in other words, all the data items in that store must have the same key type and the same value type.

  • uint64 dpsBeginIteration(uint64 store, mutable uint64 err)— This API prepares a store for iteration. A call to this API must be made first for a store iteration to work correctly. A caller should pass a valid store handle. If this operation is successful, it sets the err method argument to a value of 0, then it returns a uint64 iteration handle. The caller should hold on to this iteration handle and use it during the subsequent store iteration operations. In case of any error, err method argument will carry a non-zero failure reason code.
  • boolean dpsGetNext(uint64 store, uint64 iterator, mutable T1 key, mutable T2 value, mutable uint64 err)— This API can only be called after a successful call to the dpsBeginIteration API. Here, a caller should pass a valid store handle, a valid store iteration handle, a mutable key variable, and a mutable value variable. On a successful operation, it will return true with the next available key and value stored in the two arguments passed for this purpose. If no more data items are found in the store, it will return false. In case of any error, it will set a non-zero failure reason code in the err method argument.
  • void dpsEndIteration(uint64 store, uint64 iterator, mutable uint64 err)— This API must be called to end an iteration of a store. Typically, it must be called when the dpsGetNext API returns false, indicating that there are no more data items in the store. This API cleans up the resources created for performing the store iteration. In case of any error during this operation, it sets a non-zero failure reason code in the err method argument.
  • void dpsSerialize(uint64 store, mutable blob data, T1 dummyKey, T2 dummyValue, mutable uint64 err)— This API lets a caller serialize the entire store contents into a blob variable. This API should be called with a valid store handle, a mutable blob variable, a dummy key variable with the same data type as used in the store, and a dummy value variable with the same data type as used in the store. When this is successful, it returns the serialized store data via the second (blob) method argument. If there is an error during this operation, it assigns a non-zero failure reason code in the err method argument.
  • void dpsDeserialize(uint64 store, blob data, T1 dummyKey, T2 dummyValue, mutable uint64 err)— This API is a complementary function to the previous API we looked at. It lets the caller populate a store using the serialized store data in binary format. It is useful when you want to take a complete copy of one store into another. This API must be called with a valid store handle, blob data containing the serialized content of a store, a dummy key variable with the same data type as it was used in the original store, and a dummy value variable with the same data type as it was used in the original store. In case of any error during this operation, it will assign a non-zero failure reason code in the err method argument. When this API completes successfully, a given target store will be fully populated with contents present in the previously serialized store.
  • uint64 dpsGetLastStoreErrorCode()— This API will return the error code for the most recently executed dps function. Developers are advised to check for completion codes after calling a dps API.
  • rstring dpsGetLastStoreErrorString()— This API will return an error message (if there are errors) for the most recently performed dps function. If the previous API explained above (last error code) returns a non-zero value, developers can call this API to get the error message for the most recently executed dps function.
  • rstring dpsGetStoreName(uint64 store)— This API will return the actual name of a store belonging to the given store handle. It is a convenience function in case an operator shares a store ID with another operator, and the receiving operator wants to obtain the name of the store using that store ID.
  • rstring dpsGetSplTypeNameForKey(uint64 store)— As we saw in the dpsCreateOrGetStore API, it is necessary to pass a dummy key and a dummy value to that API for registering the key and value SPL data types handled by that store. At a later time, if there is a need by any other operator to get the SPL type of the key for a given store handle, this API can be used.
  • rstring dpsGetSplTypeNameForValue(uint64 store)— As we saw in the dpsCreateOrGetStore API, it is necessary to pass a dummy key and a dummy value to that API for registering the key and value SPL data types handled by that store. At a later time, if there is a need by any other operator to get the SPL type of the value for a given store handle, this API can be used.

Library API: Distributed Lock (dl) functions

All the dps functions explained in the previous section can be called concurrently from multiple PEs and multiple threads. All store operations are atomic at the granularity of every single dps API call explained in the previous section. However, that level of atomicity is insufficient in the context of a distributed InfoSphere Streams application when you want to perform a group of store operations atomically and concurrently from multiple PEs. For instance, suppose a developer wants to create a store, add a dozen data items to that store, perform some application logic, and, based on the results of executing that business logic, update or delete a few more data items. Such a sequence of transactional activities requires much more than the locking done inside individual dps functions such as dpsPut, dpsRemove, etc. A developer should be able to lock the entire store, safely perform atomic operations on it, and then unlock it. This requires a distributed locking facility, and the APIs described below do exactly that:

  • uint64 dlCreateOrGetLock(rstring name, mutable uint64 err)— This API allows you to create a lock that can be identified by a name. A caller should pass a name of the lock as a method argument. Such distributed lock names should be agreed upon by the application designers and developers so that different parts of the application would use the same lock names. On success, it will return a non-zero lock handle. If there is any error during this operation, it will set a non-zero failure reason code in the err method argument.
  • boolean dlRemoveLock(uint64 lock, mutable uint64 err)— This API can be called to remove an existing lock. A caller should pass a valid lock handle. On success, it will return true. If an error occurs during this operation, it will return false and set a non-zero failure reason code in the err method argument.
  • void dlAcquireLock(uint64 lock, mutable uint64 err)— When you want to perform a sequence of store operations in an atomic fashion, this API must be called to acquire a well-known store lock handle. When this API succeeds, it will assign a value of zero to the err method argument. Otherwise, it will set a non-zero failure reason code in the err method argument. You must ensure that the requested lock was acquired successfully before proceeding to execute a sequence of dps APIs. It is important to note that this particular API acquires the lock for an indefinite period, which means that other parts of the application will not be able to acquire the same lock until you release it. If your code inadvertently crashes after acquiring a store lock for an indefinite period, that store cannot be accessed by anyone else. Hence, apply caution when you use this particular API. The next API is the preferred way to avoid this situation.
  • void dlAcquireLock(uint64 lock, float64 leaseTime, mutable uint64 err)— This API is preferable to the previous one. It does the same thing, except that it takes an additional method argument called leaseTime. A caller must provide a lease time for the lock, which indicates how long the lock should remain active for the current caller. If the caller forgets to release the lock after performing a sequence of store operations atomically, the lock will be released automatically after the lease time expires. You are encouraged to use this API instead of the previous one whenever possible.
  • void dlReleaseLock(uint64 lock, mutable uint64 err)— This API must be called after a sequence of store operations is completed atomically. The caller must pass a valid lock handle to this API.
  • uint64 dlGetLastDistributedLockErrorCode()— This API returns the failure reason code for the most recently performed distributed lock operation.
  • rstring dlGetLastDistributedLockErrorString()— This API returns the error message (if there are errors) with more details for the most recently performed distributed lock function. If the previous API explained above (get last lock error code) returns a non-zero value, you can call this API to get the specific error message.
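The lease-based acquire described above is what keeps a crashed lock holder from blocking everyone forever. The following Python fragment is a minimal, single-process sketch of that lease semantics, not the dps implementation; the class name and timings are hypothetical, and a real distributed lock would of course live in the back-end store.

```python
import time

class LeaseLock:
    """Hypothetical sketch of lease-based lock semantics: the lock
    auto-expires after its lease time, so a crashed holder cannot
    block other callers indefinitely."""
    def __init__(self):
        self.expires_at = 0.0

    def acquire(self, lease_time):
        now = time.monotonic()
        if now < self.expires_at:
            return False              # lease still active: refuse the caller
        self.expires_at = now + lease_time
        return True

    def release(self):
        self.expires_at = 0.0         # make the lock available again

lock = LeaseLock()
first = lock.acquire(0.05)            # first caller acquires a 50 ms lease
second = lock.acquire(0.05)           # concurrent caller is refused
time.sleep(0.06)                      # the holder "crashes" without releasing
third = lock.acquire(0.05)            # lease expired, lock is free again
print(first, second, third)           # True False True
```

The indefinite-lease variant of dlAcquireLock corresponds to an infinite lease time in this sketch, which is exactly why it should be used with caution.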

Now that we have covered the logical view layer, let's go ahead and finish the topmost layer: the InfoSphere Streams applications themselves. This layer is all about putting the dps APIs to use directly inside SPL code (including SPL functions and SPL native functions), inside C++/Java primitive operator code, or in both places.

Examples: Code snippets that showcase the dps APIs

Listing 1. Create a new store
mutable uint64 err = 0ul;
mutable uint64 s = 0ul, os=0ul;
int32 dummyInt32 = 0;
rstring dummyRstring = "";
os = dpsCreateStore("myDBStore", dummyInt32, dummyRstring, err);
assert(os!=0ul && err==0ul);
s = dpsCreateStore("myDBStore", dummyInt32, dummyRstring, err);
assert(s==0ul && err!=0ul);
s = dpsCreateOrGetStore("myDBStore", dummyInt32, dummyRstring, err);
assert(s==os && err==0ul);

The code snippet in Listing 1 uses two dps APIs. The first one is dpsCreateStore, which attempts to create a store named myDBStore. After that API call returns, an assert statement ensures that we got back a non-zero store handle and that the err argument carries no error code. Please note that this API will return an error if the store already exists, as the second dpsCreateStore call in the snippet demonstrates. The second API in the snippet is dpsCreateOrGetStore. It creates a new store if one doesn't exist; if a store with the given name already exists, it returns a store handle for that active store. It is important to note that store names can contain space characters, "My Mega Store" for example. In addition, make a note of the two dummy variables being passed to the dpsCreateStore and dpsCreateOrGetStore APIs; they are required to register the SPL data types for the key and value of the store being created.

Listing 2. Find a store
mutable uint64 err = 0ul;
mutable uint64 s = 0ul;
s = dpsFindStore("myDBStore", err);
                    
if (err != 0ul) {
   printStringLn(
      "Expected error in finding an already removed store named myDBStore: rc = "
      + (rstring)dpsGetLastStoreErrorCode() + 
      ", msg = " + dpsGetLastStoreErrorString());
}

The code snippet in Listing 2 shows how to use dpsFindStore to search for an existing store by its name. If a store named myDBStore already exists, this API returns the store handle of that already-active store. If any error occurs in this API, the err method argument will be set to a non-zero reason code. It is a good practice to check the err variable after every dps API call. The code snippet above also shows how to retrieve the error code and the error message when a dps API encounters an error.

Listing 3. Put and Get data items into and from a store
mutable uint64 err = 0ul;
mutable boolean res = false;
mutable uint64 s = 0ul; // assume s is assigned a valid store handle, e.g., via dpsFindStore

res = dpsPut(s, "abc", 10, err);
assert(res==true && err==0ul);                    
mutable int32 v = 0;
res = dpsGet(s, "abc", v, err);
assert(res==true && err==0ul); // get has succeeded 
assert(v==10);

// We can use any type as value
res = dpsPut(s, "abc", {fun=true, joy=[1,2,3]}, err); // replace '10'
assert(res==true && err==0ul); // value was replaced, rather than inserted
mutable tuple<boolean fun, list<int32> joy> funJoy = {};
res = dpsGet(s, "abc", funJoy, err);
assert(res==true && err==0ul); // get has succeeded
assert(funJoy=={fun=true,joy=[1,2,3]});

// We can use any type as key
res = dpsPut(s, {3, 5, 6}, 10, err);
assert(res==true && err==0ul); // value was inserted
res = dpsPut(s, {3, 5, 6}, 11, err);
assert(res==true && err==0ul); // value was replaced
res = dpsGet(s, {3, 5, 6}, v, err);
assert(res==true && err==0ul); // get has succeeded 
assert(v==11); // v now holds the replaced value 11

The code snippet in Listing 3 shows how to use the dpsPut and dpsGet APIs. This example adds a new data item to the store and also modifies the value of an existing data item. A successful put operation returns true; otherwise, it returns false with the appropriate failure reason code set in the err variable. There are assert statements to verify the expected results. dpsPut can take any SPL type as a key and any SPL type as a value. In this example, we use tuples and lists as values, and we also show how to use an SPL set type as a key. Finally, the use of dpsGet in this code snippet demonstrates how to retrieve data items from a store. In addition, when a data item key is of a string type, it can contain space characters, "My Favorite Movie" for example.

Listing 4. Remove a data item, get store size, clear a store, and remove a store
mutable uint64 err = 0ul;
mutable boolean res = false;
mutable uint64 s = 0ul; // assume s is assigned a valid store handle, e.g., via dpsFindStore

dpsPut(s, "abc", 75, err);
// We can check existence of a data item, also remove a data item
mutable boolean exists = dpsHas(s, "abc", err);
assert(exists && err==0ul);
res = dpsRemove(s, "abc", err);
assert(res==true && err==0ul); // remove has succeeded 
exists = dpsHas(s, "abc", err);
assert(!exists && err==0ul);
res = dpsRemove(s, "abc", err);
assert(res==false && err!=0ul);

// There is a storewide clearance event going on. Let us empty the entire store.
dpsClear(s, err);
assert(err==0ul);

dpsPut(s, "Fortran", "John W. Backus", err);
dpsPut(s, "C", "Dennis MacAlistair Ritchie", err);
dpsPut(s, "C++", "Bjarne Stroustrup", err);
dpsPut(s, "Java", "James Arthur Gosling", err);
dpsPut(s, "Perl", "Larry Wall", err);
dpsPut(s, "PHP", "Rasmus Lerdorf", err);
dpsPut(s, "Python", "Guido van Rossum ", err);
dpsPut(s, "Ruby", "Yukihiro Matsumoto", err);
dpsPut(s, "SPL", "Martin Hirzel, Bugra Gedik", err);

mutable uint64 size = dpsSize(s, err);
assert(size==9ul && err==0ul);

// Remove a store
dpsRemoveStore(s, err); // All gone with no trace left behind.

The code snippet in Listing 4 shows several store functions such as dpsHas, dpsRemove, dpsClear, dpsSize, and dpsRemoveStore.

Listing 5. Iterate over a store
mutable uint64 err = 0ul;
mutable boolean res = false;
mutable uint64 s = 0ul; // assume s is assigned a valid store handle, e.g., via dpsFindStore

dpsPut(s, "a", 1, err);
dpsPut(s, "b", 2, err);
dpsPut(s, "c", 3, err);
dpsPut(s, "d", 4, err);
mutable uint64 size = dpsSize(s, err);
assert(size==4ul && err==0ul);

uint64 it = dpsBeginIteration(s, err); 
mutable rstring key = "";
mutable int32 value = 0;

while(dpsGetNext(s, it, key, value, err)) {
   printStringLn("'" + key+"' => " + (rstring)value);
}

dpsEndIteration(s, it, err);
 
dpsRemoveStore(s, err);
assert(err==0ul);

The code snippet in Listing 5 shows how one can iterate over a store. A distributed store is not very useful unless there is a way for users to discover what data items it holds, and the store iteration APIs satisfy that need. Store iteration comprises three dps APIs: first get an iteration handle for a given store using dpsBeginIteration, then stay in a loop calling dpsGetNext until it returns false, and finally call dpsEndIteration to clean up the iteration resources. When it comes to store iteration, the ordering of the data items takes center stage. In our dps implementation, data items retrieved during a store iteration are guaranteed to come back in the exact order in which they were originally inserted, but only when you use memcached as your back-end in-memory store. Due to implementation specifics, we can't give the same guarantee when you use Redis as your back-end in-memory store. Having said that, one can create the store keys with some kind of sequence number tagging, retrieve all the data items via store iteration, and then sort the retrieved keys to get back the original ordering. On a different topic, store iteration makes sense only if a store contains uniform types of keys and values. If a store has a mixed bag of data items, store iteration will return only partial store contents or none at all.
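The sequence-number workaround just mentioned can be sketched in a language-neutral way. The following Python fragment is a hypothetical illustration (a plain dict stands in for a dps store whose iteration order is undefined); the key format and helper name are our own invention.

```python
# Tag each key with an insertion sequence number so that sorting the
# tagged keys after iteration recovers the original insertion order.
store = {}  # stands in for a dps store with arbitrary iteration order

def put_in_order(seq, key, value):
    # Zero-padded sequence prefix keeps lexicographic sort == numeric sort.
    store["%08d:%s" % (seq, key)] = value

put_in_order(1, "first", 100)
put_in_order(2, "second", 200)
put_in_order(3, "third", 300)

# Iterate in whatever order the store gives us, then sort the tagged keys
# and strip the prefix to recover the original insertion order.
ordered_keys = [k.split(":", 1)[1] for k in sorted(store)]
print(ordered_keys)  # ['first', 'second', 'third']
```

With a Redis back-end, the same tagging can be applied to the keys passed to dpsPut, with the sort done in SPL after the dpsGetNext loop completes.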

Listing 6. Serialize and deserialize store contents
mutable uint64 err = 0ul;
mutable boolean res = false;
mutable uint64 s = 0ul;

rstring dummyRstring = "";
list<rstring> dummyRstringList = [""];
s = dpsCreateOrGetStore("Zip_Code_Lookup_ABC", 
dummyRstring, dummyRstringList, err);
// Let us add a few entries into our store.
dpsPut(s, "10514", ["Chappaqua", "New York"], err);
dpsPut(s, "10598", ["Yorktown Heights", "New York"], err);
dpsPut(s, "10801", ["New Rochelle", "New York"], err);
dpsPut(s, "10541", ["Mahopac", "New York"], err);
dpsPut(s, "10562", ["Ossining", "New York"], err);
dpsPut(s, "10549", ["Mount Kisco", "New York"], err);
dpsPut(s, "10506", ["White Plains", "New York"], err);
dpsPut(s, "10526", ["Goldens Bridge", "New York"], err);
dpsPut(s, "11577", ["Roslyn Heights", "New York"], err);
dpsPut(s, "10532", ["Hawthorne", "New York"], err);

// We can serialize this entire store into a blob in just one native function call.
// You have to pass a dummy key and a dummy value to indicate the 
// types used for the keys and values in your store. 
mutable rstring key = "";
mutable list<rstring> value = [];
mutable blob sData = []; // blob that will receive the serialized store contents
dpsSerialize(s, sData, key, value, err);

// Since we serialized the entire store into a blob, we can get rid of this store.
dpsRemoveStore(s, err);

// We are going to create a new store and use the blob data
// to deserialize it for populating the new store's contents.
s = dpsCreateOrGetStore("Zip_Code_Lookup_XYZ", dummyRstring, dummyRstringList, err);

// Use a dummy key and a dummy value as type indicators.
// From the blob data we made above, populate the new store we created just now.
dpsDeserialize(s, sData, key, value, err);

// We populated the entire store through deserialization of the blob data.
// Let us iterate through this new store and see its contents.
printStringLn("Contents of a store named 'Zip_Code_Lookup_XYZ' " +
   "that was populated via dpsDeserialize:");
uint64 it = dpsBeginIteration(s, err); 
                 
while (dpsGetNext(s, it, key, value, err)) {
   printStringLn("'" + (rstring)key + "' => " + (rstring)value);
}
                 
dpsEndIteration(s, it, err);
// Get rid of this store.
dpsRemoveStore(s, err);

The code snippet in Listing 6 shows another important dps feature: serializing and deserializing stores. The dpsSerialize API creates a binary object (a blob) out of an entire store. Once that blob is created, it can be handed to any other PE or even to other InfoSphere Streams applications. When the recipient needs the serialized data, an entirely new store can be created and populated from the serialized store object. The example above creates a new store, populates it with contents, serializes it, and deletes it. After that, a completely new store is created and populated by deserializing the blob containing the serialized contents of the first store. Finally, we iterate over this second store and display its contents to show that dpsSerialize and dpsDeserialize complement each other well.

Listing 7. Bulk store activity guarded by a distributed lock
mutable uint64 err = 0ul;
mutable boolean res = false;
mutable uint64 s = 0ul;
mutable uint64 l = 0ul;

// Create a new store.
rstring dummyRstring = "";
// While creating a store, pass two dummy variables to match the
// SPL data types for the key and value of this store.
s = dpsCreateOrGetStore("Super_Duper_Store", dummyRstring, dummyRstring, err);
// Create a user defined distributed lock.
l = dlCreateOrGetLock("Super_Duper_Lock", err);

// Get a distributed lock with a lease time for 30.0 seconds.
dlAcquireLock(l, 30.0, err);

// Do a bulk store activity.
mutable int32 cnt = 0;

while(cnt++ < 1000000) {
   dpsPut(s, "myKey" + (rstring)cnt, (rstring)cnt, err);
}


// Release the lock.
dlReleaseLock(l, err);

// Remove the lock.
dlRemoveLock(l, err);

The code snippet in Listing 7 shows how a sequence of bulk activities can be performed on a store by gaining exclusive access into the store. It employs the distributed locking APIs that are part of the dps toolkit.

As mentioned, dps APIs are callable from all parts of an InfoSphere Streams application. However, the direct function-call usage patterns explained above work only inside SPL code segments, SPL functions, SPL native functions, and C++ primitive operators. If you are a Java primitive operator developer, don't despair: instead of direct function calls, a nice object-oriented (OO) layer awaits you. In a nutshell, the Java dps APIs are simplified by letting you treat stores and distributed locks as objects on which you invoke different methods, so you need not pass a store handle and an error argument in every method call. The code snippet in Listing 8 shows how the dps APIs can be used inside a Java primitive operator.

Listing 8. A sneak peek into the Java dps APIs
import com.ibm.streamsx.dps.*;

private StoreFactory sf = DistributedStores.getStoreFactory();
Store testStore1 = null;
// Create a store
try {
   testStore1 = sf.createOrGetStore("A_Quick_Store", "ustring", "ustring");
   System.out.println("Successfully created a new store named 'A_Quick_Store' with a store id " + 
      testStore1.getId() + ".");
} catch (StoreFactoryException sfe) {
   System.out.println("Unable to create a new store named 'A_Quick_Store': Error code = " +
      sfe.getErrorCode() + ", Error msg = " + sfe.getErrorMessage());
   throw sfe;
}

// Add data items into a store.
testStore1.put("1914–1956", "Thomas J. Watson");
testStore1.put("1956–1971", "Thomas J. Watson, Jr");
testStore1.put("1971–1973", "T. Vincent Learson");
testStore1.put("1973–1981", "Frank T. Cary");
testStore1.put("1981–1985", "John Opel");
testStore1.put("1985–1993", "John F. Akers");
testStore1.put("1993–2002", "Louis V. Gerstner, Jr.");
testStore1.put("2002–2011", "Samuel J. Palmisano");
testStore1.put("2012-XXXX", "Virginia M. Rometty");

// Now, iterate the store contents using a very elegant for loop.
System.out.println("This is a prestigious CEO lineup of IBM in its illustrious history:");
for (KeyValuePair kv: testStore1) {
   System.out.println("'" + kv.getKey() + "' => '" + kv.getValue() + "'");
}

// Remove the store.
sf.removeStore(testStore1);

// Store serialization.
Store topBrandsStore = sf.createOrGetStore("2013_Best_Global_Brands_ABC", "int32", "ustring");
// Add few data items.
topBrandsStore.put(1, "Apple");
topBrandsStore.put(2, "Google");
topBrandsStore.put(3, "Coca Cola");
topBrandsStore.put(4, "IBM");
topBrandsStore.put(5, "Microsoft");
topBrandsStore.put(6, "GE");
topBrandsStore.put(7, "McDonald's");
topBrandsStore.put(8, "Samsung");
topBrandsStore.put(9, "Intel");
topBrandsStore.put(10, "Toyota");
ByteBuffer serializedStore = topBrandsStore.serialize();

// Store deserialization
sf.removeStore(topBrandsStore);
topBrandsStore = sf.createOrGetStore("2013_Best_Global_Brands_XYZ", "int32", "ustring");
topBrandsStore.deserialize(serializedStore);
System.out.println("This is a list of top 10 rankings for the best global brands in 2013:");
for (KeyValuePair kv: topBrandsStore) {
   System.out.println("'" + kv.getKey() + "' => '" + kv.getValue() + "'");
}

sf.removeStore(topBrandsStore);

For a more comprehensive explanation about using the dps APIs inside your SPL code segments, C++ primitive operators, native functions, and Java primitive operators, please refer to the following examples in the 80 beginner examples URL in Resources:

  • 058_data_sharing_between_non_fused_spl_custom_and_cpp_primitive_operators
  • 061_data_sharing_between_non_fused_spl_custom_operators_and_a_native_function
  • 062_data_sharing_between_non_fused_spl_custom_and_java_primitive_operators

Layout of the dps toolkit

The dps toolkit can be downloaded from the InfoSphere Streams Exchange website (see Resources). It is named com.ibm.streamsx.dps, and it contains the complete C++ and Java source code along with pre-built dps libraries for four flavors of Linux: RHEL5, CentOS5, RHEL6, and CentOS6. When you extract the downloaded dps tar.gz file, your newly extracted directory will have the following layout.

com.ibm.streamsx.dps                 (Top-level toolkit directory)

a) doc
      --> dps-usage-tips.txt         (Useful tips about the dps toolkit)
                                      
b) com.ibm.streamsx/lock/dist*       (Distributed Locks)
      --> native.function            (SPL Native Function directory)
      --> function.xml               (SPL Native Function Model file)
                                               
c) com.ibm.streamsx/store/dist*      (Distributed Store)
      --> native.function            (SPL Native Function directory)
         --> function.xml            (SPL Native Function Model file)
         
d) impl/include                      (Directory where the include files for the dps will be present)
      --> DistributedProcessStore.h  (C++ class interface file)
      --> Distributed*Wrappers.h     (C++ include file that contains the Streams native function
                                     entry point code)
      --> DBLayer.h                  (C++ abstract interface file for the Data Store layer)
      --> PersistenceError.h         (C++ interface to represent the db layer return code and 
                                     error messages)
      --> MemcachedDBLayer.h         (C++ interface for Memcached in-memory store access
                                     logic)
      --> RedisDBLayer.h             (C++ interface for Redis in-memory store access logic)
      --> DpsConstants.h             (C++ include file with constants used to interface with an 
                                     in-memory store)
      --> com*_DpsHelper.h           (Automatically generated JNI C++ include file for use by
                                     the Java primitive operators)

e) impl/src                           (Directory where the CPP files for the dps will be present)
      --> DistributedProcessStore.cpp (C++ class implementation file for the dps serialization
                                      /deserialization layer)
      --> MemcachedDBLayer.cpp        (C++ class implementation for the Memcached 
                                      in-memory store access logic)
      --> RedisDBLayer.cpp            (C++ class implementation for the Redis in-memory 
                                      store access logic)
      --> com*_DpsHelper.cpp          (JNI C++ bridge for Java code to call directly into the
                                      common C++ dps implementation)

f) impl/java/src
      --> com.ibm.*/DpsHelper.java   (Java DpsHelper class needed to access dps APIs
                                     from Java primitive operators)
      --> build_dps_helper.sh        (A script to build the DpsHelper class)
      
g) impl/java/bin
       --> dps-helper.jar             (JAR file for the dps-helper that should be added to a 
                                     Java primitive operator's Java build path)

h) impl/mk                           (A shell script to build the shared object library of the C++ code 
                                     shown above) 

i) impl/lib                          (Directory into which the .so library built above will be copied)
                                     (Inside of this, we ship the in-memory store client libraries.)
j) samples/dps_test_1 
      --> DpsTest1.splmm             (Simple test application SPL file that invokes C++ native 
                                     functions to show the various dps features.)
      --> etc                        (SPL test application's data directory)
         --> in-memory*.txt          (Configuration file containing memcached or Redis server 
                                     names)
      --> bin                                        
         --> build-distributed.sh    (Script to build a Distributed Streams executable)
         --> run-distributed.sh      (Script that will run the distributed executable of this 
                                     application)
         --> stop-streams-app.sh     (Script that will stop all the running Streams jobs)
         --> stop-str*-instance.sh   (Script that will stop a specified Streams instance)

As explained above, this toolkit consists of C++ code for generating the dps shared object (.so) file, Java code for creating the dps-helper.jar file, and SPL code for an InfoSphere Streams test driver application.

Inside this toolkit, impl/src and impl/include directories contain the source code for the dps native function logic. Primarily, it provides the serialization/deserialization code, and the memcached and Redis interface layer code. A Wrapper include (.h) file is an important one, providing an entry point for the SPL application to directly call a C++ class method. All the C++ logic will be compiled into a shared object library (.so) file and will be available to any outside SPL application that will have a dependency on this toolkit. In the impl/java/src directory, you will find all the Java code required for the Java dps APIs and in the impl/java/bin directory, you will see the dps-helper.jar file built from the source code for the Java dps APIs.

This toolkit also contains a simple test SPL flow graph constructed to make a call chain to exercise different dps functions. Inside the dps toolkit directory, a native function model XML file outlines the meta information needed to directly call a C++ class method from SPL. This detail includes the C++ wrapper include file name, C++ namespace containing the wrapper functions, C++ wrapper function prototype expressed using SPL syntax and types, name of the shared object library created within this toolkit, location of the shared object library, location of the wrapper include file and so on. SPL test application code in this toolkit demonstrates how two different PEs can share data between themselves using the dps APIs. In addition, there is code to do 10 parallel writers and 10 parallel readers to write and read data to and from the chosen back-end in-memory store. Please refer to the top of the SPLMM and the C++ source files for additional commentary.

Verifying and using the dps toolkit

In order to run and use the dps toolkit, the in-memory store client software must be available on all the machines where your InfoSphere Streams applications will run. Does that mean you now have to install your chosen back-end store's client software on several dozen machines? No; we made that job easy for you. There is no need to install any memcached or Redis client software on your InfoSphere Streams application machines, because this toolkit already packages the client libraries (.so files) for both. Our dps implementation uses the popular libmemcached C client for accessing the memcached in-memory store and the hiredis C client for accessing the Redis in-memory store. Library files for both are stored in the impl/lib subdirectory inside the dps toolkit. At this time, we support the dps facility only on x86 servers installed with one of these Linux flavors: RHEL5, CentOS5, RHEL6, CentOS6.

Following are the steps required to build and run the test application to showcase all the features of dps. If you choose, the dps core C++ code artifact can be built into an .so file before attempting to verify this toolkit. But there is no need for that since we have already pre-built and packaged it for you inside this toolkit's impl/lib directory. Unless you really want to build the dps library on your own, you can skip steps 1 and 2 below.

  1. Switch to your_base_dir/com.ibm.streamsx.dps/impl directory and run ./mk.
  2. If ./mk worked correctly, it should have built and copied the .so file to the following location (if you have a RHEL6 machine): your_base_dir/com.ibm.streamsx.dps/impl/lib/x86_64.RHEL6.
  3. Switch to your_base_dir/com.ibm.streamsx.dps/samples/dps_test1/bin directory and run this script: ./build-distributed.sh.
  4. You have to edit the your_base_dir/com.ibm.streamsx.dps/samples/dps_test1/etc/data/in-memory-store-servers.cfg file and add the names or the IP addresses of your memcached or Redis servers, then save the file.
  5. To test the application, you can run this script from your_base_dir/com.ibm.streamsx.dps/samples/dps_test1/bin:
    ./run-distributed.sh -i YOUR_STREAMS_INSTANCE_NAME
  6. You can verify the application results in the InfoSphere Streams PE log files. You have to first read the SPLMM file and understand which operator writes what in the PE log files.
  7. After verifying the results produced by this test application, you can run this script from the bin directory: ./stop-streams-instance.sh -i YOUR_STREAMS_INSTANCE_NAME.
  8. You can use the code shown in the SPLMM file as a reference and take advantage of the dps functions in your own applications to share data across multiple PEs running on multiple machines. All you have to do is simply compile your application with -t path_containing_the_dps_toolkit_directory and add the following two statements at the top of your SPL or SPLMM files:
    use com.ibm.streamsx.store.distributed::*;
    use com.ibm.streamsx.lock.distributed::*;

Hey! What about performance and persistence in the dps?

We didn't ignore or forget them. In fact, we paid close attention to both during our dps implementation. All three technologies that make up the dps (InfoSphere Streams, memcached, and Redis) are known for their superior performance characteristics. In our lab tests, we did 1 million parallel writes and 1 million parallel reads in a multi-threaded fashion using 10 writers and 10 readers, and observed average insertion and read latencies in the microsecond range. To try this performance run for yourself, use the commented block of code in the test application packaged with the dps toolkit.

Let us be frank: memcached is a pure data cache and it doesn't provide any persistence on its own. But Redis will come to our rescue. It has some cool features to achieve data persistence. Out of the box, Redis provides a periodic snapshot feature that writes the in-memory store contents to a disk in the background. If that is not enough, its Append Only File (AOF) feature will journal every change made to the store contents. In case of a machine crash, it will come back with the same store contents on its restart as if nothing happened. During that failure, Redis also superbly manages fail-over to one or more slaves for high availability. Therefore, we are protected very well here with the InfoSphere Streams and Redis combo.
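The Redis persistence features mentioned above are enabled through its configuration file. The fragment below is an illustrative redis.conf excerpt showing the stock snapshot and AOF directives; the specific thresholds are examples, and suitable values depend on your deployment.

```conf
# RDB snapshotting: dump the dataset to disk if at least 1 key changed
# in 900 s, 10 keys in 300 s, or 10000 keys in 60 s.
save 900 1
save 300 10
save 60 10000

# Append Only File: journal every write so the store contents can be
# fully rebuilt after a crash; fsync the journal once per second.
appendonly yes
appendfsync everysec
```

With AOF enabled, a restarted Redis server replays the journal and comes back with the same store contents, which is what makes the InfoSphere Streams and Redis combination resilient.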

A cursory glance at a few dps application patterns

At the time of publishing this article, the ability to externalize the application state and make it sharable across many distributed applications is very new to InfoSphere Streams. Availability of such a feature will open up many possibilities for InfoSphere Streams application designers, architects, and developers. Even though it is not possible at this time to fully envision all the neat things one can do with the dps, here's an initial list of random ideas that the InfoSphere Streams developer community can try to validate when they have spare time and energy:

  • In a large distributed application, one wants to deliver a bunch of application-specific configuration changes to many PEs distributed in a big cluster of machines. This could potentially be done using the dps.
  • In a network router analysis streaming application, data received from a router occasionally carries changed protocol template information. A network packet parser PE should detect the template change and share the new template with several dozen downstream analytic PEs. How about putting that new template in a store and letting the downstream PEs read it from there?
  • In a real-time video analysis streaming application, heuristic models are important for the analytic engines. When a new model is ready for a specific face or contour detection technique, how can we deliver that to all the video analytic scoring engines without restarting them? A partial answer could be to use the dps.
  • Take the case of an application that does behavioral pattern analysis as customers routinely visit and leave an online commerce website. This application monitors in real time which pages a customer visits and clicks often, giving it some idea of the customer's interest in a set of products. To process this in real time, use the pool of click-stream analytic engines running inside InfoSphere Streams. Traditionally, a customer landing on a web page is assigned to a dedicated engine so that the engine can maintain that customer's state information locally. How about moving the customer state information to the dps and making the engines stateless? This approach enables any available engine to process any customer, loading and saving the state information much faster from the dps than from a slow relational database.
  • In stock market analysis applications, hundreds of portfolios are continuously recalculated by different pricing engines in real time. Such state is maintained either inside the application's memory or in a database. If the dps can be worked into that architecture, portfolios can be shared by several engines, rather than a single engine maintaining them and delivering them to the interested parties.
  • If a PE maintains all its application state information in the dps, when it fails and gets automatically restarted by InfoSphere Streams, can it simply continue to process the requests since its previously held state is still intact in a distributed store? Could this idea be extended a bit further to have dps play some role in providing fault tolerance for the InfoSphere Streams operators?
  • Distributed locking is included in the dps and can be used in an InfoSphere Streams application that has a need for distributed locks.
  • Geohashing involves maintaining a huge list of geographical position coordinates. Such data is usually shared between many real-time analytic modules. Can the dps serve the geohash data to many application components? More generally, any reference data can be loaded into the dps, making it easy to access from any PE that needs it.
  • In a streaming system, new applications come and go routinely. They tap into each other to share what other applications have already gathered in terms of analytic results. Rather than using a cumbersome data exchange scheme between applications, the dps could act as a reliable place to keep the data, regardless of whether the producing applications are currently running.
  • In telecom call detail record (CDR) processing, Bloom filters are critical for detecting duplicate CDRs. They are memory-intensive, and the ability to share the CDR duplication information among different CDR analytic engines would be beneficial. Can the dps be an effective place to keep the Bloom filter's bitmap?
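As a taste of the last idea above, here is a standalone Python sketch of a Bloom filter whose bitmap could live in a shared store. The dps itself does not ship a Bloom filter; the bitmap size, the double-hashing scheme, and the sample CDR keys below are all our own illustrative choices, with a plain bytearray standing in for the shared store value.

```python
import hashlib

M = 8 * 1024                 # bitmap size in bits
K = 4                        # number of hash probes per key
bitmap = bytearray(M // 8)   # in a real setup, this blob would be a dps store value

def _positions(key: str):
    # Derive K bit positions from two independent digests (double hashing).
    h1 = int.from_bytes(hashlib.md5(key.encode()).digest()[:8], "big")
    h2 = int.from_bytes(hashlib.sha1(key.encode()).digest()[:8], "big")
    return [(h1 + i * h2) % M for i in range(K)]

def add(key: str):
    # Set the K bits for this key in the shared bitmap.
    for p in _positions(key):
        bitmap[p // 8] |= 1 << (p % 8)

def probably_seen(key: str) -> bool:
    # False means definitely new; True means seen before,
    # with a small false-positive rate.
    return all(bitmap[p // 8] & (1 << (p % 8)) for p in _positions(key))

# Duplicate-CDR check: record the first sighting so repeats are flagged.
add("cdr-2013-10-22-000123")
```

Any CDR engine that fetches the same bitmap from the store can run the same membership check, which is the sharing benefit the bullet point hints at.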

These are only a handful of ideas that can serve as good food for thought. Since the dps facility is easy to set up and use, it is now up to the InfoSphere Streams application developers to stretch the potential of this toolkit.
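The stateless-engine idea in the click-stream bullet above boils down to a small load/modify/store cycle around the shared store. A minimal Python sketch follows; the dict mocks the distributed store, and the function and key names are hypothetical (in SPL you would call the dps toolkit's native functions instead).

```python
import json

shared_store = {}   # mock of the distributed store: customer id -> serialized state

def handle_click(customer_id: str, page: str):
    """Any idle engine can run this: the state lives in the store, not in the engine."""
    raw = shared_store.get(customer_id)
    state = json.loads(raw) if raw else {"visits": {}}
    state["visits"][page] = state["visits"].get(page, 0) + 1   # update interest profile
    shared_store[customer_id] = json.dumps(state)              # write back; engine stays stateless
    return state

# Two "different engines" processing the same customer see consistent state.
handle_click("cust-42", "/products/camera")
state = handle_click("cust-42", "/products/camera")
```

Because each call reloads the state, no engine needs a sticky assignment to a customer, which is exactly what makes the engine pool elastic.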

Conclusion

InfoSphere Streams is a market-leading event processing platform that offers superior capabilities for performing big data analytics. It packs a powerful, flexible, and extensible programming model in its Streams Processing Language (SPL). It is remarkably efficient at moving data tuples within your system memory to provide data parallelism as well as pipeline parallelism. memcached and Redis are commercially proven, high-performance distributed in-memory stores. Combining the power of InfoSphere Streams with memcached or Redis gives application developers a sweet deal: a data sharing facility between different processing elements placed on one or more machines.

This article focuses on bringing together the best capabilities of three worlds (InfoSphere Streams, memcached, and Redis). It sums up a way to incorporate a distributed process store inside of applications. In addition to educating you about InfoSphere Streams/memcached/Redis integration, this article introduces the mechanisms involved in having a reusable data serialization/deserialization layer on top of which any scalable third-party in-memory store can be glued together.

In summary, we have described a way to share your application-related state information between the distributed components of an InfoSphere Streams application. This article also introduced an important design pattern for integrating a high-performance in-memory stream processing system (InfoSphere Streams) with a high-performance in-memory store (memcached or Redis). It demonstrated the concepts of the distributed process store (dps) via a fully working test application available inside the dps toolkit. You can apply the techniques shown here in your own InfoSphere Streams applications to take advantage of the dps features.

Acknowledgements

In closing, we want to convey our sincere thanks to these seasoned InfoSphere Streams experts at the IBM Thomas J. Watson Research Center for taking time either to advise us or to review the early drafts of this article: Martin Hirzel, Kun-Lung Wu, and John Morar.

We also want to mention two other gentlemen we have never met, whose brainchildren were instrumental in shaping our dps work. We send our best wishes to Brad Fitzpatrick for his amazing contributions to memcached and to Salvatore Sanfilippo for his mind-boggling work on Redis.

ArticleID=948987
ArticleTitle=Using InfoSphere Streams with memcached and Redis
publish-date=10222013