Create a session to run queries in IBM Db2 Event Store
To run Spark SQL queries, you must set up a Db2 Event Store Spark session.
The EventSession class extends the optimizer of the
SparkSession class by enabling the optimizer to:
- Handle IBM Db2 Event Store tables.
- Route subplan execution to the IBM Db2 Event Store engine.
- Return rows from these IBM Db2 Event Store subplans so that the rest of the Spark SQL query plan can process them.
class EventSession (
sc: SparkContext,
val dbName: String)
- Arguments
-
- sc
- A reference to the SparkContext of the running Spark application.
- dbName
- The name of the IBM Db2 Event Store database that you are connecting to.
Note: The database name is always case insensitive, and only the first 8 characters are used, following the same naming rules as Db2 databases.
- Example
-
- Python
-
from pyspark.sql import SparkSession
from eventstore.sql import EventSession
from eventstore.common import ConfigurationReader
ConfigurationReader.setConnectionEndpoints("HostIP:Port")
sparkSession = SparkSession.builder.appName("EventStore in Python").getOrCreate()
eventSession = EventSession(sparkSession.sparkContext, "EVENTDB")
- Scala
-
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.ibm.event.EventSession
import com.ibm.event.common.ConfigurationReader
// Point the client at the Db2 Event Store endpoint
ConfigurationReader.setConnectionEndpoints("HostIP:Port")
val sparkSession = SparkSession.builder()
  .master(Option(System.getenv("MASTER")).getOrElse("local[3]"))
  .appName("EventStore OLAP Test")
  .getOrCreate()
val esSession = new EventSession(sparkSession.sparkContext, "EventDB")
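The note on database names says that case is ignored and that only the first 8 characters are used. The following is a minimal sketch of that rule, which can help when checking whether two names refer to the same database. The helper names `normalize_db_name` and `same_database` are hypothetical, and folding to upper case is an assumption (the documentation only states that case is ignored); this is not the library's own implementation.

```python
def normalize_db_name(name: str) -> str:
    """Return a comparison key for a database name.

    Hypothetical helper: keeps the first 8 characters and folds case.
    Upper-case folding is an assumption; the documentation only says
    that the database name is case insensitive.
    """
    return name[:8].upper()


def same_database(a: str, b: str) -> bool:
    """True when two names refer to the same database under this rule."""
    return normalize_db_name(a) == normalize_db_name(b)
```

Under this sketch, `same_database("EVENTDB", "EventDB")` is true, which is why the Python and Scala examples above can spell the name differently.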
Open an existing database
You must open the database before you can use it. To open an existing database in an IBM
Db2 Event Store session, use the openDatabase
method on the EventSession class.
EventSession.openDatabase()
This operation creates a connection to the IBM
Db2 Event Store system and opens the database that you specified
when you created the Db2 Event Store Spark session.
- Example
-
- Python
-
eventSession.open_database()
- Scala
-
esSession.openDatabase()
Load a table and get the DataFrame reference
The loadEventTable method provides the DataFrame reference
for the specified table in IBM
Db2 Event Store.
You can use the DataFrame reference to define a temporary view by using the
table.createOrReplaceTempView method. You can then use the view name in Spark SQL
statements to manipulate, use, and retrieve rows from a query.
EventSession.loadEventTable(table: String, tableSchema: String): DataFrame
- Arguments
-
- table
- The name of a table in the IBM Db2 Event Store database.
- tableSchema [optional]
- The schema name qualifier for the table, as it exists in the catalogs. If you omit this parameter, the schema defaults to the value that was set with ConfigurationReader.setEventSchema(schemaName: String). If that value was not set, the schema defaults to the user ID (that is, the one set with ConfigurationReader.setEventUser).
If table and tableSchema are not delimited with single quotes, the strings are treated as case insensitive. If either string is delimited with single quotes, that string is case sensitive. For example, if the table name is "'Table1Name'", the case of the name is not changed.
In all of the client APIs for OLAP and OLTP, you can use single-quoted strings to keep table names, table schemas, column names, and index names case sensitive.
The database name is always case insensitive.
- Returns
- The DataFrame reference for the specified table in the IBM Db2 Event Store database.
- Example
-
- Python
-
table = eventSession.load_event_table("ReviewTable")
table.createOrReplaceTempView("ReviewTable")
- Scala
-
val table = esSession.loadEventTable("ReviewTable")
table.createOrReplaceTempView("ReviewTable")
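The quoting rule and the schema fallback order described for loadEventTable can be sketched as two small functions. This is an illustration of the documented rules only, not the library's code: the names `resolve_identifier` and `resolve_schema` are hypothetical, folding unquoted names to upper case is an assumption, and how the engine stores a quoted name internally is not stated in the documentation.

```python
from typing import Optional


def resolve_identifier(name: str) -> str:
    """Apply the single-quote rule to a table, schema, column, or index name.

    A name wrapped in single quotes keeps its case (the quotes are dropped
    here for readability). Any other name is case insensitive, modeled in
    this sketch by folding to upper case (an assumption).
    """
    if len(name) >= 2 and name.startswith("'") and name.endswith("'"):
        return name[1:-1]
    return name.upper()


def resolve_schema(table_schema: Optional[str],
                   event_schema: Optional[str],
                   user: str) -> str:
    """Pick the schema qualifier in the documented fallback order:
    explicit tableSchema, then the setEventSchema value, then the user ID.
    """
    for candidate in (table_schema, event_schema):
        if candidate:
            return resolve_identifier(candidate)
    return resolve_identifier(user)
```

For example, `resolve_identifier("'Table1Name'")` keeps the mixed case, while `resolve_identifier("ReviewTable")` is treated the same as `"reviewtable"`.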
Specify the data that queries run against
To quickly ingest large amounts of data, IBM
Db2 Event Store writes data to a log before it is persisted to the shared storage (where data is available to
external query engines). When data is transitioned from the log to the shared storage, an index is
created and duplicate data is removed.
After all data in the log file has been written to the shared storage, IBM
Db2 Event Store can return a consistent snapshot of
the data. However, if you are running a query on data as it is added, the consistent snapshot might
not reflect the most up-to-date data.
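The relationship between the log, the shared storage, and a consistent snapshot can be pictured with a toy model. This sketch is purely illustrative and says nothing about how the engine is implemented: the class `ToyEventStore` and its methods are hypothetical, rows are keyed by an invented primary key, and keeping the first row for a duplicate key is an assumption.

```python
class ToyEventStore:
    """Toy model of the log and shared-storage stages (illustration only)."""

    def __init__(self):
        self.log = []      # rows ingested but not yet persisted
        self.shared = {}   # persisted rows; the dict key stands in for the index

    def ingest(self, key, row):
        """Fast path: append the row to the log only."""
        self.log.append((key, row))

    def persist(self):
        """Move all logged rows to shared storage, dropping duplicate keys."""
        for key, row in self.log:
            # Assumption: the first row seen for a key is kept.
            self.shared.setdefault(key, row)
        self.log.clear()

    def snapshot(self):
        """Consistent view: only rows that already reached shared storage."""
        return dict(self.shared)
```

If a row is ingested but `persist` has not yet run, `snapshot` does not include it, which mirrors why a consistent snapshot might lag behind the most recent data.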
To strike a balance between returning data immediately and returning a consistent snapshot of the
data, you can specify the isolation level that you want to run the query against. By
default, IBM
Db2 Event Store runs queries against the
SnapshotAny isolation level. The isolation level that you specify is used for the
remainder of the session or until you specify a different value for the session.
esSession.setQueryReadOption("SnapshotAny")
- Arguments
-
- String
- Specify one of the following constant string values.
- SnapshotAny
- This is the default value.
This value returns the most recent consistent snapshot in the
shared storage without waiting for all of the data in the log to be persisted to the shared
storage.
This snapshot might not include the most up-to-date data if there is still data in
the log that needs to be persisted to the shared storage.
Note: SnapshotAny is translated to the JDBC isolation level of TRANSACTION_READ_COMMITTED.
- Returns
- None.
- Example
-
- Python
-
from eventstore.sql import EventSession
eventSession = EventSession(sparkSession.sparkContext, "EVENTDB")
eventSession.set_query_read_option("SnapshotAny")
- Scala
-
import org.apache.spark.sql.ibm.event.EventSession
val esSession = new EventSession(sparkSession.sparkContext, "EventDB")
esSession.setQueryReadOption("SnapshotAny")
Run an SQL query against IBM Db2 Event Store using Spark SQL
After you open the database and load the relevant table, you can run a Spark SQL query.
Note: If you need a table's column identifiers to be case sensitive, delimit the column identifiers with single quotes in the Spark SQL query string. For example: select * from ReviewTable where 'ColumnId' = 5
- Example
-
- Python
-
query = "SELECT * FROM ReviewTable"
result = eventSession.sql(query)
result.show(50)
- Scala
-
val results = esSession.sql("select * from ReviewTable")
results.show(50)
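The steps on this page (open the database, load the table, define a temporary view, run the query) can be combined into one helper. This is a minimal sketch that uses only the Python method names shown in the examples above; the function name `query_event_table` and its parameters are hypothetical, and it assumes that `event_session` is an EventSession that was created as in the first example.

```python
def query_event_table(event_session, table_name, sql_text):
    """Open the database, expose one table as a temp view, and run a query.

    Hypothetical convenience wrapper; `event_session` is expected to be an
    EventSession created beforehand.
    """
    # The database must be opened before it can be used.
    event_session.open_database()
    # Get the DataFrame reference for the table.
    table = event_session.load_event_table(table_name)
    # Register a temporary view under the same name so that the SQL text
    # can refer to the table by that name.
    table.createOrReplaceTempView(table_name)
    # Run the Spark SQL statement and hand back whatever sql() returns.
    return event_session.sql(sql_text)
```

A call such as `query_event_table(eventSession, "ReviewTable", "SELECT * FROM ReviewTable").show(50)` would then correspond to the Python example above.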