Using Flight with Apache Spark

If you are using Spark in your notebook, you can use the com.ibm.connect.spark.flight Spark data source to load data into a Spark DataFrame:

options = {
    "flight.location": <flight_service_url>,
    "flight.command": <flight_cmd>,
    "flight.timeout.default": "30S",
    "flight.authToken": <authentication_token>
}

df = spark.read.format("com.ibm.connect.spark.flight").options(**options).load()

You can define the values for these options in your environment by using the itc_utils library functions described in the previous sections. For example:


import itc_utils.flight_service as itcfs

MyConnection_data_request = {
    'connection_name': """MyConnection""",
    'interaction_properties': {
        'row_limit': 5000,
        'schema_name': '<schema>',
        'table_name': '<table>'
    }
}

options = {
    "flight.location": itcfs.get_flight_service_url(),
    "flight.command": itcfs.get_flight_cmd(nb_data_request=MyConnection_data_request),
    "flight.timeout.default": "30S",
    "flight.authToken": itcfs.get_bearer_token()
}

df = spark.read.format("com.ibm.connect.spark.flight").options(**options).load()

For details about using the itc_utils library, see Using itc_utils with your own code.

For details about Flight requests, see Flight data requests.

Alternatively, if your connection type is supported, you can use the Code snippets pane to generate code for the Flight data source. The following example shows loading from a Db2 Warehouse connection. Note that the read options are specified one by one with .option() calls; this is equivalent to passing a dictionary, as in .options(**options) in the previous examples.

import itc_utils.flight_service as itcfs

from pyspark.sql import SparkSession
sparkSession = SparkSession.builder.getOrCreate()

# Edit select_statement to change or disable the row limit.
Db2Warehouse_data_request = {
    'connection_name': """MyConnection""",
    'interaction_properties': {
        'select_statement': 'SELECT * FROM "MYSCHEMA"."MYTABLE" FETCH FIRST 5000 ROWS ONLY'
    }
}
flight_request = itcfs.get_data_request(nb_data_request=Db2Warehouse_data_request)

sp_df_1 = sparkSession.read.format("com.ibm.connect.spark.flight") \
    .option("flight.location", itcfs.get_flight_service_url()) \
    .option("flight.command", itcfs.get_flight_cmd(data_request=flight_request)) \
    .option("flight.authToken", itcfs.get_bearer_token()) \
    .load()
sp_df_1.show(10)

Timeouts when using Spark

In addition to the default timeout for all Flight calls, the Spark Flight data source can accept configurations for the following timeouts:

  • flight.timeout.auth: Timeout to complete client authentication
  • flight.timeout.get: Timeout to complete a DoGet call to read a Flight stream
  • flight.timeout.put: Timeout to complete a DoPut call to write a Flight stream

If any of these timeouts is not set, it falls back to the value of flight.timeout.default. If that value is also not set, the call has no timeout.

You can enter the string values for these timeouts in seconds (for example, "30S"), in minutes (for example, "5M"), or in hours (for example, "1H").
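As a sketch, a full set of read options with a separate timeout for each call type might look like the following. The timeout values are illustrative, and the placeholder strings stand in for values you would resolve with the itc_utils functions shown earlier:

```python
# Illustrative options for the Spark Flight data source with per-call timeouts.
# The placeholder strings would normally be resolved with itc_utils, for example
# itcfs.get_flight_service_url(), itcfs.get_flight_cmd(...), itcfs.get_bearer_token().
options = {
    "flight.location": "<flight_service_url>",
    "flight.command": "<flight_cmd>",
    "flight.authToken": "<authentication_token>",
    "flight.timeout.auth": "1M",      # client authentication
    "flight.timeout.get": "5M",       # DoGet calls that read a Flight stream
    "flight.timeout.put": "5M",       # DoPut calls that write a Flight stream
    "flight.timeout.default": "30S",  # fallback for any call without its own timeout
}
```

You would then pass this dictionary to the reader as before, with spark.read.format("com.ibm.connect.spark.flight").options(**options).load().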

Learn more

Parent topic: Accessing data sources with Flight service