Developing and running applications that use the DPS Toolkit

To create applications that use the DPS Toolkit, you'll need to:
  1. Install IBM InfoSphere Streams. Configure the product environment variables by entering the following command:
    
      source product-installation-root-directory/4.2.0.0/bin/streamsprofile.sh
    
  2. Ensure that all of the following additional RPMs required by the DPS toolkit are present on your system:
    • curl
    • curl-devel
    • lua
    • lua-devel
    • openssl-devel
    • openldap-devel
    • ibverbs
    • ibverbs-devel
  3. Install and configure an external key-value data store that is supported by DPS.
  4. Configure the DPS toolkit to connect to the data store from step 3. Copy the DPS toolkit sample configuration file from STREAMS_INSTALL/samples/com.ibm.streamsx.dps/DPSUsageFromSPL/etc/no-sql-kv-store-servers.cfg to your-project-directory/etc/no-sql-kv-store-servers.cfg and edit the file as follows:
    • For Redis in non-cluster mode: the first line of the configuration file must be redis. On the following lines, list each Redis server as a server name or IP address and a port number separated by a colon, for example Machine1:7002, with one server per line. To use a Unix domain socket instead of TCP, specify unixsocket in place of the server name. In that case, the redis.conf file on the server side must also be configured for a Unix domain socket that points to the /tmp/redis.sock file.
    • For Redis v3.x in cluster mode: redis-cluster should be the first line in the file, followed by the server name or IP address and a port number for one of the master Redis nodes that is active in your Redis cluster. For example: Machine1:30001.
    • Each time you create a new SPL project, copy the sample configuration file provided in the DPS toolkit to the etc directory inside your SPL project directory, then edit the copy to match your application's needs.
    • See the sample configuration file for additional examples.
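
The layout described in the bullets above can be sketched as a small Java helper that renders the file contents. This is only an illustration under stated assumptions: the class and method names (DpsConfigSketch, render, checkEntry) are hypothetical and not part of the DPS toolkit, only the two Redis modes described above are covered, and the unixsocket variant is left out. The sample configuration file shipped with the toolkit remains the authoritative reference for the format.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the DPS toolkit: renders the contents of
// etc/no-sql-kv-store-servers.cfg for the two Redis modes described above.
final class DpsConfigSketch {

    // First line is the store type, then one server:port entry per line.
    static String render(String storeType, List<String> servers) {
        if (!storeType.equals("redis") && !storeType.equals("redis-cluster")) {
            throw new IllegalArgumentException("Store type not covered by this sketch: " + storeType);
        }
        if (servers.isEmpty()) {
            throw new IllegalArgumentException("At least one server:port entry is required");
        }
        StringBuilder sb = new StringBuilder(storeType).append('\n');
        for (String server : servers) {
            sb.append(checkEntry(server)).append('\n');
        }
        return sb.toString();
    }

    // Accepts entries of the form name-or-ip:port and rejects anything else.
    static String checkEntry(String entry) {
        int colon = entry.lastIndexOf(':');
        if (colon <= 0 || colon == entry.length() - 1) {
            throw new IllegalArgumentException("Expected server:port, got: " + entry);
        }
        int port;
        try {
            port = Integer.parseInt(entry.substring(colon + 1));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Port is not a number in: " + entry);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("Port out of range in: " + entry);
        }
        return entry;
    }

    public static void main(String[] args) {
        // Non-cluster mode with two servers (example host names).
        System.out.print(render("redis", Arrays.asList("Machine1:7002", "Machine2:7002")));
    }
}
```

Running main prints the three lines redis, Machine1:7002, and Machine2:7002, which is the non-cluster layout described above. Passing "redis-cluster" with a single master node such as Machine1:30001 produces the cluster-mode layout.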

Using the toolkit in SPL applications

All the functionality of the toolkit is available via the native functions in the com.ibm.streamsx.store.distributed and com.ibm.streamsx.lock.distributed namespaces. You need to include these namespaces in your SPL application via a use directive. See the "Getting Started" section below and the native function documentation for more details on how to use the toolkit within applications written in SPL. If you have a C++ or Java operator or function that uses the DPS toolkit, your SPL application graph must also include an instance of the DPSAux operator. This operator does not require any additional configuration but must be present in order for your application to work correctly. See the DPSUsageFromJava sample for an example.

Using the toolkit in Java applications

Ensure that <dps_toolkit_home>/impl/java/lib/dps-helper.jar, which contains the Java implementation of the DPS functions, is accessible to your application. Packages of interest are com.ibm.streamsx.dps and com.ibm.streamsx.dl. See the "Getting Started" section below and the Javadoc for details on using the API from operators written in Java.
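
As a quick sanity check before building, the jar location given above can be resolved and tested for existence. The following is a minimal sketch, assuming the toolkit home directory is passed as the first command-line argument; the class and method names (DpsJarCheckSketch, helperJar, helperJarPresent) are hypothetical and not part of the toolkit.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, not part of the DPS toolkit: checks that dps-helper.jar
// exists under the toolkit home directory at the location named above.
final class DpsJarCheckSketch {

    // Resolves <dps_toolkit_home>/impl/java/lib/dps-helper.jar.
    static Path helperJar(String dpsToolkitHome) {
        return Paths.get(dpsToolkitHome, "impl", "java", "lib", "dps-helper.jar");
    }

    // True only if the jar exists and is a regular file.
    static boolean helperJarPresent(String dpsToolkitHome) {
        return Files.isRegularFile(helperJar(dpsToolkitHome));
    }

    public static void main(String[] args) {
        // Assumption: the toolkit home is given as the first argument.
        String home = args.length > 0 ? args[0] : ".";
        System.out.println(helperJar(home) + (helperJarPresent(home) ? " found" : " not found"));
    }
}
```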

Note regarding fusing Java operators: Fusing two or more Java operators that utilize the Java DPS API requires that each operator have the same class path and the @SharedLoader annotation in its class definition. For example:

@PrimitiveOperator(name="MyJavaOperator", namespace="com.ibm.demo", description="Java Operator MyJavaOperator")
@SharedLoader(true)
public class MyJavaOperator extends AbstractOperator {
	...
}

Using the toolkit in C++ applications

Include the C++ header file DistributedProcessStoreWrappers.h found in impl/include. This is the main entry point for the C++ functions, which are in the C++ namespace com::ibm::streamsx::store::distributed.

Getting Started

The following snippets demonstrate the basic usage of the toolkit from SPL and Java. Usage from C++ is very similar to the SPL example below.

SPL:


rstring dummyRstring = "";
uint32 dummyUint32 = 0u;
mutable uint64 err = 0ul;
mutable uint64 dbStore_handle = 0ul;
dbStore_handle = dpsCreateStore("myDBStore1", dummyRstring, dummyUint32, err);

if (err == 0ul) { // no error occurred
	// create a lock for the store
	mutable uint64 lock_id = dlCreateOrGetLock("My db store lock", err);
	// Acquire the newly created lock, specifying a lease time and maximum time to wait to acquire the lock.
	float64 max_wait = 10.0;
	float64 lease_time = 10.0;
	dlAcquireLock(lock_id, lease_time, max_wait, err);
	//add a key/value pair to the store
	mutable boolean result = true;
	rstring key = "IBM";
	uint32 value = 399u;
	err = 0ul;
	result = dpsPut(dbStore_handle, key, value, err);
	
	if (err != 0ul) {
		// use dpsGetLastStoreErrorCode() and dpsGetLastStoreErrorString() as needed
	}
	// finished our store operations, release the lock
	err = 0ul;
	dlReleaseLock(lock_id, err);	
}

Java:


StoreFactory sf = DistributedStores.getStoreFactory();
Store store = null;

try {
   //specify the SPL types for the keys and values in the store
   String keyType = "rstring";
   String valueType = "int32";
   store = sf.createOrGetStore("Java Test Store1", keyType, valueType);
} catch (StoreFactoryException sfe) {
	// use sfe.getErrorCode() and sfe.getErrorMessage() for more info
}

...
// once ready to access the store,
// get the lock for the store, which may have been created previously
LockFactory lf = DistributedLocks.getLockFactory();
Lock myLock = lf.createOrGetLock("Lock_For_Test_Store1");

// Acquire the lock
try {
	myLock.acquireLock();
} catch (LockException le) {
	System.out.print("Unable to acquire the lock named 'Lock_For_Test_Store1'");
	System.out.println(" Error = " + le.getErrorCode() + ", Error msg = " + le.getErrorMessage());
	throw le;
}
}

//perform store operations
store.put("IBM", 39);
store.put("Lenovo", 50);
//release the lock  when finished
myLock.releaseLock();

Note that error checking in the above examples is minimal.
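
One gap worth illustrating is that the Java snippet above releases the lock only if every store operation succeeds. A common way to guard against that is to release the lock in a finally block. The sketch below shows the pattern in isolation. It is not DPS code: SketchLock and FakeLock are hypothetical stand-ins that mirror only the acquireLock() and releaseLock() calls used above, a HashMap stands in for the store, and the stand-in methods throw no checked exceptions, which may differ from the real API.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the acquire / try / finally-release pattern using stand-in types.
final class LockedStoreAccessSketch {

    // Hypothetical stand-in for the toolkit's lock type (assumption, not the DPS API).
    interface SketchLock {
        void acquireLock();
        void releaseLock();
    }

    // In-memory fake that only records whether the lock is currently held.
    static final class FakeLock implements SketchLock {
        boolean held = false;
        public void acquireLock() { held = true; }
        public void releaseLock() { held = false; }
    }

    // Runs the store operations while holding the lock and always releases it,
    // even when an operation throws.
    static void withLock(SketchLock lock, Runnable storeOperations) {
        lock.acquireLock();
        try {
            storeOperations.run();
        } finally {
            lock.releaseLock();
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> store = new HashMap<>(); // stand-in for a DPS store
        FakeLock lock = new FakeLock();
        withLock(lock, () -> {
            store.put("IBM", 39);
            store.put("Lenovo", 50);
        });
        System.out.println(store.size() + " entries written, lock held: " + lock.held);
    }
}
```

With the real API, the same shape would wrap the store.put(...) calls between myLock.acquireLock() and a finally block that calls myLock.releaseLock(), with exception handling adapted to whatever those methods actually declare.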

Error Handling in C++ and SPL

Most of the toolkit's C++ and SPL functions take a mutable err parameter that receives the result of executing the function. If an error occurs, the value of this variable is non-zero. It is the caller's responsibility to provide a mutable variable for the error code and to check its value after the call.

In addition, there are functions that can be called when an error occurs to return the last error code and a message describing the error. The correct function to call after an operation depends on the operation that was last executed.
  • For errors relating to the store functions, use dpsGetLastStoreErrorCode() and dpsGetLastStoreErrorString().
  • For errors related to the TTL functions, use dpsGetLastStoreErrorCodeTTL() and dpsGetLastStoreErrorStringTTL().
  • For locking related errors, use dlGetLastDistributedLockErrorCode() and dlGetLastDistributedLockErrorString().

The following example shows how to check for errors after a function call, in this case after creating a lock:


mutable uint64 err = 0ul;
mutable uint64 lock_id = dlCreateOrGetLock("My Sentinel Lock1", err);

if (err != 0ul) {
	rstring msg = dlGetLastDistributedLockErrorString();
	uint64 rc = dlGetLastDistributedLockErrorCode();
	printStringLn("Error creating lock, rc = " + (rstring)(rc) + ", msg =" + msg );
}

Additional Examples

To learn how to call the DPS APIs from SPL native functions and from C++ and Java primitive operators, see the samples included in <STREAMS_INSTALL>/samples/com.ibm.streamsx.dps.

Reference information

DPS Java API Reference