Developer guide for IBM Db2 Event Store client APIs

IBM Db2 Event Store is a hybrid transactional analytical processing (HTAP) system that extends the Spark SQL interface to support ingesting data and to accelerate queries, so that you can analyze data in real time.

This guide is written for IBM Db2 Event Store Enterprise Edition users.

This guide provides examples written for the Scala client interface and the Python client interface and covers the following tasks:
  • Preparing your development environment
  • Inserting data into IBM Db2 Event Store
  • Querying the data by using Spark SQL

Before you begin

Ensure that you are familiar with the limitations and known issues for your IBM Db2 Event Store edition before you develop a client application:

If you need more information about the arguments that you can specify for a particular method or about the objects that are returned by a method, see the following topics:

Prepare your development environment

About this task

Complete the following steps before you run your client application:

Procedure

  1. Review the sample notebooks to determine if you can use any of the sample code in your own application.
  2. Install Spark on your local file system:
    • Enterprise Edition: Install Spark 2.2.1 or Spark 2.3.2.
  3. Set your SPARK_HOME environment variable to your Spark installation directory.
  4. Obtain the appropriate client file for your environment:
    • Enterprise Edition:
      1. Download the ibm-db2-eventstore-client-n.n.n.jar file. Based on your version of Spark, you can download the client from:
      2. Download and extract the python.zip file from Python packages for IBM Db2 Event Store.
      3. Set the PYTHONPATH environment variable to the directory where the Python packages are located:
        export PYTHONPATH="${PYTHONPATH}:Python_package_location"
  5. Copy the client JAR file to the $SPARK_HOME/jars directory:
    • Enterprise Edition: Copy the ibm-db2-eventstore-client-n.n.n.jar file.
  6. Identify the connection endpoint of the Db2 Event Store instance. In your application, set the endpoint to the IP address that you obtained:
    ConfigurationReader.setConnectionEndpoints("<connection endpoint of the IBM Db2 Event Store instance>")
    Note: You might need to set other ConfigurationReader settings, for example the SSL connection parameters. For more information on ConfigurationReader settings, refer to the IBM Db2 Event Store ConfigurationReader API guide.

What to do next

Want a little more help getting started? The Code samples section includes sample applications and language-specific developer guides for Java, Python, and Scala.

Inserting data into IBM Db2 Event Store

The interface for working with databases and tables and for inserting data into IBM Db2 Event Store is exposed as the EventContext instance.

Your application must complete the following high-level tasks to insert data into IBM Db2 Event Store:
  • Connect to the IBM Db2 Event Store cluster
  • Open an existing database
  • Specify the table schema
  • Create a table
  • Get the resolved table schema reference
  • Insert the data
  • Clean up the EventContext singleton class

Connect to the IBM Db2 Event Store Cluster

To establish a connection to the IBM Db2 Event Store cluster, the client must provide connection endpoint information (setConnectionEndpoints) by using a configuration reader (the ConfigurationReader class), which provides a set of methods that you can use to connect to and configure IBM Db2 Event Store. For more information on the connection endpoint, refer to Identifying the connection endpoint of the IBM Db2 Event Store instance.

Python
from eventstore.common import ConfigurationReader
ConfigurationReader.setConnectionEndpoints("HostIP:Port")
Scala
import com.ibm.event.common.ConfigurationReader
ConfigurationReader.setConnectionEndpoints("HostIP:Port")

For more information, see the IBM Db2 Event Store ConfigurationReader API guide.

Open an existing database

If the database already exists, you must open the database before you can use it. You can open the database in either of the following ways:
  • If you don't have the EventContext instance for the database, you must obtain it by calling the getEventContext method of the EventContext singleton class:
    Python
    with EventContext.get_event_context("EVENTDB") as ctx:
    Scala
    val eventContext = EventContext.getEventContext("EVENTDB")
  • If you already have the EventContext instance for the database, open the database by calling the openDatabase method:
    Python
    eventContext.open_database()
    Scala
    val result = eventContext.openDatabase()

Specify the table schema

An IBM Db2 Event Store table is created from a schema. When you specify the table schema (TableSchema) you specify the columns, the sharding key, and the primary key using Spark data types.

Python
from eventstore.catalog import TableSchema
from pyspark.sql.types import *

schema = StructType([
    StructField("userId", LongType(), nullable=False),
    StructField("categoryId", IntegerType(), nullable=False),
    StructField("productName", StringType(), nullable=False),
    StructField("boolfield", BooleanType(), nullable=False),
    StructField("boolfield2", BooleanType(), nullable=True),
    StructField("duration", IntegerType(), nullable=False),
    StructField("review", StringType(), nullable=False)
])
table_name = "ReviewTable"
table_schema = TableSchema(table_name, schema,
                           sharding_columns=["userId"],
                           pk_columns=["userId", "categoryId"])
Scala
import com.ibm.event.catalog.TableSchema
import org.apache.spark.sql.types._

val reviewSchema = TableSchema("ReviewTable", StructType(Array(
     StructField("userId", LongType, nullable = false),
     StructField("categoryId", IntegerType, nullable = false),
     StructField("productName", StringType, nullable = false),
     StructField("boolfield", BooleanType, nullable = false),
     StructField("boolfield2", BooleanType, nullable = true),
     StructField("duration", IntegerType, nullable = false ),
     StructField("review", StringType, nullable = false))),
     shardingColumns = Seq("userId"), pkColumns = Seq("userId"))

In the Scala example, the sharding key and the primary key are defined on the same column. In the Python example, the primary key extends the sharding key with a second column (categoryId).

Databases in IBM Db2 Event Store are partitioned into shards. Any given IBM Db2 Event Store node (in a multi-node IBM Db2 Event Store cluster) contains 0, 1, or N shards of the defined database. In addition to the mandatory shard key, you can optionally provide a primary key. When you define a primary key, IBM Db2 Event Store ensures that only a single version of each primary key exists in the database.

Create a table

Create an IBM Db2 Event Store table using the unresolved schema that you specified in the previous step. If you did not specify a schema name, the schema name will default to your IBM Db2 username.

Python
ctx.create_table(table_schema)
Scala
EventContext.createTable(reviewSchema)
Tip: If the table that you want to access already exists, use the getNamesOfTables() method instead. For more information, see Obtain a list of the tables in the database.
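
For example, before you create a table you can list the tables that already exist in the database (a minimal sketch, assuming the eventContext instance that you obtained when you opened the database, and that getNamesOfTables() returns an iterable collection of table names):

Scala
// List the names of the tables that already exist in the database
val existingTables = eventContext.getNamesOfTables()
existingTables.foreach(name => println(name))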

Get the resolved table schema reference

Before you can perform insert operations, you must have a reference to the resolved table schema, or use the optional schemaName argument when invoking getTable(). A resolved schema contains additional table metadata that is maintained and used by the IBM Db2 Event Store engine. For more information on the getTable() method, see Obtain a resolved table schema.

Python
resolved_table_schema = ctx.get_table("ReviewTable")
Scala
val reviewTable = EventContext.getTable("ReviewTable")
Note: If the table that you want to access already exists, use the getTableNamesAndSchemas() method instead. For more information, see Obtain a list of tables and their schema.

Insert data asynchronously in batch

After you have the resolved table schema, you can insert data by inserting a single row or by performing a batch insert. Batch inserts are always performed asynchronously. A single row insertion can be synchronous or asynchronous.
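
For example, a single row can be inserted synchronously with the insert method (a minimal sketch, assuming the eventContext and reviewTable references from the previous steps and sample values that match the ReviewTable schema defined earlier):

Scala
import org.apache.spark.sql.Row

// The column order and types must match the resolved table schema
val row = Row(1L, 100, "Widget", true, false, 120, "Great product")
val result = eventContext.insert(reviewTable, row)
if (result.failed) println(s"single row insert failed: $result")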

In the following example, random data is generated by a data generator based on the provided schema and the number of rows to insert in each batch. The data is then sent to the IBM Db2 Event Store engine in batch asynchronously.

Python
from eventstore.oltp import EventContext
from eventstore.oltp.row_generator import generate_reviews
from eventstore.catalog import TableSchema, IndexSpecification, SortSpecification, ColumnOrder
from pyspark.sql.types import *

if __name__ == '__main__':
    print("opening database...")
    with EventContext.get_event_context("eventdb") as ctx:
        schema = StructType([
            StructField("userId", LongType(), nullable=True),
            StructField("time", TimestampType(), nullable=True),
            StructField("productId", IntegerType(), nullable=True),
            StructField("rating", IntegerType(), nullable=True),
            StructField("review", StringType(), nullable=True)
        ])
        table_name = "reviews"
        table_schema = TableSchema(table_name, schema,
                                   sharding_columns=["userId"],
                                   pk_columns=["userId", "time"])
        index_spec = IndexSpecification(index_name="pkindex",
                                        table_schema=table_schema,
                                        equal_columns=["userId"],
                                        sort_columns=[SortSpecification("time", ColumnOrder.ASCENDING_NULLS_LAST)],
                                        include_columns=["rating"])

print("creating table with index...\n{}".format(table_schema))
        ctx.create_table_with_index(table_schema, index_spec)
        print("list of table names:")
        table_names = ctx.get_names_of_tables()
        for idx, name in enumerate(table_names):
            print("\t{}: {}".format(idx, name))

       print("get table: ")
        resolved_table_schema = ctx.get_table(table_name)
        print("resolved table schema: {}".format(resolved_table_schema))
        print("JVM resolved table schema: {}".format(resolved_table_schema.jresolved_table_schema))

       print("inserting some data...")
        for row_batch in generate_reviews(num_reviews=20, batch_size=8):
            ctx.batch_insert(resolved_table_schema, row_batch)
Scala
import com.ibm.event.example.DataGenerator
import com.ibm.event.oltp.InsertResult
import com.ibm.event.oltp.EventContext
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration

// Open the database
val eventContext = EventContext.getEventContext("EVENTDB")

// Insert generated rows asynchronously in batch
val numRowsPerBatch = 1000
val numBatches = 1000
var failCount = 0
val startTime = System.currentTimeMillis()
for (i <- 1 to numBatches) {
  val batch = DataGenerator.generateRows(reviewSchema.schema, numRowsPerBatch).toIndexedSeq
  val future: Future[InsertResult] = eventContext.batchInsertAsync(reviewTable, batch)
  val result: InsertResult = Await.result(future, Duration.Inf)
  if (result.failed) {
    println(s"batch insert incomplete: $result")
    failCount += numRowsPerBatch
  } else if (i % 100 == 0) {
    println(s"First $i batches successfully inserted")
  }
}
val numRowsInserted = numBatches * numRowsPerBatch
println(s"Ingested $numRowsInserted rows")
val timeInserting = (System.currentTimeMillis() - startTime) / 1000.0
println(s"Ingest took $timeInserting seconds - ${(numRowsInserted - failCount) / timeInserting} inserts per second. $failCount inserts failed")

The rows are supplied as an IndexedSeq[Row] collection where a row is a Spark SQL Row instance that matches the StructType (as defined in Spark SQL) of the resolved table schema.
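
For example, a small batch can be built by hand instead of using a data generator (a sketch, assuming the ReviewTable column order defined earlier and the eventContext and reviewTable references from the previous steps):

Scala
import org.apache.spark.sql.Row

val batch = IndexedSeq(
  Row(1L, 100, "Widget", true, false, 120, "Great product"),
  // boolfield2 is nullable, so a null value is allowed in that position
  Row(2L, 101, "Gadget", false, null, 45, "Works as expected"))
val future = eventContext.batchInsertAsync(reviewTable, batch)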

The client application can either submit new inserts immediately or wait for the current insert operation to complete by waiting for output from the Future object.

Clean up the EventContext singleton class

Close any existing connections to the database in the client connection pool:

Python
EventContext.clean_up()
Scala
EventContext.cleanUp()

Querying the data by using Spark SQL

Your application must complete the following high-level tasks to query data that is stored in an IBM Db2 Event Store cluster:
  • Create an IBM Db2 Event Store Spark session
  • Open the database
  • Load a table and get the DataFrame reference
  • Specify what data queries are run against
  • Run an SQL query

Note: This section assumes that you are already connected to the IBM Db2 Event Store cluster.

For more information, see the IBM Db2 Event Store ConfigurationReader API guide.

Create an IBM Db2 Event Store Spark session

To run Spark SQL queries, you must set up a Db2 Event Store Spark session (EventSession):

Python
from eventstore.sql import EventSession
from pyspark.sql import SparkSession

sparkSession = SparkSession.builder.appName("EventStore in Python").getOrCreate()
eventSession = EventSession(sparkSession.sparkContext, "EVENTDB")
Scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.ibm.event.EventSession

val sparkSession = SparkSession.builder().getOrCreate()

val esSession = new EventSession(sparkSession.sparkContext, "EVENTDB")

Open the database

You must open the database before you can use it. To open an existing database during a Db2 Event Store session, use:

Python
eventSession.open_database()
Scala
esSession.openDatabase()

This operation creates a connection to the IBM Db2 Event Store system and opens the database that you specified when you created the Db2 Event Store Spark session.

Load a table and get the DataFrame reference

The loadEventTable method provides the DataFrame reference for the specified table in IBM Db2 Event Store.

You can use the DataFrame reference to define a temporary view by using the table.createOrReplaceTempView method. You can then use the view name in Spark SQL statements to manipulate, use, and retrieve rows from a query.

Python
table = eventSession.load_event_table("ReviewTable")
table.createOrReplaceTempView("ReviewTable")
Scala
val table = esSession.loadEventTable("ReviewTable")
table.createOrReplaceTempView("ReviewTable")

Specify what data queries are run against

By default, IBM Db2 Event Store returns the most recent consistent snapshot in the shared storage without waiting for all of the data in the log to be persisted to the shared storage. However, you can adjust the data that the query is run against by specifying a value for the setQueryReadOption method:

Python
from eventstore.sql import EventSession
eventSession = EventSession(sparkSession.sparkContext, "EVENTDB")
eventSession.set_query_read_option("SnapshotNow")
Scala
import org.apache.spark.sql.ibm.event.EventSession
 
val esSession = new EventSession(sparkSession.sparkContext, "EVENTDB")
esSession.setQueryReadOption("SnapshotNow")

Run an SQL query

After you open the database and load the relevant table, you can run a Spark SQL query on the EventSession object:

Python
query = "SELECT * FROM ReviewTable"
result = eventSession.sql(query)
result.show(50)
Scala
val result = esSession.sql("select count(*) from ReviewTable")
result.show(50)
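
Any Spark SQL statement that is valid against the registered view can be run in the same way. For example, a grouped aggregate (a sketch that assumes the ReviewTable columns defined in the insert section):

Scala
val avgDuration = esSession.sql(
  "SELECT categoryId, AVG(duration) AS avgDuration FROM ReviewTable GROUP BY categoryId")
avgDuration.show()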

Code samples

Java developer guide and code sample

The IBM Db2 Event Store Scala API can be invoked from Java.

For interoperability with the Scala API, the JavaConversions object from the Scala library can be used to create the Scala collection types from the corresponding Java collection types.

The Spark SQL schema (StructType, StructField, and so on) can be built with the Java API of Spark SQL from the org.apache.spark.sql.DataTypes package. For example:

  • DataTypes.createStructType(..)
  • DataTypes.createStructField(..)

You must complete the steps to Prepare your development environment before you develop your application.

The following steps describe how to invoke the sample Java application. The steps include a sample Java program that can connect to IBM Db2 Event Store.

This guide assumes that you are using Mac OS, so you might need to make adjustments if you're working on a different operating system.

To develop a Java application:

  1. Download and install JDK 1.8 from http://www.oracle.com/technetwork/java/javase/downloads/index.html.
  2. Locate the client file that you downloaded when you completed Prepare your development environment.
    • Enterprise Edition: Locate the ibm-db2-eventstore-client-n.n.n.jar file
  3. Download and install Scala 2.11.8 from https://www.scala-lang.org/download/all.html.
    For example, run the following commands to create a directory named Scala and extract the contents to that directory:
    mkdir ~/Scala
    cd ~/Scala
    tar -xzvf ~/Downloads/scala-2.11.8.tgz
  4. Set the SPARK_HOME environment variable to your Spark installation path.
    export SPARK_HOME=SparkInstallationPath
  5. Add the Scala binaries to your PATH environment variable:
    export PATH=$PATH:~/Scala/scala-2.11.8/bin
  6. Create the ESLIB environment variable and set it to the location of the client file:
    • Enterprise Edition: Set the ESLIB environment variable to point to the location of the ibm-db2-eventstore-client-n.n.n.jar file, for example:
      export ESLIB=~/Downloads/ibm-db2-eventstore-client-n.n.n.jar
  7. Copy the client file into your working directory:
    • Enterprise Edition: Copy the ibm-db2-eventstore-client-n.n.n.jar file into your working directory.
  8. Copy and paste the following code into the JavaEventStoreExample.java file:
      import com.ibm.event.catalog.IndexSpecification;
      import com.ibm.event.catalog.SortSpecification;
      import com.ibm.event.catalog.TableSchema;
      import com.ibm.event.oltp.InsertResult;
      import com.ibm.event.oltp.EventContext;
      import com.ibm.event.common.ConfigurationReader;
      import org.apache.spark.sql.Dataset;
      import org.apache.spark.sql.Row;
      import org.apache.spark.sql.SparkSession;
      import org.apache.spark.sql.ibm.event.EventSession;
      import org.apache.spark.sql.catalyst.expressions.GenericRow;
      import org.apache.spark.sql.types.DataType;
      import org.apache.spark.sql.types.DataTypes;
      import org.apache.spark.sql.types.StructField;
      import org.apache.spark.sql.types.StructType;
      import scala.Option;
      import scala.collection.JavaConversions;
      import scala.collection.Seq;
      import scala.concurrent.Await;
      import scala.concurrent.Future;
      import scala.concurrent.duration.Duration;
    
      import java.util.ArrayList;
      import java.util.Collections;
      import java.util.List;
      import java.util.Random;
    
      /**
       * A simple Java class that invokes the EventContext.
       * It performs the following steps
       * (1) connects to a database
       * (2) creates a table with an index
       * (3) asynchronously inserts a batch of rows into the table
       * (4) opens an EventSession and runs a query against the table (SELECT * FROM Ads)
       * (5) cleans up the EventContext
       *
       * Note: to execute one case use:
       *     java -cp target/scala-2.11/ibm-event_2.11-assembly-1.0.jar:${
       *     SPARK_HOME}/jars/* com.ibm.event.example.JavaEventStoreExample
       */
      public class JavaEventStoreExample {
    
          /** Name of the database that this program connects to. */
          private static final String DATABASE_NAME  = "PlaygroundDB";
    
          /** Number of rows to insert into the table */
          private static final int NUM_ROWS = 1000;
    
          /** SparkSQL Query to run on table */
          private static final String QUERY = "SELECT * FROM Ads";
    
          /** Number of rows to show from query result */
          private static final int NUM_QUERY_ROWS = 10;
    
          /**
           * Generate a number of random rows
           * @param schema table schema for the rows
           * @param numRows number of rows to generate
           * @return a list of rows.
           */
          private static List<Row> getRandomRows(TableSchema schema, int numRows) {
              ArrayList<Row> rows = new ArrayList<>(numRows);
              Random rnd = new Random(1234);
              for (long rowId=0; rowId<numRows; ++rowId) {
    
                  StructField[] fields = schema.schema().fields();
                  Object[] values = new Object[fields.length];
                  int fieldIdx = 0;
                  for (StructField field : fields) {
                      DataType dt = field.dataType();
                      if (field.name().equals("adId") && dt.equals(DataTypes.LongType)) {
                          values[fieldIdx++] = rowId;
                      } else if (dt.equals(DataTypes.IntegerType)) {
                          values[fieldIdx++] = rnd.nextInt();
                      } else if (dt.equals(DataTypes.LongType)) {
                          values[fieldIdx++] = rnd.nextLong();
                      } else if (dt.equals(DataTypes.ByteType)) {
                          values[fieldIdx++] = (byte)rnd.nextInt(256);
                      } else if (dt.equals(DataTypes.FloatType)) {
                          values[fieldIdx++] = rnd.nextFloat();
                      } else if (dt.equals(DataTypes.DoubleType)) {
                          values[fieldIdx++] = rnd.nextDouble();
                      } else if (dt.equals(DataTypes.BooleanType)) {
                          values[fieldIdx++] = rnd.nextBoolean();
                      } else {
                          throw new RuntimeException("unsupported data type: "+dt);
                      }
                  }
                  rows.add(new GenericRow(values));
              }
              return rows;
          }
    
          public static void main(String args[]) {
    
              // Set connection endpoints
              // ConfigurationReader.setConnectionEndpoints("<HostIP:Port>");
              // Define schema of 'Ads' table as SparkSQL StructType
              StructField[] adsFields = new StructField[] {
                  DataTypes.createStructField("storeId", DataTypes.LongType, false),
                  DataTypes.createStructField("adId", DataTypes.LongType, false),
                  DataTypes.createStructField("categoryId", DataTypes.IntegerType, false),
                  DataTypes.createStructField("productName", DataTypes.LongType, false),
                  DataTypes.createStructField("budget", DataTypes.LongType, false),
                  DataTypes.createStructField("cost", DataTypes.LongType, false)
              };
              StructType adsSchema = DataTypes.createStructType(adsFields);
    
              // Create IBM Db2 Event Store table schema
              // For interoperability with the Scala API, the scala collection types are created from
              // the corresponding Java collection types.
              Seq<String> pkList = JavaConversions.asScalaBuffer(Collections.singletonList("adId")).toSeq();
              Seq<String> emptyStringList = JavaConversions.asScalaBuffer(Collections.<String>emptyList()).toSeq();
              Seq<SortSpecification> emptySortSpecList = JavaConversions.asScalaBuffer(
                      Collections.<SortSpecification>emptyList()).toSeq();
              TableSchema adsTableSchema = new TableSchema(
                      "Ads",    // table name
                      adsSchema,          // schema SparkSQL TypeStruct
                      pkList,             // sharding key: list of columns that form composite sharding key
                      pkList);            // primary key: list of columns that form composite primary key
    
              // Create index specification along with table
              IndexSpecification index = new IndexSpecification(
                      "FooIndex",   // index name
                      adsTableSchema,         // table schema
                      pkList,                 // list of equality columns
                      emptySortSpecList,      // list of sort columns
                      emptyStringList,        // list of include columns
                      Option.apply(null));    // IndexID: None (will be engine generated)
    
              // Open database
              System.out.println("opening database "+DATABASE_NAME);
              EventContext ctx = EventContext.getEventContext(DATABASE_NAME);
    
              // Create 'Ads' table with index
              System.out.println("creating table "+adsTableSchema.tableName());
              ctx.createTableWithIndex(adsTableSchema, index);
    
              // Generate one batch of rows and insert it asynchronously into the Ads table.
              System.out.println("asynchronously inserting rows "+NUM_ROWS+" as batch");
              Future<InsertResult> future = ctx.batchInsertAsync(ctx.getTable("Ads"),
                      JavaConversions.asScalaBuffer(getRandomRows(adsTableSchema, NUM_ROWS)).toIndexedSeq());
              try {
                  // Wait for insert to complete and check outcome
                  InsertResult result = Await.result(future, Duration.Inf());
                  if (result.successful()) {
                      System.out.println("batch insert successful");
                  } else {
                      System.out.println("batch insert failed: "+result);
                      // if required, retry using `result.retryFailed()`
                  }
              } catch(Exception e) {
                  System.out.println("Await threw exception: "+e);
                  e.printStackTrace();
              }
    
              // Create new IBM Db2 Event Store Spark Session
              System.out.println("create EventSession");
              SparkSession sparkSession = SparkSession.builder().master("local[3]")
                      .appName("EventStoreExample").getOrCreate();
              EventSession session = new EventSession(sparkSession.sparkContext(),
                      DATABASE_NAME);
    
              // Open the database and register the table in the Spark catalog
              System.out.println("open database and table Ads with SparkSQL");
              session.openDatabase();
              session.loadEventTable("Ads").createOrReplaceTempView("Ads");
    
              // Run query and show NUM_QUERY_ROWS rows of the query result
              System.out.println("execute query: "+QUERY);
              Dataset<Row> results = session.sql(QUERY);
              System.out.println("result: ("+NUM_QUERY_ROWS+" result rows):");
              results.show(10);
    
              EventContext.cleanUp();
          }
      }
  9. Run the following command to generate and compile the JavaEventStoreExample.class file:
    javac -classpath scala-library-2.11.8.jar:${ESLIB}:${SPARK_HOME}/jars/* JavaEventStoreExample.java
  10. Run the following command to run the application:
    java -cp .:${ESLIB}:${SPARK_HOME}/jars/* JavaEventStoreExample

Python developer guide and code sample

You must complete the steps to Prepare your development environment before you develop your application.

The following steps describe how to write your first application using Python 3. The steps include a sample Python program that can connect to IBM Db2 Event Store.

This guide assumes that you are using Mac OS, so you might need to make adjustments if you're working on a different operating system.

To develop a Python application:

  1. Download and install Python 3.5 on your workstation from https://www.python.org/ftp/python/3.5.4/python-3.5.4-macosx10.6.pkg.
  2. Locate the client file that you downloaded when you completed Prepare your development environment.
    • Enterprise Edition: Locate the ibm-db2-eventstore-client-n.n.n.jar file
  3. Locate the python.zip file that you downloaded when you completed Prepare your development environment.
    • Enterprise Edition: You downloaded the ZIP file from GitHub.
  4. Unzip the python.zip into your Python installation directory.
    • Enterprise Edition: For example:
      unzip ~/Downloads/python.zip -d /Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/
  5. Install any additional required modules. For this application, you need the PySpark module:
    pip3 install pyspark
  6. Create a file named remote_client_app.py.
  7. Copy and paste the following code into the remote_client_app.py file:
    from eventstore.oltp import EventContext
    from eventstore.sql import EventSession
    from eventstore.common import ConfigurationReader
    from pyspark.sql import SparkSession
    from eventstore.catalog import TableSchema
    from pyspark.sql.types import *
    from time import time
    
    ip = input("Please specify host IP: ")
    print("Connecting to {}".format(ip))
    ConfigurationReader.setConnectionEndpoints("{}:18730;{}:1101".format(ip,ip))
    
    db_name = input("Please specify name of the new DB: ")
    print("Opening database {}".format(db_name))
    EventContext.get_event_context(db_name)
    
    sparkSession = SparkSession.builder.appName("Remote Client Connection in Python").getOrCreate()
    eventSession = EventSession(sparkSession.sparkContext, db_name)
    eventSession.open_database()
    
    tab_name = "tab1"
    with EventContext.get_event_context(db_name) as ctx:
        schema = StructType([
            StructField("id", IntegerType(), nullable=False),
            StructField("name", StringType(), nullable=False)
        ])
        table_schema = TableSchema(tab_name, schema, sharding_columns=["id"], pk_columns=["id"])

        print("Creating table {}".format(tab_name))
        ctx.create_table(table_schema)
        print("Table {} is created successfully".format(tab_name))
        table_names = ctx.get_names_of_tables()
        for idx, name in enumerate(table_names):
            print(name)
    
    
    with EventContext.get_event_context(db_name) as ctx:
        table = ctx.get_table(tab_name)
        row_batch = []
        row_batch.append(dict(id=7, name="eleven"))
        print("Inserting single row: {}".format(row_batch))
        ctx.batch_insert(table, row_batch)
    
    tab = eventSession.load_event_table(tab_name)
    tab.createOrReplaceTempView(tab_name)
    results = eventSession.sql("SELECT * FROM {}".format(tab_name))
    results.show()
    
  8. Run the application from the directory where the remote_client_app.py file is located:
    python3 remote_client_app.py

Scala developer guide and code sample

You must complete the steps to Prepare your development environment before you develop your application.

The following steps describe how to write your first application using IntelliJ IDEA. The steps include a sample Scala program that can connect to IBM Db2 Event Store.

This guide assumes that you are using Mac OS, so you might need to make adjustments if you're working on a different operating system.

To develop a Scala application:

  1. Open IntelliJ IDEA.

    If you don't have it installed, you can download a community version from https://www.jetbrains.com/idea/download/#section=mac.

  2. From the toolbar, click File > New Project.
  3. Select Scala > SBT.
  4. Name the project EventStoreApps and set the location to ~/IdeaProjects/EventStoreApps.
  5. Ensure that the JDK is at Version 1.8.
  6. Choose Scala version 2.11.8 and click Finish.
  7. In the EventStoreApps project directory, right-click at the root of your project and select New > Directory. Name the directory lib.
  8. In the EventStoreApps project directory, navigate to src > main > scala and add a new Scala class. Select object as the kind and name the class RemoteClientApp.
  9. Copy and paste the following code into your RemoteClientApp.scala file:
    import com.ibm.event.oltp.EventContext
    import com.ibm.event.common.ConfigurationReader
    import org.apache.spark.sql.ibm.event.EventSession
    import com.ibm.event.catalog.TableSchema
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    
    
    object RemoteClientApp {
      def main(args: Array[String]): Unit = {
    
        println("Please specify host IP: ")
        val ip = scala.io.StdIn.readLine()
        println(s"Connecting to $ip;")
        ConfigurationReader.setConnectionEndpoints(s"$ip:18730;$ip:1101") 
    
        val ctx = EventContext.getEventContext(dbName)
    
        val tabName = "tab1"
        println("Creating table " + tabName)
        val tabSchema = TableSchema(tabName, StructType(Array(
          StructField("id", IntegerType, false),
          StructField("name", StringType, false))
        ), shardingColumns = Array("id"), pkColumns = Array("id"))
    
        var res = ctx.createTable(tabSchema)
        assert(res.isEmpty, s"create table: ${res.getOrElse("success")}")
    
        //insert into the table
        val tab = ctx.getTable(tabName)
        var row = Row(7,"eleven")
        println( s"Inserting single row: " + row)
    
        var res1 = ctx.insert(tab, row)
        if (res1.failed)
          println(s"single row insert failed: $res1")
        else
          println(s"Row successfully inserted into $tabName")
    
        // Now select inserted row
        val sc = new SparkContext(new SparkConf().setAppName("Remote Client Connection").setMaster(Option(System.getenv("MASTER")).getOrElse("local[3]")))
        val sqlContext = new EventSession(sc, dbName)
    
        try {
          sqlContext.openDatabase()
          sqlContext.loadEventTable(tabName).createOrReplaceTempView(tabName)
          val results = sqlContext.sql(s"SELECT * FROM $tabName")
          results.show()
        } catch {
          case e: Exception =>
            println("EXCEPTION: attempting to exit..." + e.getMessage)
            e.printStackTrace()
        }
    
        EventContext.cleanUp()
        sys.exit()
      }
    }
  10. In the root of the project select build.sbt and replace the content of the file with the following code:
    • Enterprise Edition
      name := "RemoteClientApp"
      
      version := "1.0"
      
      val sparkver = "2.0.2"
      val scalaver = "2.11.8"
      scalaVersion := scalaver
      
      libraryDependencies += "com.ibm.event" % "ibm-db2-eventstore-client" % "1.1.3"
      
      libraryDependencies += "org.scala-lang" % "scala-library" % scalaVersion.value % "provided"
      
      libraryDependencies += "org.apache.spark" %% "spark-core" % sparkver % "provided"
      
      libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkver  % "provided"
      
      run in Compile <<=
        Defaults.runTask(fullClasspath in Compile, mainClass in(Compile, run), runner in(Compile, run))
  11. Save the build.sbt file.
  12. When prompted, select Enable auto-import and wait for your project to compile.
    Note: This step might take a while the first time your project compiles because IntelliJ IDEA downloads the required dependencies.
  13. After the project compiles, select Run > Edit Configuration from the toolbar.
  14. Click + and select SBT Task.
  15. Name the task.
  16. Under Tasks, type run and click OK.
  17. From the toolbar, select Run > Run to run your application.

ODBC developer guide and code sample

The following steps describe how to run an ODBC application and DB2CLI for IBM Db2 Event Store.

You must complete the steps to Prepare your development environment before you develop your application. However, there are additional setup steps that are specific to an ODBC/DB2CLI application, which you must complete before you can develop the application.

ODBC/DB2CLI setup

  1. Download the Db2 11.5 version of the IBM Data Server Driver package for your host platform.
  2. Unpack the package in any location using the following command:
    tar -xvf ibm_data_server_driver_package_linuxx64_v11.5.tar.gz -C <ds_driver_path>
  3. Unpack the ODBC client, which is included in the driver package, to any location using the following command, where <platform> is the directory that matches your platform (for example, linuxamd64):
    tar -xvf <ds_driver_path>/dsdriver/odbc_cli_driver/<platform>/ibm_data_server_driver_for_odbc_cli.tar.gz -C <odbc_client_path>
  4. Copy the gssplugin libraries from the IBM Data Server Driver package to the directory where you unpacked your ODBC client using the following command:
    cp -r <ds_driver_path>/<security32|security64> <odbc_client_path>/
    Note:
    • The gssplugin libraries will be used by the ODBC client to connect to Db2 Event Store.
    • The plugin will not be picked up by the ODBC client if you move it to other directories under the ODBC client directory.
  5. Take note of the <odbc_client_path> that you used; you will need it to build and run the ODBC sample.

Running the ODBC sample application

Note: Make sure to configure your SSL through the REST API. For more information, refer to Get client keystore and public SSL certificate using REST API.
  1. Compile the application by running the following command:
    ./bldExampleODBCApp.c <odbc_client_path>
    Where <odbc_client_path> is the location where you unpacked the ODBC client during setup. The script then:
    • Finds the ODBC headers under <odbc_client_path>/include and compiles the main application, ExampleODBCApp.c, and a helper utility, utilcli.c, which checks for errors and returns diagnostic messages.
    • Links the Db2 library under <odbc_client_path>/lib.
  2. Run the executable: ./ExampleODBCApp.

Cleaning up

To clean up the compiler-generated files, run the following command:
./bldExampleODBCApp.c --clean

Running the Db2 interactive CLI

  1. Go to the ODBC client bin directory by running the following command:
    cd <odbc_client_path>/bin
  2. Run the following command:
    ./db2cli execsql -connstring "DATABASE=eventdb; Protocol=tcpip; Authentication=GSSPLUGIN; Security=ssl; 
    SSLServerCertificate=<server_certificate_path>; HOSTNAME=<IP>; PORT=18730; UID=<username>; PWD=<password>"
    • Where <server_certificate_path> is the path that you obtained while setting up SSL using the REST API.
    • Where <IP> is the IP address of your Db2 Event Store cluster, or the environment variable $IP.
    • Where <username> and <password> are your Db2 Event Store user credentials, or the environment variables $EVENT_USER and $EVENT_PASSWORD.
  3. The command opens an interactive interface where you can run queries.