Creating Spark applications by using the IDAX datasource

Typically, Spark applications read data from the database, process this data in a highly parallel manner in the Spark cluster, and then write the results back to the database.

To benefit from colocated data reads, use the IDAX datasource in your Spark application.

Name and description of the IDAX datasource parameters

The following table shows the name and description of the general parameters.

Table 1. Name and description of the general parameters

  • datasource name
    Description: For R, specify source="com.ibm.idax.spark.idaxsource". For Python, Scala, and Java, specify .format("com.ibm.idax.spark.idaxsource"). Mandatory.
    Notes: None.
  • dbtable
    Description: Table name, optionally with schema: [schema.]table. Mandatory.
    Notes: Only table identifiers are supported. SQL subqueries, which are allowed with plain JDBC, are not supported. Use the sqlPredicate parameter instead.
  • url
    Description: The JDBC URL, for example:
      jdbc:db2://host.company.com:50001/BLUDB:sslConnection=true;
      Optional. Default (Type 2 JDBC): jdbc:db2:BLUDB
    Notes: If you use a Type 4 JDBC URL, the user parameter and the password parameter are mandatory.
  • user
    Description: The user ID for accessing the database. Optional for Type 2 JDBC connections.
    Notes: If you do not specify this parameter, the database is accessed with the user ID that is used to submit the Spark job.
  • password
    Description: The password for the specified user. Optional for Type 2 JDBC connections.
    Notes: None.
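
For example, a read over a Type 4 JDBC connection might look like the following Python sketch. The URL, credentials, and table name are placeholder values, and sparkSession is assumed to be an existing SparkSession.

    # Sketch only: replace the placeholder URL, credentials, and table name.
    df = sparkSession.read \
        .format("com.ibm.idax.spark.idaxsource") \
        .option("url", "jdbc:db2://host.company.com:50001/BLUDB:sslConnection=true;") \
        .option("user", "bluadmin") \
        .option("password", "mypassword") \
        .option("dbtable", "BLUADMIN.IRIS") \
        .load()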

The following table shows the name and description of the additional parameters for the data read.

Table 2. Name and description of the additional parameters for the data read

  • sqlPredicate
    Description: SQL filter expression, for example ID>2 AND ID<10. Optional.
    Notes: None.
  • colocated
    Description: Optional. Default: TRUE
    Notes: If you set this parameter to TRUE, the data is read locally, which improves the performance of the data read process because each Spark node reads only the portion of the database table that is stored in a database partition on the same host. If you set this parameter to FALSE, all the data is read through the head node. This setting is required when you access data on a remote Db2® cluster, for example, for the development of unit tests.
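
For example, a unit test that runs against a remote Db2 cluster might disable colocated reads, as in the following Python sketch. The table name is a placeholder, and the connection parameters from Table 1 (url, user, password) would typically also be set.

    # Sketch only: read through the head node of a remote cluster.
    df = sparkSession.read \
        .format("com.ibm.idax.spark.idaxsource") \
        .option("dbtable", "BLUADMIN.IRIS") \
        .option("colocated", "false") \
        .load()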

The following table shows the name and description of the additional parameters for the data write.

Table 3. Name and description of the additional parameters for the data write

  • allowAppend
    Description: To use the Spark save mode SaveMode.Append, you must set this parameter to TRUE. Optional. Default: FALSE
    Notes: If the amount of appended data is large, or if the source data frame is partitioned, the corresponding append operation might involve several transactions. If an error occurs during one of these transactions, the transaction in which the error occurred is rolled back, but previously committed transactions remain unaffected. The result might then be a partial completion of the overall append operation.
  • truncateTable
    Description: Optional. Default: FALSE
    Notes: If you use the SaveMode.Overwrite mode, and if the output table exists, you can set the truncateTable parameter to TRUE. The table is then truncated instead of being dropped and re-created. This approach can be helpful if the output table has specific properties, for example, if it is declared volatile.
  • batchSize
    Description: The number of rows in a batch. Optional. Default: 1000
    Notes: Change the default value only if a lock escalation problem occurs. Data is written in batches of the specified size, and the transaction is committed every n batches. If a transaction becomes too large during parallel inserts, one of the inserters runs into a lock escalation and acquires an exclusive write lock for the table. The transactions of the other writers are starved and eventually fail with SQLCODE -911.
  • maxBatchesInTransaction
    Description: The number of batches per transaction. Optional. Default: 10
    Notes: None.
  • maxInsertThreadsPerNode
    Description: The number of parallel threads per node that are used to insert data into the database. Optional. Default: 8 or the number of available processors divided by 4, whichever is larger.
    Notes: This value affects the number of Spark shuffle partitions that are created for the Spark task. If there are more Spark partitions than threads, the number of partitions on a cluster node is reduced to the number of insert threads. If this setting is too high, Db2 might run into exceptions.
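
As an illustration of the write parameters in Table 3, the following Python sketch appends a data frame with smaller batches and transactions. The option values and table name are illustrative assumptions, not tuning recommendations, and output is assumed to be an existing data frame.

    # Sketch only: illustrative tuning values and placeholder table name.
    output.write \
          .format("com.ibm.idax.spark.idaxsource") \
          .option("dbtable", "MY_OUTPUT_TABLE") \
          .option("allowAppend", "true") \
          .option("batchSize", "500") \
          .option("maxBatchesInTransaction", "5") \
          .option("maxInsertThreadsPerNode", "4") \
          .mode("append") \
          .save()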

Examples for data reads from a table

Specify the following datasource formats and options for your data frame reader depending on which programming language you are using.

  • For Python, Scala, and Java:
    df = sparkSession.read \
        .format("com.ibm.idax.spark.idaxsource") \
        .options(dbtable="BLUADMIN.IRIS", sqlPredicate="ID>2 AND ID<10") \
        .load()
  • For R:
    sql.df <- SparkR::read.df("BLUADMIN.IRIS",
                              source="com.ibm.idax.spark.idaxsource", 
                              dbtable="BLUADMIN.IRIS",
                              sqlPredicate="ID>2 AND ID<10")

Examples for data writes to a table

Specify the following datasource format, option, and mode for your data frame writer depending on which programming language you are using.

In the following examples, it is assumed that output is a data frame (a SparkR DataFrame in the R example) and that <SparkSaveMode> is one of the Spark save modes. For more information, see Save Modes, and additionally for Python, see the pyspark.sql module.

  • For Python, Scala, and Java:
    output.write \
          .format("com.ibm.idax.spark.idaxsource") \
          .option("dbtable","MY_OUTPUT_TABLE") \
          .mode(<SparkSaveMode>) \
          .save()
  • For R:
    write.df(output,
             "MY_OUTPUT_TABLE",
             source = "com.ibm.idax.spark.idaxsource",
             mode = <SparkSaveMode>,
             dbtable = "MY_OUTPUT_TABLE")
  • If the data is to be appended to an existing table, you must set the allowAppend option to true:
    • For Scala and Java:
      .option("allowAppend","true")  
      .mode(SaveMode.Append)
    • For Python:
      .option("allowAppend","true")
      .mode("append")
    • For R:
      mode = "append",
      allowAppend = "true"
For examples of Spark application code and how to modify it, see Examples of Spark applications. In particular:
  • The sample application SqlPredicateExample shows how an application can push an SQL predicate down into the database. This action improves the performance because only a subset of the data needs to be fetched.
  • The sample application ExceptionExample shows how to code an application so that exceptions are recorded in the file $HOME/spark/log/submission_id/submission.info.

Limitations and restrictions

A Spark application that uses Db2 Warehouse database tables is subject to the following limitations and restrictions:
  • Do not customize Apache log4j logging in your Spark application code or by packaging a log4j configuration file with the application JAR file. The Spark integration requires that log output for org.apache.spark packages at log level INFO is written to the standard output of the application.
  • Database columns that are accessed by Spark must have one of the following data types:
    • CHAR
    • VARCHAR
    • SMALLINT
    • INTEGER
    • BIGINT
    • FLOAT
    • DOUBLE
    • REAL
    • DECIMAL
    • NUMERIC
    • DATE
    • TIMESTAMP
  • The precision of a TIMESTAMP value can be at most six decimal places (microsecond precision). Additional decimal places are truncated when the data is read from or written to the database.
  • A Db2 Warehouse view cannot be used as a source of input for an application.
  • If a Spark application encounters an error while data is written to Db2 Warehouse database tables, Spark retries are suppressed to avoid duplicate entries in the database table. If a Spark application encounters an error while data is read from Db2 Warehouse database tables, Spark retries work correctly.