Using Flight with Apache Spark
If you are using Spark in your notebook, you can use the Spark data source com.ibm.connect.spark.flight
to load data into a Spark data frame:
# Replace the placeholders with the values for your Flight service and data request.
options = {
    "flight.location": <flight_service_url>,
    "flight.command": <flight_cmd>,
    "flight.timeout.default": "30S",
    "flight.authToken": <authentication_token>
}
df = spark.read.format("com.ibm.connect.spark.flight").options(**options).load()
You can obtain the values for these options in your environment by using functions from the itc_utils
library that are described in the previous sections. For example:
import itc_utils.flight_service as itcfs

MyConnection_data_request = {
    'connection_name': """MyConnection""",
    'interaction_properties': {
        'row_limit': 5000,
        'schema_name': '<schema>',
        'table_name': '<table>'
    }
}

options = {
    "flight.location": itcfs.get_flight_service_url(),
    "flight.command": itcfs.get_flight_cmd(nb_data_request=MyConnection_data_request),
    "flight.timeout.default": "30S",
    "flight.authToken": itcfs.get_bearer_token()
}

df = spark.read.format("com.ibm.connect.spark.flight").options(**options).load()
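After the load completes, the data frame behaves like any other Spark data frame. For example, you might preview the result; this is a minimal sketch using standard Spark operations:
# Inspect the loaded data with standard Spark data frame operations.
df.printSchema()
df.show(5)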
For details about using the itc_utils
library, see Using itc_utils with your own code.
For details about Flight requests, see Flight data requests.
Alternatively, if your connection type is supported, you can generate code for the Flight data source from the Code snippets pane. The following example shows loading from a Db2 Warehouse connection. Note that the read
options are specified one by one, which is equivalent to passing a dictionary with .options(**options)
as in the previous examples.
import itc_utils.flight_service as itcfs
from pyspark.sql import SparkSession

sparkSession = SparkSession.builder.getOrCreate()

# Edit select_statement to change or disable the row limit.
Db2Warehouse_data_request = {
    'connection_name': """MyConnection""",
    'interaction_properties': {
        'select_statement': 'SELECT * FROM "MYSCHEMA"."MYTABLE" FETCH FIRST 5000 ROWS ONLY'
    }
}

flight_request = itcfs.get_data_request(nb_data_request=Db2Warehouse_data_request)

sp_df_1 = sparkSession.read.format("com.ibm.connect.spark.flight") \
    .option("flight.location", itcfs.get_flight_service_url()) \
    .option("flight.command", itcfs.get_flight_cmd(data_request=flight_request)) \
    .option("flight.authToken", itcfs.get_bearer_token()) \
    .load()

sp_df_1.show(10)
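For comparison, the same read can be written by collecting the options into a dictionary and unpacking it with .options(**options), as in the earlier examples. A minimal sketch (sp_df_2 is just an illustrative name):
# Equivalent read: collect the options in a dictionary and unpack them.
options = {
    "flight.location": itcfs.get_flight_service_url(),
    "flight.command": itcfs.get_flight_cmd(data_request=flight_request),
    "flight.authToken": itcfs.get_bearer_token()
}
sp_df_2 = sparkSession.read.format("com.ibm.connect.spark.flight").options(**options).load()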
Timeouts when using Spark
In addition to the default timeout for all Flight calls, the Spark Flight data source can accept configurations for the following timeouts:
flight.timeout.auth: Timeout to complete client authentication
flight.timeout.get: Timeout to complete a DoGet call to read a Flight stream
flight.timeout.put: Timeout to complete a DoPut call to write a Flight stream
If any of these timeouts is not set, it falls back to the value set in flight.timeout.default. If that value is also not set, there is no timeout.
You can specify the timeout values as strings in seconds (for example, "30S"), in minutes (for example, "5M"), or in hours (for example, "1H").
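For example, the following sketch sets a default timeout together with separate authentication and read timeouts on a Flight read. The option names come from the list above; the specific values are illustrative only.
# Illustrative timeout settings; adjust the values for your workload.
options = {
    "flight.location": itcfs.get_flight_service_url(),
    "flight.command": itcfs.get_flight_cmd(nb_data_request=MyConnection_data_request),
    "flight.authToken": itcfs.get_bearer_token(),
    "flight.timeout.default": "30S",  # fallback for any timeout that is not set explicitly
    "flight.timeout.auth": "1M",      # client authentication
    "flight.timeout.get": "5M"        # DoGet call that reads the Flight stream
}
df = spark.read.format("com.ibm.connect.spark.flight").options(**options).load()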
Parent topic: Accessing data sources with Flight service