Open an existing database
If the database already exists, you must open it before you can use it. You open the database by
calling the getEventContext method of the EventContext singleton class.
Database names are case insensitive and are limited to 8 characters. If you enter a database name
with more than 8 characters, only the first 8 characters are used as the database
name.
EventContext.getEventContext(dbName: String): EventContext
Restriction: This command cannot complete if any of the nodes on which the
database is defined are inactive. If the command fails because one or more nodes are inactive,
wait for the nodes to be restarted automatically before you run the command again.
- Arguments
-
- dbName
- The name of the database.
- Returns
- The EventContext object for the given database.
- Example
-
- Python
-
with EventContext.get_event_context("EVENTDB") as ctx:
    pass  # use ctx to work with the database inside this block
- Scala
-
val eventContext = EventContext.getEventContext("EVENTDB")
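Unlike the Python context manager, the Scala API does not close connections automatically. A minimal lifecycle sketch, assuming the com.ibm.event.oltp.EventContext import path and pairing the open call with the EventContext.cleanUp() method described at the end of this section:
import com.ibm.event.oltp.EventContext

val eventContext = EventContext.getEventContext("EVENTDB")
try {
  // ... create tables, insert rows, and read data through eventContext ...
} finally {
  EventContext.cleanUp() // close pooled connections so the application can exit
}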
Define a schema
A table is created from a TableSchema object. The unresolved table schema is a
wrapper around a StructType element from Spark SQL that can also specify shard
keys.
To define a schema, you must import the com.ibm.event.catalog.TableSchema
package.
object TableSchema(
val tableName: String,
val schema: StructType,
val shardingColumns: Seq[String],
val pkColumns: Seq[String],
val schemaName: Option[String])
Note: tableName, schemaName, column name, and index name identifiers are case insensitive unless
they are delimited with single quotes. When an identifier is delimited by single quotes, it is
treated as case sensitive. For example, if the column name used is
'ColName'
then the string is considered case sensitive and its case is not changed.
This applies wherever
these identifiers can be used in the APIs, including in the Spark SQL queries in the OLAP
section.
- Arguments
-
- tableName
- The name of the table to which the schema belongs.
- schema
- A structure that contains column names and types. The schema is expressed as the Spark SQL
StructType class.
- shardingColumns
- The entries in the schema structure on which to shard the table.
- pkColumns
- The entries in the schema structure that act as the primary key.
- schemaName (optional)
- The table's schema qualifier name. If it is not defined, it is set to the value defined by
ConfigurationReader.setEventSchema; if that is not set, the default is the
user ID set by ConfigurationReader.setEventUser.
- Example
-
- Python
-
from pyspark.sql.types import (StructType, StructField, LongType,
                               IntegerType, StringType, BooleanType)

schema = StructType([
    StructField("userId", LongType(), nullable=False),
    StructField("categoryId", IntegerType(), nullable=False),
    StructField("productName", StringType(), nullable=False),
    StructField("boolfield", BooleanType(), nullable=False),
    StructField("boolfield2", BooleanType(), nullable=True),
    StructField("duration", IntegerType(), nullable=False),
    StructField("review", StringType(), nullable=False)
])
table_name = "ReviewTable"
table_schema = TableSchema(table_name, schema,
                           sharding_columns=["userId"],
                           pk_columns=["userId", "categoryId"],
                           schema_name="schema")
- Scala
-
import com.ibm.event.catalog.TableSchema
import org.apache.spark.sql.types._
val testSchema = TableSchema("TestTable",
StructType(Array(
StructField("id", LongType, nullable = false),
StructField("someNumber", IntegerType, nullable = false),
StructField("someString", StringType, nullable = false),
StructField("someBool", BooleanType, nullable = false),
StructField("someNullableBool", BooleanType, nullable = true))),
shardingColumns = Seq("id"), // Shard by id
pkColumns = Seq("id"), // id is also the primary key
schemaName=Option("schema")
)
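To illustrate the case-sensitivity note above, here is a minimal sketch (the table and column names are invented; the single quotes are embedded in the identifier string, as the note describes):
import org.apache.spark.sql.types._
import com.ibm.event.catalog.TableSchema

val quotedSchema = TableSchema("QuotedTable",
  StructType(Array(
    StructField("userid", LongType, nullable = false),        // case insensitive: matches USERID, UserId, ...
    StructField("'ColName'", StringType, nullable = false))), // delimited: case is preserved exactly
  shardingColumns = Seq("userid"),
  pkColumns = Seq("userid"))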
Create a table
After a schema is defined, you can create a table by using the createTable
method in the EventContext object.
def createTable(tableSchema: TableSchema, tableProperties: TableProperty): Option[EventError]
Restriction: This command cannot complete if any of the nodes on which the
database is defined are inactive. If the command fails because one or more nodes are inactive,
wait for the nodes to be restarted automatically before you run the command again.
- Arguments
-
- tableSchema
- The unresolved table schema that contains table name, column types, and sharding keys.
- tableProperties
- The properties that you want to specify for the table.
Note: In Scala applications, table
properties are specified as a HashMap of key-value pairs.
- ibm.eventstore.ttl.X
- Optional: You can specify how long data persists in the table before it automatically expires.
Data expires when it reaches the end of the time to live (TTL) period. The TTL period can be
specified by replacing 'X' with days, hours, minutes, or seconds.
If you do not specify this
property or if you set the TTL period to 0, the data remains in the table indefinitely.
- Returns
- An EventError object, wrapped in Some, if the
request fails. Otherwise,
None is returned.
- Example
-
- Python
-
ctx.create_table(table_schema)
- To create a table with the
ibm.eventstore.ttl.X
property:
table_prop = {"ibm.eventstore.ttl.hours": "2"}
ctx.create_table(table_schema, table_prop)
- Scala
-
val result = eventContext.createTable(testSchema)
- To create a table with the
ibm.eventstore.ttl.X
property:
import scala.collection.mutable.HashMap

val tableProperties = HashMap("ibm.eventstore.ttl.hours" -> "2")
val result = eventContext.createTable(testSchema, tableProperties)
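Because createTable returns an Option[EventError], you can check the result explicitly. A minimal sketch using standard Option pattern matching:
eventContext.createTable(testSchema) match {
  case Some(error) => sys.error(s"create table failed: $error") // request failed
  case None        => println("table created successfully")
}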
Create a table using the Db2 command line
You can create a table by using Db2 command-line syntax, passed as a string, in
Db2 Event Store.
def createTableCommand(stmtString: String): Option[EventError]
- Arguments
-
- stmtString
- The CREATE TABLE statement, as a string.
- Returns
- An EventError object, wrapped in Some, if the request fails.
Otherwise,
None is returned.
- Example
-
- Python
-
ctx.create_table_command("CREATE TABLE test2 (a INT)")
- Scala
-
eventContext.createTableCommand("CREATE TABLE test2 (a INT)")
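A fuller statement is sketched below; the table and columns are invented, and it assumes that standard Db2 DDL clauses such as PRIMARY KEY and DISTRIBUTE BY HASH are accepted by this API:
// Hypothetical example: the PRIMARY KEY and DISTRIBUTE BY HASH clauses are assumptions.
val ddl = """CREATE TABLE sensor_events (
            |  device_id BIGINT NOT NULL,
            |  ts        BIGINT NOT NULL,
            |  reading   DOUBLE NOT NULL,
            |  PRIMARY KEY (device_id, ts)
            |) DISTRIBUTE BY HASH (device_id)""".stripMargin
val result = eventContext.createTableCommand(ddl)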
Create a table with an index
Alternatively, you can create a table with an index to increase the speed at which data is
retrieved. The table is created by using the specified schema, and the index specification that you
supply defines the table's primary index.
To create a table with an index, you must import the following packages:
- com.ibm.event.catalog.ColumnOrder
- com.ibm.event.catalog.IndexSpecification
- com.ibm.event.catalog.SortSpecification
- com.ibm.event.catalog.TableSchema
def createTableWithIndex(tableSchema: TableSchema, indexSpecification:
IndexSpecification, tableProperties: TableProperties): Option[EventError]
Restriction: This command cannot complete if any of the nodes on which the
database is defined are inactive. If the command fails because one or more nodes are inactive,
wait for the nodes to be restarted automatically before you run the command again.
- Arguments
-
- tableSchema
- The unresolved table schema that contains table name, column types, and sharding keys.
- indexSpecification
- The first index that is associated with the table; the primary index key.
- tableProperties
- The properties that you want to specify for the table.
Note: In Scala applications, table
properties are specified as a HashMap of key-value pairs.
- ibm.eventstore.ttl.X
- Optional: You can specify how long data persists in the table before it automatically expires.
Data expires when it reaches the end of the time to live (TTL) period. The TTL period can be
specified by replacing 'X' with days, hours, minutes, or seconds.
If you do not specify this
property or if you set the TTL period to 0, the data remains in the table indefinitely.
- Returns
- An EventError object, wrapped in Some, if the
request fails. Otherwise,
None is returned.
- Example
-
- To create a table with an index specification that includes only an equality column:
- Python
-
schema = StructType([
    StructField("id", LongType(), nullable=False),
    StructField("someNumber1", IntegerType(), nullable=False),
    StructField("someNumber2", IntegerType(), nullable=False),
    StructField("someString", StringType(), nullable=False),
    StructField("someBool", BooleanType(), nullable=False),
    StructField("someNullableBool", BooleanType(), nullable=True)
])
table_schema = TableSchema("TestTable3", schema,
                           sharding_columns=["id"],
                           pk_columns=["id", "someNumber1"])
index_spec = IndexSpecification(index_name="TestTableIndex",
                                table_schema=table_schema,
                                equal_columns=["id", "someNumber1"])
table_prop = {"ibm.eventstore.ttl.hours": "2"}
ctx.create_table_with_index(table_schema, index_spec, table_prop)
- Scala
-
val testIndex = IndexSpecification("TestTableIndex", testSchema, equalColumns = Seq("id"))
val tableProperties = HashMap("ibm.eventstore.ttl.hours" -> "2")
val result = eventContext.createTableWithIndex(testSchema, testIndex, tableProperties)
- To create a table with an index specification that includes equality and sort columns:
- Python
-
schema = StructType([
    StructField("id", LongType(), nullable=False),
    StructField("someNumber1", IntegerType(), nullable=False),
    StructField("someNumber2", IntegerType(), nullable=False),
    StructField("someString", StringType(), nullable=False),
    StructField("someBool", BooleanType(), nullable=False),
    StructField("someNullableBool", BooleanType(), nullable=True)
])
table_schema = TableSchema("TestTable3", schema,
                           sharding_columns=["id"],
                           pk_columns=["id", "someNumber1", "someNumber2"])
index_spec = IndexSpecification(index_name="TestTableIndex",
                                table_schema=table_schema,
                                equal_columns=["id", "someNumber1"],
                                sort_columns=[SortSpecification("someNumber2", ColumnOrder.ASCENDING_NULLS_LAST)])
table_prop = {"ibm.eventstore.ttl.hours": "2"}
ctx.create_table_with_index(table_schema, index_spec, table_prop)
- Scala
-
val testSchema = TableSchema("TestTable2",
StructType(Seq(
StructField("id", LongType, nullable = false),
StructField("someNumber", IntegerType, nullable = false),
StructField("someString", StringType, nullable = false),
StructField("someBool", BooleanType, nullable = false),
StructField("someNullableBool", BooleanType, nullable = true))),
shardingColumns = Seq("id"),
pkColumns = Seq("id", "someNumber"))
val testIndex = IndexSpecification("TestTableIndex", testSchema, equalColumns = Seq("id"),
  sortColumns = Seq(SortSpecification("someNumber", ColumnOrder.AscendingNullsFirst)))
val tableProperties = HashMap("ibm.eventstore.ttl.hours" -> "2")
val result = eventContext.createTableWithIndex(testSchema, testIndex, tableProperties)
- To create a table with an index specification that includes equality, sort, and include columns:
- Python
-
schema = StructType([
    StructField("id", LongType(), nullable=False),
    StructField("someNumber1", IntegerType(), nullable=False),
    StructField("someNumber2", IntegerType(), nullable=False),
    StructField("someString", StringType(), nullable=False),
    StructField("someBool", BooleanType(), nullable=False),
    StructField("someNullableBool", BooleanType(), nullable=True)
])
table_schema = TableSchema("TestTable3", schema,
                           sharding_columns=["id"],
                           pk_columns=["id", "someNumber1"])
index_spec = IndexSpecification(index_name="TestTableIndex",
                                table_schema=table_schema,
                                equal_columns=["id"],
                                sort_columns=[SortSpecification("someNumber1", ColumnOrder.ASCENDING_NULLS_LAST)],
                                include_columns=["someNumber2"])
table_prop = {"ibm.eventstore.ttl.hours": "2"}
ctx.create_table_with_index(table_schema, index_spec, table_prop)
- Scala
-
val testSchema = TableSchema("TestTable3",
StructType(Seq(
StructField("id", LongType, nullable = false),
StructField("someNumber1", IntegerType, nullable = false),
StructField("someNumber2", IntegerType, nullable = false),
StructField("someString", StringType, nullable = false),
StructField("someBool", BooleanType, nullable = false),
StructField("someNullableBool", BooleanType, nullable = true))),
shardingColumns = Seq("id"),
pkColumns = Seq("id", "someNumber1"))
val testIndex = IndexSpecification("TestTableIndex", testSchema, equalColumns = Seq("id"),
  sortColumns = Seq(SortSpecification("someNumber1", ColumnOrder.DescendingNullsLast)),
  includeColumns = Seq("someNumber2"))
val tableProperties = HashMap("ibm.eventstore.ttl.hours" -> "2")
val result = eventContext.createTableWithIndex(testSchema, testIndex, tableProperties)
- Rules for creating tables
- Keep the following rules in mind when you create a table:
- Naming tables
- The table name is also used as a directory name on the Linux file system, so it must be a valid
Linux directory name.
Note: If a schema name is not explicitly set in the TableSchema, the default is the value set
by ConfigurationReader.setEventSchemaName. If that is not set, the default schema name for the table
is the user ID set by ConfigurationReader.setEventUser.
- Data types in tables
- See Supported Spark data types for the list of data types that you can use in your
IBM
Db2 Event Store table.
- Columns
- The table can have an unlimited number of columns.
Column names have the same naming
restrictions as tables.
- Primary keys and sharding keys
- Your table must contain 1 primary key and 1 sharding key.
In addition, ensure that the
following requirements are met:
- The columns of the primary key cannot be null.
- The primary key columns (and therefore the sharding key columns) cannot be
String type.
- The columns in the sharding key must be contained by the columns in the primary key.
Additionally, if you specify a table with an index, the equality columns in the index must contain
the sharding key columns.
- Index columns
- When you define your index columns, ensure that the following requirements are met:
- Index columns cannot be
String type.
- When you specify an index, you must include equality columns. You can optionally specify include
columns and sort columns after your equality columns.
- The union of equality and sort index columns must be the same as the columns in the primary
key.
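As a sketch that ties these rules together (the table and column names are invented): the sharding key is contained in the primary key, no key column is a String, the index equality columns contain the sharding key, and the union of equality and sort columns equals the primary key columns:
import org.apache.spark.sql.types._
import com.ibm.event.catalog.{ColumnOrder, IndexSpecification, SortSpecification, TableSchema}

val sensorSchema = TableSchema("SensorReadings",
  StructType(Array(
    StructField("deviceId", LongType, nullable = false),  // sharding key and primary key column
    StructField("ts", LongType, nullable = false),        // primary key column
    StructField("reading", DoubleType, nullable = false))),
  shardingColumns = Seq("deviceId"),                      // contained in pkColumns
  pkColumns = Seq("deviceId", "ts"))                      // non-null, non-String columns

val sensorIndex = IndexSpecification("SensorIndex", sensorSchema,
  equalColumns = Seq("deviceId"),                         // contains the sharding key
  sortColumns = Seq(SortSpecification("ts", ColumnOrder.AscendingNullsFirst)))
// equality + sort columns = (deviceId, ts) = primary key columns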
Obtain a resolved table schema
To insert data, you must have a reference to a resolved table schema. A resolved schema contains
additional table metadata that is maintained and used by the IBM
Db2 Event Store engine.
The ResolvedTableSchema reference can be obtained by using the
getTable() method for a specific table name.
To obtain a resolved schema reference, you must import the
com.ibm.event.catalog.ResolvedTableSchema package.
def getTable(name: String, schemaName: String): ResolvedTableSchema
- Arguments
-
- name
- The name of the table to resolve.
- schemaName (optional)
- The Db2 table's schema qualifier name. If it is not specified, it defaults to the value set
in ConfigurationReader.setEventSchema; if that is not set, it defaults to
the user ID from ConfigurationReader.setEventUser.
- Returns
- A reference to the ResolvedTableSchema of the specified table name.
If the
specified table name does not exist, an exception is thrown.
- Example
-
- Python
-
resolved_table_schema = ctx.get_table("TestTable")
- Scala
-
val testTable = eventContext.getTable("TestTable")
Obtain a list of the tables in the database
If you need to know the number of tables or the names of the tables that already exist in the
database, including the schema name (for example, DB2INST1.TAB1), use the
getNamesOfTables() method of the EventContext instance.
def getNamesOfTables() : Iterator[String]
- Arguments
- None.
- Returns
- A reference that you can use to iterate through the names of the tables in the current
database, including the schema name, for example:
DB2INST1.TAB1.
- Example
-
- Python
-
table_names = ctx.get_names_of_tables()
- Scala
-
val eventContext = EventContext.getEventContext("EVENTDB")
val tables : Iterator[String] = eventContext.getNamesOfTables
Obtain a list of tables and their schema
Alternatively, if you also need the resolved schema of each table in the database, use
the getTableNamesAndSchemas() method of the EventContext
instance.
def getTableNamesAndSchemas() : Iterator[(String, ResolvedTableSchema)]
- Arguments
- None.
- Returns
- A reference that you can use to iterate through the names of the tables, including the schema
name (for example,
DB2INST1.TAB1), and their resolved schemas. The table names and
resolved schemas are returned as pairs.
- Example
-
- Python
-
event_context = EventContext.get_event_context("EVENTDB")
table_schema_list = event_context.get_table_names_and_schemas()
- Scala
-
val eventContext = EventContext.getEventContext("EVENTDB")
val tableSchemaList : Iterator[(String, ResolvedTableSchema)] = eventContext.getTableNamesAndSchemas
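A minimal usage sketch that consumes the iterator, destructuring each (name, schema) pair:
eventContext.getTableNamesAndSchemas.foreach { case (name, resolvedSchema) =>
  println(name) // resolvedSchema can be passed directly to the insert APIs below
}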
Insert rows into a table
You can insert rows into a table either as a single row or as a batch of rows by using the resolved
table schema. Batch inserts are always performed asynchronously. Single row inserts can be performed
synchronously or asynchronously.
Batch inserts
For optimal performance, it is recommended that you collect rows in a batch and insert them
together. Batch inserts are performed asynchronously by using the
batchInsertAsync method of the EventContext instance.
Restriction: You can transfer a maximum of 32 MB of data in a batch. Use the following
information to determine the maximum batch size as a number of rows:
- To calculate the size of the header, use the following formula:
headerSize = 36 bytes + ((numColumns + 2) * 10)
- To calculate the size of a row, use the following formula:
rowSize = Sum of dataTypeSizes
The following table lists the size of each data type in
bytes:
| Data type | Size |
| --- | --- |
| Boolean | 1 byte |
| Byte | 1 byte |
| Date | 4 bytes |
| Double | 8 bytes |
| Float | 4 bytes |
| Integer | 4 bytes |
| Long | 8 bytes |
| Short | 2 bytes |
| String | 16 bytes + the length of the string contents |
| Timestamp | 8 bytes |
For example, suppose that each row contains a Timestamp and a Long value. The calculations are:
- headerSize = 36 + ((2 + 2) * 10) = 76
- rowSize = 8 + 8 = 16
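A minimal sketch of turning these numbers into a row limit, assuming the 32 MB cap covers the header plus the rows:
// Two 8-byte columns (Timestamp and Long), per the formulas above.
val numColumns = 2
val headerSize = 36 + ((numColumns + 2) * 10) // 76 bytes
val rowSize = 8 + 8                           // 16 bytes

val maxBatchBytes = 32L * 1024 * 1024
val maxRowsPerBatch = (maxBatchBytes - headerSize) / rowSize
println(s"Up to $maxRowsPerBatch rows fit in one batch")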
To insert a batch, complete the following steps:
- Define the rows to be inserted in a batch.
When you insert rows into IBM
Db2 Event Store in a batch, the rows must be supplied as an
IndexedSeq[Row] object, where Row is a Spark SQL row object that
matches the StructType of the TableSchema object.
Important: All of the columns in a row must have a value or be equal to null if the column is
nullable.
- Example
-
- Python
-
row_batch = []
row_batch.append(dict(id=1, someNumber=1, someString="Hello", someBool=True, someNullableBool=False))
row_batch.append(dict(id=2, someNumber=2, someString="Hello", someBool=False, someNullableBool=True))
row_batch.append(dict(id=3, someNumber=3, someString="Hello", someBool=True, someNullableBool=True))
- Scala
-
val rows = IndexedSeq(Row(1L, 1, "Hello", true, false),
  Row(2L, 2, "Hello", false, null),
  Row(3L, 3, "Hello", true, true))
- Insert the batch asynchronously.
Use the batchInsertAsync method to insert
rows to IBM
Db2 Event Store asynchronously in batches. This
method sends a batch of rows from the client to the IBM
Db2 Event Store cluster. IBM
Db2 Event Store provides a response immediately.
If the
application needs to know the status (result) of the batch insert request, that
information is provided in the returned Future object.
To use the
batchInsertAsync method, you must import the
com.ibm.event.oltp.InsertResult package.
def batchInsertAsync(schema: ResolvedTableSchema,
rows: IndexedSeq[Row]): Future[InsertResult]
- Arguments
-
- schema
- The resolved table schema that describes the table into which the batch is inserted.
- rows
- The sequence of the rows that are to be inserted.
- Returns
- A Future object that evaluates the status (result) of the
batch insert operation.
An exception is thrown if the supplied rows do not match the specified
schema.
- Example
-
- Python
-
Restriction: Python does not support the Future object.
ctx.batch_insert(resolved_table_schema, row_batch)
In Python, None represents null values. To insert null values into
nullable columns with batch_insert, use None.
- Scala
-
import scala.concurrent.Await
import scala.concurrent.duration.Duration

val future = eventContext.batchInsertAsync(schema, rows)
val result = Await.result(future, Duration.Inf)
if (result.failed) {
  // The batch insert failed; inspect result.toString for details.
}
If the operation fails (that is, if at least one shard has unprocessed rows), then
result.failed equals true. The
result.toString method returns a string with any error messages and the ID of
each shard for which the insert operation failed. The application can then decide whether to:
- Ignore the failure
- Retry only the failed shards by calling the result.retryFailed() method, as
shown in the following example.
- Python
-
Restriction: Python does not support the Future object or the
result.retryFailed() method.
If the batch insert fails, use:
try:
    ctx.batch_insert(resolved_table_schema, row_batch)
except Exception as e:
    print("Batch Insert Exception is - " + str(e) + " \n")
- Scala
-
val future = eventContext.batchInsertAsync(schema, rows)
var result = Await.result(future, Duration.Inf)
while (result.failed) {
  // Retry only the failed shards, then wait for the new result.
  val retryFuture = result.retryFailed()
  result = Await.result(retryFuture, Duration.Inf)
}
Note: The asynchronous insert API automatically attempts to resolve errors that are caused by
communication issues or node failures. Each connection attempt timeout is controlled by the value
that is set for the
ConfigurationReader.setConnectionTimeout() method. You can
improve the performance of your application by setting the timeout to 1
minute:
ConfigurationReader.setConnectionTimeout(1)
Insert a single row asynchronously
You can insert a single row into an IBM
Db2 Event Store
table asynchronously by using the insertAsync method.
EventContext.insertAsync(schema: ResolvedTableSchema, row:Row): Future[InsertResult]
Tip: Inserting a single row gives sub-optimal performance
compared to inserting a batch of rows. To achieve the best performance on insert operations, use the
asynchronous batch insert API.
- Arguments
-
- schema
- The resolved table schema that describes the table into which the row is inserted.
- row
- The single row that is to be inserted.
- Returns
- A Future object that evaluates the status (result) of the
insert operation.
An exception is thrown if the supplied row does not match the specified
schema.
- Example
-
- Python
- This method is not currently supported for Python.
- Scala
-
val row = Row(1L, 1, "Hello", true)
val future = eventContext.insertAsync(schema, row)
val result = Await.result(future, Duration.Inf)
if (result.failed) {
  // The insert failed; inspect result.toString for details.
}
Insert a single row synchronously
You can insert a single row into an IBM
Db2 Event Store
table by using the insert method.
EventContext.insert(schema: ResolvedTableSchema, row:Row): InsertResult
Tip: Inserting a single row gives sub-optimal performance
compared to inserting a batch of rows. To achieve the best performance on insert operations, use the
asynchronous batch insert API.
- Arguments
-
- schema
- The resolved table schema that describes the table into which the row is inserted.
- row
- The single row that is to be inserted.
- Returns
- An InsertResult object.
- Example
-
- Python
- This method is not currently supported for Python.
- Scala
-
val row = Row(1L, 1, "Hello", true)
val result = eventContext.insert(schema, row)
if (result.failed)
println(s"single row insert failed: $result")
else
println(s"Row $i successfully inserted into ${schema.tableName}")
Drop a table
You can drop Db2 Event Store tables by using the
dropTable method.
def dropTable(tableName: String, schemaName: String = "")
Restriction: This command cannot complete if any of the nodes on which the
database is defined are inactive. If the command fails because one or more nodes are inactive,
wait for the nodes to be restarted automatically before you run the command again.
- Arguments
-
- tableName
- The name of the table to drop.
- schemaName (optional)
- The Db2 table's schema name. The default value is the Db2 user name.
- Returns
- An EventError object, wrapped in Some, if the
request fails. Otherwise,
None is returned.
- Example
-
- Python
-
with EventContext.get_event_context("EVENTDB") as ctx:
ctx.drop_table("ReviewTable")
- Scala
-
val error = eventContext.dropTable("TestTable")
error.map(e => sys.error(e.toString)) // Example of exposing return value
Drop a database
You cannot remove a database from a Db2 Event Store
cluster using the Client API. To remove a database, see Manage and monitor a database.
Clean up the EventContext singleton class
You can use the EventContext.cleanUp() method to close any existing
connections to the database in the client connection pool. Close these connections so that your
application can exit; if you run your application without closing the connection threads, the
application cannot complete.
- Python
-
def clean_up():
- Scala
-
def cleanUp() : Unit
- Arguments
- None.
- Returns
- None.
- Example
-
- Python
-
EventContext.clean_up()
- Scala
-
EventContext.cleanUp()