The interface for working with databases and tables and for inserting data into IBM Db2 Event Store is exposed as the EventContext instance.
Your application must complete the following high-level tasks to insert data into IBM Db2 Event Store:
Connect to the IBM Db2 Event Store cluster
To establish a connection to the IBM Db2 Event Store cluster, the client must provide connection endpoint information (setConnectionEndpoints) by using a configuration reader (the ConfigurationReader class), which provides a set of methods that you can use to connect to and configure IBM Db2 Event Store. For more information on the connection endpoint, refer to Identifying the connection endpoint of the IBM Db2 Event Store instance.
- Python
-
from eventstore.common import ConfigurationReader
ConfigurationReader.setConnectionEndpoints("HostIP:Port")
- Scala
-
import com.ibm.event.common.ConfigurationReader
ConfigurationReader.setConnectionEndpoints("HostIP:Port")
For more information, see the IBM Db2 Event Store ConfigurationReader API guide.
Open an existing database
If the database already exists, you must open the database before you can use it. You can open
the database in either of the following ways:
- If you don't have the EventContext instance for the database, you must obtain it by calling the getEventContext method of the EventContext singleton class:
- Python
-
with EventContext.get_event_context("EVENTDB") as ctx:
- Scala
-
val eventContext = EventContext.getEventContext("EVENTDB")
- If you already have the EventContext instance for the database, open the
database by calling the openDatabase method:
- Python
-
eventContext.open_database()
- Scala
-
val result = eventContext.openDatabase()
Specify the table schema
An IBM Db2 Event Store table is created from a schema. When you specify the table schema (TableSchema), you specify the columns, the sharding key, and the primary key using Spark data types.
- Python
-
from eventstore.catalog import TableSchema
from pyspark.sql.types import *

schema = StructType([
    StructField("userId", LongType(), nullable=False),
    StructField("categoryId", IntegerType(), nullable=False),
    StructField("productName", StringType(), nullable=False),
    StructField("boolfield", BooleanType(), nullable=False),
    StructField("boolfield2", BooleanType(), nullable=True),
    StructField("duration", IntegerType(), nullable=False),
    StructField("review", StringType(), nullable=False)
])

table_name = "ReviewTable"
table_schema = TableSchema(table_name, schema,
                           sharding_columns=["userId"],
                           pk_columns=["userId", "categoryId"])
- Scala
-
import com.ibm.event.catalog.TableSchema
import org.apache.spark.sql.types._
val reviewSchema = TableSchema("ReviewTable", StructType(Array(
StructField("userId", LongType, nullable = false),
StructField("categoryId", IntegerType, nullable = false),
StructField("productName", StringType, nullable = false),
StructField("boolfield", BooleanType, nullable = false),
StructField("boolfield2", BooleanType, nullable = true),
StructField("duration", IntegerType, nullable = false ),
StructField("review", StringType, nullable = false))),
shardingColumns = Seq("userId"), pkColumns = Seq("userId"))
In this example, the sharding key and the primary key are defined on the same column.
Databases in IBM Db2 Event Store are partitioned into shards. Any given IBM Db2 Event Store node (in a multi-node IBM Db2 Event Store cluster) contains 0, 1, or N shards of the defined database. In addition to the mandatory shard key, you can optionally provide a primary key. When you define a primary key, IBM Db2 Event Store ensures that only a single version of each primary key exists in the database.
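To make the shard concept concrete: each row's sharding-key values determine which shard, and therefore which node, stores the row. The sketch below illustrates the idea with a simple hash; the function name and hashing scheme are illustrative only, not the placement algorithm that the IBM Db2 Event Store engine actually uses:

```python
import hashlib

def shard_for(sharding_key_values, num_shards):
    """Illustrative only: map a tuple of sharding-key column values to a shard number."""
    key_bytes = repr(sharding_key_values).encode("utf-8")
    digest = hashlib.sha256(key_bytes).digest()
    return int.from_bytes(digest, "big") % num_shards

# Rows with the same userId always map to the same shard.
print(shard_for((42,), 8) == shard_for((42,), 8))
```

Keeping all rows that share a sharding key on one shard is what lets lookups and primary-key checks on that key be served by a single node.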
Create a table
Create an IBM Db2 Event Store table using the unresolved schema that you specified in the previous step. If you did not specify a schema name, the schema name will default to your IBM Db2 username.
- Python
-
ctx.create_table(table_schema)
- Scala
-
eventContext.createTable(reviewSchema)
Get the resolved table schema reference
You must have a reference to the resolved table schema, or use the optional schemaName argument when invoking getTable(), before you can perform insert operations. A resolved schema contains additional table metadata that is maintained and used by the IBM Db2 Event Store engine. For more information on the getTable() method, see Obtain a resolved table schema.
- Python
-
resolved_table_schema = ctx.get_table("ReviewTable")
- Scala
-
val reviewTable = EventContext.getTable("ReviewTable")
Insert data asynchronously in batch
After you have the resolved table schema, you can insert data by inserting a single row or by
performing a batch insert. Batch inserts are always performed asynchronously. A single row insertion
can be synchronous or asynchronous.
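Independently of the client API, batch insertion always involves the same client-side step: splitting the row stream into fixed-size batches before each submission. A minimal, library-free sketch of that chunking step (the names are illustrative, not part of the Event Store API):

```python
def batches(rows, batch_size):
    """Yield successive batches of at most batch_size rows."""
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # emit the final, possibly short, batch
        yield batch

# 20 rows in batches of 8 yields batch sizes 8, 8, and 4,
# matching num_reviews=20, batch_size=8 in the example below.
print([len(b) for b in batches(range(20), 8)])
```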
In the following example, random data is generated by a data generator based on the provided schema and the number of rows to insert in each batch. The data is then sent to the IBM Db2 Event Store engine asynchronously in batches.
- Python
-
from eventstore.oltp import EventContext
from eventstore.oltp.row_generator import generate_reviews
from eventstore.catalog import TableSchema, IndexSpecification, SortSpecification, ColumnOrder
from pyspark.sql.types import *

if __name__ == '__main__':
    print("opening database...")
    with EventContext.get_event_context("eventdb") as ctx:
        schema = StructType([
            StructField("userId", LongType(), nullable=True),
            StructField("time", TimestampType(), nullable=True),
            StructField("productId", IntegerType(), nullable=True),
            StructField("rating", IntegerType(), nullable=True),
            StructField("review", StringType(), nullable=True)
        ])
        table_name = "reviews"
        table_schema = TableSchema(table_name, schema,
                                   sharding_columns=["userId"],
                                   pk_columns=["userId", "time"])
        index_spec = IndexSpecification(index_name="pkindex",
                                        table_schema=table_schema,
                                        equal_columns=["userId"],
                                        sort_columns=[SortSpecification("time", ColumnOrder.ASCENDING_NULLS_LAST)],
                                        include_columns=["rating"])
        print("creating table with index...\n{}".format(table_schema))
        ctx.create_table_with_index(table_schema, index_spec)

        print("list of table names:")
        table_names = ctx.get_names_of_tables()
        for idx, name in enumerate(table_names):
            print("\t{}: {}".format(idx, name))

        print("get table: ")
        resolved_table_schema = ctx.get_table(table_name)
        print("resolved table schema: {}".format(resolved_table_schema))
        print("JVM resolved table schema: {}".format(resolved_table_schema.jresolved_table_schema))

        print("inserting some data...")
        for row_batch in generate_reviews(num_reviews=20, batch_size=8):
            ctx.batch_insert(resolved_table_schema, row_batch)
- Scala
-
import com.ibm.event.example.DataGenerator
import com.ibm.event.oltp.{EventContext, InsertResult}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration

// Open the database
val eventContext = EventContext.getEventContext("EVENTDB")

// Insert generated rows asynchronously in batches
val numRowsPerBatch = 1000
val numBatches = 1000
var failCount = 0
val startTime = System.currentTimeMillis()
for (i <- 1 to numBatches) {
  val batch = DataGenerator.generateRows(reviewSchema.schema, numRowsPerBatch).toIndexedSeq
  val future: Future[InsertResult] = eventContext.batchInsertAsync(reviewTable, batch)
  val result: InsertResult = Await.result(future, Duration.Inf)
  if (result.failed) {
    println(s"batch insert incomplete: $result")
    failCount += numRowsPerBatch
  } else if (i % 100 == 0) {
    println(s"First $i batches successfully inserted")
  }
}
val numRowsInserted = numBatches * numRowsPerBatch
println(s"Ingested $numRowsInserted rows")
val timeInserting = (System.currentTimeMillis() - startTime) / 1000.0
println(s"Ingest took $timeInserting seconds - ${(numRowsInserted - failCount) / timeInserting} inserts per second. $failCount inserts failed")
The rows are supplied as an IndexedSeq[Row] collection, where a row is a Spark SQL Row instance that matches the StructType class (as defined in Spark SQL) of the resolved table schema.
The client application can either submit new inserts immediately or wait for the current insert operation to complete by waiting for output from the Future object.
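The submit-then-await pattern in the example above is not specific to Scala. As a self-contained analogue, the sketch below uses Python's concurrent.futures, with a stand-in function in place of the Event Store client, to show the difference between submitting several batches concurrently and then blocking on each result:

```python
from concurrent.futures import ThreadPoolExecutor

def fake_batch_insert(batch):
    # Stand-in for batchInsertAsync: pretend to insert and report the row count.
    return len(batch)

with ThreadPoolExecutor(max_workers=4) as pool:
    # Submit all batches without blocking (inserts proceed concurrently)...
    futures = [pool.submit(fake_batch_insert, [None] * 8) for _ in range(5)]
    # ...then block on each Future, as Await.result does in the Scala example.
    total = sum(f.result() for f in futures)

print(total)
```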
Clean up the EventContext singleton class
Close any existing connections to the database in the client connection pool:
- Python
-
EventContext.clean_up()
- Scala
-
EventContext.cleanUp()