Creating Spark applications by using the IDAX datasource
Typically, Spark applications read data from the database, process this data in a highly parallel manner in the Spark cluster, and then write the results back to the database.
To use the colocated data read, use the IDAX datasource in your Spark application.
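For orientation, the following Python sketch outlines this read-process-write pattern with the IDAX datasource. The table and column names and the aggregation are illustrative assumptions only; the datasource name and its parameters are described in the tables and examples that follow.

```python
# A minimal read-process-write sketch, assuming a Db2 Warehouse table
# BLUADMIN.IRIS with columns CLASS and SEPALLENGTH (illustrative names)
# and write access to a result table BLUADMIN.IRIS_SUMMARY.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("idax-read-process-write").getOrCreate()

# Read: by default, each Spark node reads its colocated portion of the table.
iris = (spark.read
        .format("com.ibm.idax.spark.idaxsource")
        .option("dbtable", "BLUADMIN.IRIS")
        .load())

# Process: regular Spark transformations run in parallel on the cluster.
summary = (iris.groupBy("CLASS")
           .agg(F.avg("SEPALLENGTH").alias("AVG_SEPALLENGTH")))

# Write: store the result back in the database.
(summary.write
 .format("com.ibm.idax.spark.idaxsource")
 .option("dbtable", "BLUADMIN.IRIS_SUMMARY")
 .mode("overwrite")
 .save())
```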
Name and description of the IDAX datasource parameters
The following table shows the name and description of the general parameters.
| Name of the general parameter | Description | Notes |
|---|---|---|
| datasource name | For R, Python, Scala, and Java, specify: com.ibm.idax.spark.idaxsource. Mandatory. | None. |
| dbtable | Table name, optionally qualified with a schema. Mandatory. | Only table identifiers are supported. SQL subqueries, which are allowed with plain JDBC, are not supported; use the sqlPredicate parameter instead. |
| url | The JDBC URL. Optional. Default: a Type 2 JDBC URL. | If you use a URL format of Type 4 JDBC, the user parameter and the password parameter are mandatory. |
| user | The user ID for accessing the database. Optional for Type 2 JDBC connections. | If you do not specify this parameter, the database is accessed with the user ID that is used to submit the Spark job. |
| password | The password for the specified user. Optional for Type 2 JDBC connections. | None. |
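As a sketch, a reader that uses these parameters with a Type 4 JDBC URL, for example against a remote system, might look as follows in Python. The host name, port, database name, and credentials are placeholders, and an existing SparkSession named spark is assumed.

```python
# Sketch: reading through a Type 4 JDBC URL. With a Type 4 URL, the user
# and password parameters are mandatory. All connection values below are
# placeholders; "spark" is an existing SparkSession.
df = (spark.read
      .format("com.ibm.idax.spark.idaxsource")
      .option("url", "jdbc:db2://myhost.example.com:50001/BLUDB")  # placeholder URL
      .option("user", "bluadmin")                                  # placeholder user ID
      .option("password", "********")                              # placeholder password
      .option("dbtable", "BLUADMIN.IRIS")
      .load())
```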
The following table shows the name and description of the additional parameters for the data read.
| Name of the additional parameters for data read | Description | Notes |
|---|---|---|
| sqlPredicate | SQL filter expression, for example, ID>2 AND ID<10. Optional. | None. |
| colocated | Optional. Default: TRUE | If you set this parameter to TRUE, the data is read locally. Reading the data locally improves the performance of the data read process because each Spark node reads only the portion of the database table that is stored in a database partition on the same host. If you set this parameter to FALSE, all the data is read through the head node. This setting is required when you access data on a remote Db2® cluster, for example, for the development of unit tests. |
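The following Python sketch combines these read parameters. It assumes an existing SparkSession named spark and the BLUADMIN.IRIS table that is used in the examples below; setting colocated to false only illustrates the remote-access case, such as a unit test against a remote Db2 cluster.

```python
# Sketch: sqlPredicate pushes the row filter into the database, and
# colocated="false" routes the read through the head node, for example
# when accessing a remote Db2 cluster from a unit test.
df = (spark.read
      .format("com.ibm.idax.spark.idaxsource")
      .option("dbtable", "BLUADMIN.IRIS")
      .option("sqlPredicate", "ID>2 AND ID<10")
      .option("colocated", "false")
      .load())
```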
The following table shows the name and description of the additional parameters for the data write.
| Name of the additional parameters for data write | Description | Notes |
|---|---|---|
| allowAppend | To use the Spark save mode SaveMode.Append, you must set this parameter to TRUE. Optional. Default: FALSE | If the amount of appended data is large, or if the source data frame is partitioned, the append operation might involve several transactions. If an error occurs during one of these transactions, the transaction in which the error occurred is rolled back, but previously committed transactions remain unaffected. The result might then be a partial completion of the overall append operation. |
| truncateTable | Optional. Default: FALSE | If you use the SaveMode.Overwrite mode, and if the output table exists, you can set the truncateTable parameter to TRUE. The table is then truncated instead of being dropped and re-created. This approach can be helpful if the output table has specific properties, for example, if it is declared volatile. |
| batchSize | The number of rows in a batch. Optional. Default: 1000 | Change the default value only if a lock escalation problem occurs. Data is written in batches of the specified size, and the transaction is committed every n batches. If a transaction becomes too large during parallel inserts, one of the inserters runs into a lock escalation and gets an exclusive write lock on the table. The transactions of the other writers are starved and eventually fail with SQLCODE -911. |
| maxBatchesInTransaction | The number of batches per transaction. Optional. Default: 10 | None. |
| maxInsertThreadsPerNode | The number of parallel threads per node that are used to insert data into the database. Optional. Default: 8 or the number of available processors divided by 4, whichever is larger. | This value affects the number of Spark shuffle partitions that are created for the Spark task. If there are more Spark partitions than threads, the number of partitions on a cluster node is reduced to the number of insert threads. If this setting is too high, Db2 might run into exceptions. |
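The following Python sketch shows how these write parameters might be combined to overwrite an existing table in place while tuning the batching. It assumes an existing Spark data frame named output and an existing target table MY_OUTPUT_TABLE; the batch values are illustrative, not recommendations.

```python
# Sketch: overwrite an existing table by truncating it instead of dropping
# and re-creating it, with illustrative batch-tuning values.
(output.write
 .format("com.ibm.idax.spark.idaxsource")
 .option("dbtable", "MY_OUTPUT_TABLE")
 .option("truncateTable", "true")             # truncate instead of drop/re-create
 .option("batchSize", "500")                  # rows per batch
 .option("maxBatchesInTransaction", "5")      # commit every 5 batches
 .option("maxInsertThreadsPerNode", "4")      # parallel insert threads per node
 .mode("overwrite")
 .save())
```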
Examples for data reads from a table
Specify the following datasource formats and options for your data frame reader depending on which programming language you are using.
- For Python, Scala, and Java:

      df = sparkSession.read \
        .format("com.ibm.idax.spark.idaxsource") \
        .options(dbtable="BLUADMIN.IRIS", sqlPredicate="ID>2 AND ID<10") \
        .load()

- For R:

      sql.df <- SparkR::read.df("BLUADMIN.IRIS",
        source="com.ibm.idax.spark.idaxsource",
        dbtable="BLUADMIN.IRIS",
        sqlPredicate="ID>2 AND ID<10")
Examples for data writes to a table
Specify the following datasource format, option, and mode for your data frame writer depending on which programming language you are using.
In the following examples, it is assumed that output is a Spark data frame (a SparkR DataFrame in the R example), and <SparkSaveMode> is one of the Spark save modes. For more information, see Save Modes, and additionally for Python, see the pyspark.sql module.
- For Python, Scala, and Java:

      output.write \
        .format("com.ibm.idax.spark.idaxsource") \
        .option("dbtable","MY_OUTPUT_TABLE") \
        .mode(<SparkSaveMode>) \
        .save()

- For R:

      write.df(output, "MY_OUTPUT_TABLE",
        source = "com.ibm.idax.spark.idaxsource",
        mode = <SparkSaveMode>,
        dbtable = "MY_OUTPUT_TABLE")

- If the data is to be appended to an existing table, you must set the allowAppend option to true, as shown in the following fragments and in the combined sketch after this list:
  - For Scala and Java:

        .option("allowAppend","true")
        .mode(SaveMode.Append)

  - For Python:

        .option("allowAppend","true")
        .mode("append")

  - For R:

        mode = "append", allowAppend = "true"
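For Python, the append fragments above can be combined into one complete write. This sketch assumes an existing Spark data frame named output and an existing target table MY_OUTPUT_TABLE.

```python
# Sketch: complete append write in Python; output and MY_OUTPUT_TABLE
# are assumed to exist.
(output.write
 .format("com.ibm.idax.spark.idaxsource")
 .option("dbtable", "MY_OUTPUT_TABLE")
 .option("allowAppend", "true")
 .mode("append")
 .save())
```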
- The sample application SqlPredicateExample shows how an application can push an SQL predicate down into the database. This action improves the performance because only a subset of the data needs to be fetched.
- The sample application ExceptionExample shows how to code an application so that exceptions are recorded in the file $HOME/spark/log/submission_id/submission.info.
Limitations and restrictions
- Do not customize Apache log4j logging in your Spark application code or by packaging a log4j configuration file with the application JAR file. The Spark integration requires that log output for org.apache.spark packages at log level INFO is written to the standard output of the application.
- Database columns that are accessed by Spark must have one of the following data types:
- CHAR
- VARCHAR
- SMALLINT
- INTEGER
- BIGINT
- FLOAT
- DOUBLE
- REAL
- DECIMAL
- NUMERIC
- DATE
- TIMESTAMP
- The precision of a TIMESTAMP variable can be at most six decimal places (microsecond precision). Additional decimal places are truncated after reading from or before writing to the database.
- A Db2 Warehouse view cannot be used as a source of input for an application.
- If a Spark application encounters an error while data is written to Db2 Warehouse database tables, the Spark retry is suppressed to avoid duplicate entries in the database table. If a Spark application encounters an error while data is read from Db2 Warehouse database tables, the Spark retry works as expected.