Flight service in Python notebooks
In IBM Cloud Pak for Data, you can use the Flight service and the Apache Arrow Flight protocol to read data from and write data to data assets in a project or space. These data assets can be files in the storage that is associated with your current project or space, or data that is accessed through a database connection.
If you want to write your own code to read data from and write data to data assets in a project, you can use the open source pyarrow library to invoke the Flight service.
If you use the code that is generated for you from the Code snippets pane in a notebook to load data from a file or a connection, that code also uses pyarrow to invoke the Flight service, together with the itc_utils library. itc_utils wraps calls to the open source pyarrow library, which improves code readability and reduces code size with minimal additional programming effort.
Basic Flight service interaction
The following code snippets illustrate the basic interactions with the Flight service to read data from and write data to a data source. This section also includes a code snippet that shows you how to authenticate with the Flight service.
Reading data
The steps to read data from a data source include:
- Creating a flight descriptor with the metadata to access a data source
- Creating an instance of a flight client
- Authenticating with the Flight service
- Sending the flight descriptor to the Flight service to obtain a flight info object
- Reading data from a data source. The code snippet shows how to read data into a pyarrow.Table or a pandas.DataFrame, and how to read data in chunks.
Explanatory sample code snippet to read data. If you are working with large data sets, see Best practices when loading large volumes of data from a file or connection.
import pyarrow.flight

# create a flight descriptor specifying the data source or target
# (the content and structure of cmd are specific to the Flight service in IBM Cloud Pak for Data)
flightDescriptor = pyarrow.flight.FlightDescriptor.for_command(cmd)
# create an instance of a flight client
flightClient = pyarrow.flight.FlightClient(url, **opts)
# authenticate with the Flight service (for authHandler, see the code snippet at the end of this section)
flightClient.authenticate(authHandler)
# send the flight descriptor to the Flight service to obtain a FlightInfo object,
# which provides information for reading data from one or more endpoints
flightInfo = flightClient.get_flight_info(flightDescriptor)
# read from all endpoints
for endpoint in flightInfo.endpoints:
    reader = flightClient.do_get(endpoint.ticket)
    # a reader (stream) can be consumed only once, so use ONE of the following options:
    # 1) read a pyarrow.Table
    table = reader.read_all()
    # 2) read a pandas.DataFrame instead
    # df = reader.read_pandas()
    # 3) read in chunks; each chunk holds a pyarrow.RecordBatch in its data attribute
    # while True:
    #     try:
    #         recordBatch = reader.read_chunk().data
    #     except StopIteration:
    #         break
Writing data
The steps to write data to a data source include:
- Creating a flight descriptor with the metadata to access a data source
- Creating an instance of a flight client
- Authenticating with the Flight service
- Obtaining a flight write stream
- Writing data to a data target
Sample code snippet to write data:
import pyarrow as pa
import pyarrow.flight

# create a flight descriptor specifying the data source or target
# (the content and structure of cmd are specific to the Flight service in IBM Cloud Pak for Data)
flightDescriptor = pyarrow.flight.FlightDescriptor.for_command(cmd)
# create an instance of a flight client
flightClient = pyarrow.flight.FlightClient(url, **opts)
# authenticate with the Flight service (for authHandler, see the code snippet at the end of this section)
flightClient.authenticate(authHandler)
# obtain a flight write stream for the schema of the pandas.DataFrame df
schema = pa.Schema.from_pandas(df, preserve_index=False)
writer, reader = flightClient.do_put(flightDescriptor, schema)
# write data to the data target and close the stream
writer.write_table(pa.Table.from_pandas(df, schema=schema, preserve_index=False))
writer.close()
Flight descriptor
A central element of every interaction with the Flight service is the flight descriptor, which specifies how to access the data source. It includes a data source specification in the form of connection properties, such as host and port, and interaction properties, such as a table name or an SQL statement. Instead of connection properties, you can also specify the ID of the asset and the ID of the project or deployment space.
Technically, the Flight service expects a flight descriptor in the form of a JSON string. In Python notebooks, you can use Python dictionaries to construct a flight descriptor:
import json
import pyarrow.flight

flight_request = {
    "asset_id": "<asset_id>",
    "project_id": "<project_id>",
    "interaction_properties": {
        "schema_name": "<schema>",
        "table_name": "<table>",
        "row_limit": 5000
    }
}
# serialize the request into a JSON string and create a flight descriptor
cmd = json.dumps(flight_request)
flightDescriptor = pyarrow.flight.FlightDescriptor.for_command(cmd)
The asset_id can be the ID of a connected data asset or of a connection asset.
Flight descriptors for reading and writing data differ only slightly in their interaction_properties. Read requests typically have interaction properties like 'sql_statement', 'file_name', or 'table_name', while write requests have additional interaction properties, such as 'existing_table_action' or 'file_format'.
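To make the difference concrete, the following sketch derives a read request and a write request from the same asset reference and serializes each one into a command string. The helper name make_flight_request is hypothetical, and the value of 'existing_table_action' is a placeholder rather than a documented option value; see Flight data requests for the valid values.

```python
import json


def make_flight_request(asset_id, project_id, **interaction_properties):
    """Hypothetical helper (not part of pyarrow or itc_utils).

    Builds a Flight service request: the asset reference is shared, and
    only the interaction properties distinguish reads from writes.
    """
    return {
        "asset_id": asset_id,
        "project_id": project_id,
        "interaction_properties": dict(interaction_properties),
    }


# Read request: typical read properties such as 'table_name'
read_request = make_flight_request(
    "<asset_id>", "<project_id>",
    schema_name="<schema>", table_name="<table>",
)

# Write request: same asset reference plus a write-specific property
# (the value is a placeholder, not a documented option value)
write_request = make_flight_request(
    "<asset_id>", "<project_id>",
    schema_name="<schema>", table_name="<table>",
    existing_table_action="<existing_table_action>",
)

# The Flight service expects each request as a JSON string (the cmd)
read_cmd = json.dumps(read_request)
write_cmd = json.dumps(write_request)
```

Either string can then be passed to pyarrow.flight.FlightDescriptor.for_command().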
For details about the data request syntax, see Flight data requests.
Authenticating with the Flight Server
You must authenticate with the Flight service by using a valid bearer token. To do so, you can use the following code snippet, which defines a custom authentication handler class and creates an instance of it.
import pyarrow.flight as flight

class TokenClientAuthHandler(flight.ClientAuthHandler):
    """An example implementation of authentication with a user token."""

    def __init__(self, token):
        super().__init__()
        strToken = str(token)
        self.token = strToken.encode('utf-8')

    def authenticate(self, outgoing, incoming):
        # send the token to the server and keep the token that the server returns
        outgoing.write(self.token)
        self.token = incoming.read()

    def get_token(self):
        return self.token

# create an instance of the authentication handler by using IBM Watson Studio Lib
from ibm_watson_studio_lib import access_project_or_space
wslib = access_project_or_space()
token = 'Bearer {}'.format(wslib.auth.get_current_token())
authHandler = TokenClientAuthHandler(token)
Using generated code for the Flight service
When you use the code that is generated for you from the Code snippets pane in a notebook to load data from a file or a connection, the generated code uses pyarrow to invoke the Flight service. However, the pyarrow function calls are not visible because they are wrapped in higher-level functions that are provided by another library called itc_utils.
The itc_utils library is pre-installed in all notebook runtime environments that are provided by IBM. Its purpose is to reduce code size and make the code more readable. To achieve this goal, the itc_utils library uses information from the runtime environment and from the ibm_watson_studio_lib library.
Another significant advantage of the generated code is that the data request uses special properties, namely 'connection_name', 'connected_data_name', or 'data_name', depending on the kind of asset for which you generate the code. Before creating a flight descriptor, itc_utils converts these properties into an asset_id, plus a project_id or space_id.
Example of code that is generated for you to load data:
import itc_utils.flight_service as itcfs
readClient = itcfs.get_flight_client()
MyConnection_data_request = {
'connection_name': """MyConnection""",
'interaction_properties': {
'row_limit': 5000,
'schema_name': '<schema>',
'table_name': '<table>'
}
}
flightInfo = itcfs.get_flight_info(readClient, nb_data_request=MyConnection_data_request)
data_df_1 = itcfs.read_pandas_and_concat(readClient, flightInfo, timeout=240)
data_df_1.head(10)
Various functions of the itc_utils library are described in the following section.
Using itc_utils with your own code
You can use the functions provided by itc_utils to extend code that uses the pyarrow library. Because itc_utils is based on pyarrow, you can pick specific itc_utils functions and combine them with your pyarrow code.
The itc_utils library provides helper functions that you can use to make your programs easier to read. IBM can remove this library at any time if deemed necessary, and functions can change without prior notice.
The following functions can simplify code development.
- Functions for flight descriptor creation:
  - get_data_request(): transforms a Python dictionary with named assets into the request required by the Flight service
  - get_flight_cmd(): serializes a Python dictionary into a JSON string
  - get_flight_descriptor(): creates a ready-to-use pyarrow.flight.FlightDescriptor from a Python dictionary
- Functions for client instantiation and authentication:
  - get_flight_service_url(): gets the default Flight service URL
  - get_flight_client(): creates a flight client and authenticates with the Flight service by using the current user token of the environment
  - TokenClientAuthHandler(): authenticates with the Flight service
- Functions for reading data:
  - get_flight_info(): gets a pyarrow.flight.FlightInfo object for a Python dictionary
  - read_pandas_and_concat(): reads data from all endpoints and loads the data into a single pandas.DataFrame
get_data_request()
The function get_data_request() transforms a Python dictionary with the item connection_name, connected_data_name, or data_name into a Python dictionary that represents a Flight service request with the items asset_id and project_id or space_id.
The result can be used as input for other itc_utils library functions, for example get_flight_descriptor().
Examples:
- With item connection_name:
connection_data_request = {
    'connection_name': """MyConnection""",
    'interaction_properties': {
        'row_limit': 5000,
        'schema_name': '<schema>',
        'table_name': '<table>'
    }
}
flightRequest = itcfs.get_data_request(nb_data_request=connection_data_request)
- With item connected_data_name:
connected_asset_data_request = {
    'connected_data_name': """NameOfConnectedAsset""",
    'interaction_properties': {
        #'row_limit': 5000
    }
}
flightRequest = itcfs.get_data_request(nb_data_request=connected_asset_data_request)
- With item data_name:
CSV_data_request = {
    'data_name': """little.csv""",
    'interaction_properties': {
        #'row_limit': 500,
        'infer_schema': 'true',
        'infer_as_varchar': 'false'
    }
}
flightRequest = itcfs.get_data_request(nb_data_request=CSV_data_request)
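The following sketch illustrates the kind of transformation that get_data_request() performs. It is not the itc_utils implementation: the function resolve_data_request and its asset_ids lookup table are hypothetical stand-ins for the asset lookup that itc_utils performs against the current project or space, which is not modeled here.

```python
import copy

# The name items that are replaced by an asset ID
NAME_KEYS = ("connection_name", "connected_data_name", "data_name")


def resolve_data_request(nb_data_request, asset_ids, container_id, container_key="project_id"):
    """Hypothetical sketch: replace a named asset with asset_id plus project_id or space_id.

    asset_ids maps asset names to asset IDs; container_key is "project_id"
    for a project or "space_id" for a deployment space.
    """
    request = copy.deepcopy(nb_data_request)  # leave the caller's dictionary untouched
    names = [key for key in NAME_KEYS if key in request]
    if len(names) != 1:
        raise ValueError(f"expected exactly one of {NAME_KEYS}, found {names}")
    name = request.pop(names[0])
    if name not in asset_ids:
        raise KeyError(f"no asset named {name!r}")
    request["asset_id"] = asset_ids[name]
    request[container_key] = container_id
    return request


connection_data_request = {
    "connection_name": "MyConnection",
    "interaction_properties": {"schema_name": "<schema>", "table_name": "<table>"},
}
flight_request = resolve_data_request(
    connection_data_request,
    asset_ids={"MyConnection": "<asset_id>"},
    container_id="<project_id>",
)
```

Failing loudly when no name item or several name items are present mirrors the requirement that a request identifies exactly one asset.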
get_flight_cmd()
The function get_flight_cmd() transforms a Python dictionary that represents a Flight service request into a JSON string. The function is helpful in Spark notebooks because the Spark Flight Connector expects the request as a JSON string (a cmd).
You can also use this function in your own pyarrow code to get a valid cmd for the Flight service. The result of the get_flight_cmd() function can serve as input to create a pyarrow.flight.FlightDescriptor.
You can use this function with the named parameter nb_data_request or data_request.
If you use nb_data_request, the dictionary items connection_name, connected_data_name, or data_name are resolved to an asset_id and a project_id or space_id before the dictionary is transformed into a JSON string.
If you use data_request, the dictionary is transformed into a JSON string as is.
Examples:
- Named parameter nb_data_request with dictionary item connection_name:
MyConnection_data_request = {
    'connection_name': """MyConnection""",
    'interaction_properties': {
        'row_limit': 5000,
        'schema_name': '<schema>',
        'table_name': '<table>'
    }
}
cmd = itcfs.get_flight_cmd(nb_data_request=MyConnection_data_request)
- Named parameter data_request:
My_flight_request = {
    'asset_id': '<asset_id>',
    'project_id': '<project_id>',
    'interaction_properties': {
        'row_limit': 5000,
        'schema_name': '<schema>',
        'table_name': '<table>'
    }
}
cmd = itcfs.get_flight_cmd(data_request=My_flight_request)
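The contract of the two named parameters can be summarized in a few lines. In this hypothetical sketch, to_flight_cmd accepts exactly one of nb_data_request or data_request; a caller-supplied resolve function stands in for the name resolution that itc_utils performs, which is not reproduced here.

```python
import json


def to_flight_cmd(*, nb_data_request=None, data_request=None, resolve=None):
    """Hypothetical sketch of the nb_data_request / data_request contract."""
    if (nb_data_request is None) == (data_request is None):
        raise TypeError("pass exactly one of nb_data_request or data_request")
    if nb_data_request is not None:
        # Named assets must first be resolved to asset_id and project_id or space_id
        if resolve is None:
            raise TypeError("nb_data_request requires a resolve function")
        data_request = resolve(nb_data_request)
    # A data_request is serialized as is
    return json.dumps(data_request)


my_flight_request = {
    "asset_id": "<asset_id>",
    "project_id": "<project_id>",
    "interaction_properties": {"table_name": "<table>"},
}
cmd = to_flight_cmd(data_request=my_flight_request)
```

Making both parameters keyword-only mirrors the named-parameter usage shown in the examples above.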
get_flight_descriptor()
The function get_flight_descriptor() transforms a Python dictionary that represents a Flight service request into a pyarrow.flight.FlightDescriptor.
As described for get_flight_cmd(), you can use this function with the named parameter nb_data_request or data_request.
If you use nb_data_request, the dictionary items connection_name, connected_data_name, or data_name are resolved to an asset_id and a project_id or space_id before the dictionary is transformed into a flight descriptor, provided that the specified names are unique in the project or deployment space.
If you use data_request, the dictionary is transformed into a flight descriptor as is.
Example:
MyConnection_data_request = {
'connection_name': """MyConnection""",
'interaction_properties': {
'row_limit': 5000,
'schema_name': '<schema>',
'table_name': '<table>'
}
}
flightDescriptor = itcfs.get_flight_descriptor(nb_data_request=MyConnection_data_request, wslib=None)
get_flight_service_url()
The function get_flight_service_url() returns the configured endpoint of the Flight service.
Example:
itcfs.get_flight_service_url()
get_flight_client()
The function get_flight_client() returns a ready-to-use flight client that points to the configured or provided Flight service URL. This method also authenticates with the Flight service by using the default or a provided user token.
Example:
flightClient = itcfs.get_flight_client()
You can use this function with the named parameter url to specify an alternative Flight service endpoint, and with the named parameter bearer_token to use an existing bearer token.
Example:
flightClient = itcfs.get_flight_client(url='<flight_service_url>', bearer_token='<bearer_token>')
TokenClientAuthHandler()
You can use an instance of the TokenClientAuthHandler class to authenticate a pyarrow.flight.FlightClient instance with the Flight service. The Flight service expects a bearer token for authentication. To create an instance of TokenClientAuthHandler, you can use the token from the environment, an existing user token, or a bearer token.
- Example of a default TokenClientAuthHandler without any parameters that uses the authentication token from the environment:
authHandler = itcfs.TokenClientAuthHandler()
- Example of a TokenClientAuthHandler that uses a token created by the authentication service:
userToken = '<your_user_token>'
authHandler = itcfs.TokenClientAuthHandler(token=userToken)
- Example of a TokenClientAuthHandler that uses an existing bearer token:
bearerToken = 'Bearer <your_bearer_token>'
authHandler = itcfs.TokenClientAuthHandler(bearer_token=bearerToken)
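The token and bearer_token variants appear to differ only in whether the "Bearer " prefix is already present, judging from the examples above and from the 'Bearer {}'.format(...) pattern in the manual authentication snippet. The following hypothetical helper, as_bearer_token, sketches that normalization; it is not part of itc_utils, and the prefix handling is an assumption rather than documented itc_utils behavior.

```python
def as_bearer_token(token=None, bearer_token=None):
    """Hypothetical helper: return the 'Bearer <token>' string for the Flight service.

    Pass either a plain user token (prefix is added) or a complete bearer
    token (returned unchanged after a sanity check).
    """
    if (token is None) == (bearer_token is None):
        raise ValueError("pass exactly one of token or bearer_token")
    if bearer_token is not None:
        if not bearer_token.startswith("Bearer "):
            raise ValueError("bearer_token must start with 'Bearer '")
        return bearer_token
    return "Bearer {}".format(token)


bearer_from_user_token = as_bearer_token(token="<your_user_token>")
bearer_unchanged = as_bearer_token(bearer_token="Bearer <your_bearer_token>")
```

The resulting string is the form that the manual TokenClientAuthHandler in the Authenticating with the Flight Server section expects.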
get_flight_info()
The function get_flight_info() calls the Flight service to receive a pyarrow.flight.FlightInfo object that contains the information that is required to read data.
get_flight_info() expects one positional parameter, an instance of pyarrow.flight.FlightClient, and one of the named parameters nb_data_request or data_request. The difference between these named parameters and the input that each one expects is described in the sections about get_flight_cmd() and get_flight_descriptor().
Example:
My_flight_request = {
'asset_id': '<asset_id>',
'project_id': '<project_id>',
'interaction_properties': {
'row_limit': 5000,
'schema_name': '<schema>',
'table_name': '<table>'
}
}
flightClient = itcfs.get_flight_client()
flightInfo = itcfs.get_flight_info(flightClient, data_request=My_flight_request)
read_pandas_and_concat()
The function read_pandas_and_concat() reads data from all endpoints in the flight info object and concatenates the data partitions into a single pandas.DataFrame.
read_pandas_and_concat() expects two positional parameters, namely an instance of pyarrow.flight.FlightClient and a pyarrow.flight.FlightInfo object.
Example:
df = itcfs.read_pandas_and_concat(flightClient, flightInfo)
Hints and tips around timeouts
The Flight service uses different types of timeout options for various reasons. This section describes these timeout options and how to increase the timeout setting if you run into errors.
Request timeout
The flight client terminates a request if it cannot be completed within a certain timeout period. You can increase the default request timeout of 120 seconds by passing a FlightCallOptions object with a timeout parameter.
For example:
import pyarrow.flight as flight
reader = flightClient.do_get(endpoint.ticket, options=flight.FlightCallOptions(timeout=240.0))
Alternatively, if you use the itc_utils library, you can specify a timeout parameter as follows:
itc_utils.flight_service.read_pandas_and_concat(readClient, flightInfo, timeout=240)
Authentication expiration
For security reasons, authentication with the Flight service expires after 10 minutes. After expiration, you have to re-authenticate with the Flight service by calling the following command:
flightClient.authenticate(authHandler)
Alternatively, if you use the itc_utils library to obtain a FlightClient, you can get a new, authenticated client by calling:
flightClient = itc_utils.flight_service.get_flight_client()
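If a notebook keeps a client open for a long time, one option is to re-authenticate proactively before the 10-minute limit is reached. The following sketch is an illustration under assumptions, not an itc_utils feature: the class ReauthenticatingClient and its 9-minute safety margin are hypothetical, and the class relies only on the authenticate(authHandler) call described above, so it works with any client object that provides that method, such as a pyarrow.flight.FlightClient.

```python
import time


class ReauthenticatingClient:
    """Hypothetical wrapper that re-authenticates before authentication expires."""

    def __init__(self, client, auth_handler, max_age_seconds=9 * 60, clock=time.monotonic):
        self.client = client
        self.auth_handler = auth_handler
        self.max_age_seconds = max_age_seconds  # stay below the 10-minute expiration
        self.clock = clock  # injectable clock, useful for testing
        self._authenticated_at = None

    def ensure_authenticated(self):
        """Authenticate if this was never done or if the last authentication is too old."""
        now = self.clock()
        if self._authenticated_at is None or now - self._authenticated_at >= self.max_age_seconds:
            self.client.authenticate(self.auth_handler)
            self._authenticated_at = now
        return self.client
```

A possible usage is to call ensure_authenticated() before each Flight call, for example flightInfo = wrapper.ensure_authenticated().get_flight_info(flightDescriptor), where wrapper = ReauthenticatingClient(flightClient, authHandler).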
Ticket expiration
Tickets for endpoints that you obtain as part of a FlightInfo object expire after 10 minutes to free resources within the Flight service.
Ticket timeouts can occur if you try to read several large partitions of data sequentially and not all partitions can be processed within the given 10 minutes. You can avoid this kind of timeout by reading the partitions in parallel.
If tickets do expire, you can use the following command to get new tickets:
flightInfo = flightClient.get_flight_info(flightDescriptor)
Alternatively, if you use the itc_utils library to obtain a FlightInfo object, you can use the following command:
flightInfo = itc_utils.flight_service.get_flight_info(flightClient, data_request=My_flight_request)
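The advice to read partitions in parallel to avoid ticket expiration can be sketched with a thread pool from the Python standard library. This is a sketch under assumptions: read_endpoints_in_parallel is a hypothetical helper, each endpoint is read with do_get(endpoint.ticket).read_all() as in the basic reading example, and the client is assumed to be safe to share across threads (if in doubt, create one client per thread). The returned list keeps the endpoint order, so you can combine the tables afterward, for example with pyarrow.concat_tables().

```python
from concurrent.futures import ThreadPoolExecutor


def read_endpoints_in_parallel(client, flight_info, max_workers=4, options=None):
    """Hypothetical helper: read every endpoint concurrently, preserving endpoint order."""

    def read_one(endpoint):
        # Each endpoint has its own ticket and is read as an independent stream
        if options is None:
            reader = client.do_get(endpoint.ticket)
        else:
            reader = client.do_get(endpoint.ticket, options=options)
        return reader.read_all()

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() yields results in the order of flight_info.endpoints
        return list(pool.map(read_one, flight_info.endpoints))
```

The options parameter lets you pass the same FlightCallOptions timeout that is described under Request timeout.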
Using Flight with Apache Spark
If you are using Spark in your notebook, you can use the Spark data source com.ibm.connect.spark.flight to load data into a Spark DataFrame:
options = {
    "flight.location": "<flight_service_url>",
    "flight.command": "<flight_cmd>",
    "flight.timeout.default": "30S",
    "flight.authToken": "<authentication_token>"
}
df = spark.read.format("com.ibm.connect.spark.flight").options(**options).load()
You can define the values for the options in your environment by using the itc_utils functions that are described in the previous sections. For example:
import itc_utils.flight_service as itcfs
MyConnection_data_request = {
'connection_name': """MyConnection""",
'interaction_properties': {
'row_limit': 5000,
'schema_name': '<schema>',
'table_name': '<table>'
}
}
options = {
"flight.location": itcfs.get_flight_service_url(),
"flight.command": itcfs.get_flight_cmd(nb_data_request=MyConnection_data_request),
"flight.timeout.default": "30S",
"flight.authToken": itcfs.get_bearer_token()
}
df = spark.read.format("com.ibm.connect.spark.flight").options(**options).load()
For details about using the itc_utils library, see Using itc_utils with your own code.
For details about flight requests, see Flight data requests.
Alternatively, if your connection type is supported, you can use the Code snippets pane to generate code for the Flight data source. The following example shows how to load data from a Db2 Warehouse connection. Note that the read options are specified one at a time with .option(). This is equivalent to passing a dictionary, for example as .options(**options).
import itc_utils.flight_service as itcfs
from pyspark.sql import SparkSession
sparkSession = SparkSession(spark).builder.getOrCreate()
# Edit select_statement to change or disable the row limit.
#
Db2Warehouse_data_request = {
'connection_name': """MyConnection""",
'interaction_properties': {
'select_statement': 'SELECT * FROM "MYSCHEMA"."MYTABLE" FETCH FIRST 5000 ROWS ONLY'
}
}
flight_request = itcfs.get_data_request(nb_data_request=Db2Warehouse_data_request)
sp_df_1 = sparkSession.read.format("com.ibm.connect.spark.flight") \
.option("flight.location", itcfs.get_flight_service_url()) \
.option("flight.command", itcfs.get_flight_cmd(data_request=flight_request)) \
.option("flight.authToken", itcfs.get_bearer_token()) \
.load()
sp_df_1.show(10)
Timeouts when using Spark
In addition to the default timeout for all Flight calls, the Spark Flight data source accepts configurations for the following timeouts:
- flight.timeout.auth: Timeout to complete client authentication
- flight.timeout.get: Timeout to complete a DoGet call to read a Flight stream
- flight.timeout.put: Timeout to complete a DoPut call to write a Flight stream
If any of these timeouts is not set, it falls back to the value of flight.timeout.default. If this value is also not set, no timeout applies.
You can specify these timeouts as strings in seconds (for example "30S"), in minutes (for example "5M"), or in hours (for example "1H").
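The fallback rule and the duration format can be expressed in a few lines of Python, which may help when you check which timeout actually applies to a call. The function names parse_timeout and effective_timeout are hypothetical, and only the uppercase S, M, and H suffixes shown above are handled; whether the Spark data source also accepts other spellings is not covered here.

```python
import re

# Seconds per unit suffix for the formats "30S", "5M", and "1H"
_UNIT_SECONDS = {"S": 1, "M": 60, "H": 3600}


def parse_timeout(value):
    """Convert a timeout string such as "30S", "5M", or "1H" into seconds."""
    match = re.fullmatch(r"(\d+)([SMH])", value.strip())
    if match is None:
        raise ValueError(f"unsupported timeout format: {value!r}")
    return int(match.group(1)) * _UNIT_SECONDS[match.group(2)]


def effective_timeout(options, kind):
    """Return the timeout in seconds for kind ("auth", "get", or "put").

    A specific flight.timeout.<kind> option wins; otherwise flight.timeout.default
    applies; if neither is set, None means that no timeout applies.
    """
    value = options.get(f"flight.timeout.{kind}", options.get("flight.timeout.default"))
    return None if value is None else parse_timeout(value)


spark_options = {"flight.timeout.default": "30S", "flight.timeout.get": "5M"}
get_timeout = effective_timeout(spark_options, "get")  # 300: explicit flight.timeout.get
put_timeout = effective_timeout(spark_options, "put")  # 30: falls back to the default
```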
Learn more
- Arrow Flight
- Flight data requests
- Code samples using Flight in Python
- Code samples using Flight in R
- Reading from a database
- Writing to a database
- ibm-watson-studio-lib for Python
- ibm-watson-studio-lib for R