Flight service in Python notebooks

In IBM Cloud Pak for Data, you can use the Flight Service and the Apache Arrow Flight protocol to read from and write data to data assets in a project or space. These data assets can be files in the storage associated with your current project or space, or data accessed through a database connection.

If you want to write your own code to read from and write data to data assets in a project, you can use the open source pyarrow library to invoke the Flight service.

If you use the code that is generated for you from the Code snippets pane in a notebook to load data from a file or a connection, that code also uses pyarrow to invoke the Flight service. It additionally uses the itc_utils library, which wraps calls to pyarrow to make the code shorter and more readable.

Basic Flight service interaction

The following code snippets illustrate the basic interactions with the Flight service to read from and write data to a data source. This section also includes a code snippet that shows you how to authenticate with the Flight service.

Reading data

The steps to read data from a data source include:

  1. Creating a flight descriptor with the metadata to access a data source
  2. Creating an instance of a flight client
  3. Authenticating with the Flight service
  4. Sending the flight descriptor to the Flight service to obtain a flight info object
  5. Reading data from a data source. The code snippet shows how to read into a pyarrow.Table, into a pandas.DataFrame, and in chunks.

The following sample code snippet illustrates how to read data. If you are working with large data sets, see Best practices when loading large volumes of data from a file or connection.

import pyarrow.flight

# create a flight descriptor specifying the data source or target
# content and structure of cmd are specific to IBM CP4D's flight service.
flightDescriptor = pyarrow.flight.FlightDescriptor.for_command(cmd)

# create an instance of a flight client
flightClient = pyarrow.flight.FlightClient(url, **opts)

# authenticate with the flight service (for authHandler, see code snippet at the end of this section).
flightClient.authenticate(authHandler)

# send the flight descriptor to the flight service to obtain a FlightInfo object
# which provides information for reading data from one or more endpoints.
flightInfo = flightClient.get_flight_info(flightDescriptor)

# read from all endpoints
for endpoint in flightInfo.endpoints:
    reader = flightClient.do_get(endpoint.ticket)

    # a reader (stream) can be consumed only once, so choose ONE of the following:
    # 1) read a pyarrow.Table
    table = reader.read_all()
    # 2) read a pandas.DataFrame
    df = reader.read_pandas()
    # 3) read in chunks; each chunk wraps a pyarrow.RecordBatch
    while True:
        try:
            recordBatch = reader.read_chunk().data  # a pyarrow.RecordBatch
        except StopIteration:
            break

Writing data

The steps to write data to a data source include:

  1. Creating a flight descriptor with the metadata to access a data source
  2. Creating an instance of a flight client
  3. Authenticating with the Flight service
  4. Obtaining a flight write stream
  5. Writing data to a data target

Sample code snippet to write data:

import pyarrow as pa
import pyarrow.flight

# create a flight descriptor specifying the data source or target
# content and structure of cmd are specific to IBM CP4D's flight service.
flightDescriptor = pyarrow.flight.FlightDescriptor.for_command(cmd)

# create an instance of a flight client
flightClient = pyarrow.flight.FlightClient(url, **opts)

# authenticate with the flight service (for authHandler, see code snippet at the end of this section).
flightClient.authenticate(authHandler)

# obtain a flight write stream
schema = pa.Schema.from_pandas(df, preserve_index=False)
writer, reader = flightClient.do_put(flightDescriptor, schema)

# write data to a data target
writer.write_table(pa.Table.from_pandas(df, schema))
writer.close()

Flight descriptor

The flight descriptor is central to any interaction with the Flight service because it specifies how to access the data source. It includes a data source specification in the form of connection properties, such as host, port, and so on, and interaction properties, such as a table name, or SQL statement. Instead of connection properties, you can also specify the IDs of the asset and of the project or deployment space.

Technically, the Flight service expects a flight descriptor in the form of a JSON string. In Python notebooks, you can use Python dictionaries to construct a flight descriptor:

import json
import pyarrow.flight

flight_request = {
    "asset_id": "<asset_id>",
    "project_id": "<project_id>",
    "interaction_properties": {
        "schema_name": "<schema>",
        "table_name": "<table>",
        "row_limit": 5000
    }
}

# create a flight descriptor
cmd = json.dumps(flight_request)
flightDescriptor = pyarrow.flight.FlightDescriptor.for_command(cmd)

The asset_id can be the ID of a connected data asset or a connection asset.

Flight descriptors for reading and writing data differ only slightly with regard to the interaction_properties. Read requests typically have interaction properties like 'sql_statement', 'file_name', or 'table_name', while write requests have additional interaction properties, such as 'existing_table_action' or 'file_format'.

For details about the data request syntax, see Flight data requests.
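
Because a request carries an asset ID plus either a project ID or a space ID, it can help to build the JSON command in one place. The following is a minimal sketch using only the standard library. The helper name build_flight_cmd and its check for exactly one scope ID are assumptions made for illustration, not part of pyarrow or of the Flight service.

```python
import json


def build_flight_cmd(asset_id, interaction_properties, project_id=None, space_id=None):
    """Serialize a Flight data request; exactly one scope ID must be given."""
    if (project_id is None) == (space_id is None):
        raise ValueError("specify exactly one of project_id or space_id")
    request = {
        "asset_id": asset_id,
        "interaction_properties": dict(interaction_properties),
    }
    if project_id is not None:
        request["project_id"] = project_id
    else:
        request["space_id"] = space_id
    return json.dumps(request)


read_cmd = build_flight_cmd(
    "<asset_id>",
    {"schema_name": "<schema>", "table_name": "<table>", "row_limit": 5000},
    project_id="<project_id>",
)
print(read_cmd)
```

The resulting string can be passed to pyarrow.flight.FlightDescriptor.for_command() in the same way as cmd in the snippet above.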

Authenticating with the Flight service

You need to authenticate with the Flight service by using a valid bearer token. For that purpose, you can use the following code snippet to write a custom authentication handler class and create an instance of this class.

import pyarrow.flight as flight

class TokenClientAuthHandler(flight.ClientAuthHandler):
    """An example implementation of authentication with a user token."""

    def __init__(self, token):
        super().__init__()
        strToken = str(token)
        self.token = strToken.encode('utf-8')

    def authenticate(self, outgoing, incoming):
        outgoing.write(self.token)
        self.token = incoming.read()

    def get_token(self):
        return self.token

# create an instance of the authentication handler by using ibm_watson_studio_lib
from ibm_watson_studio_lib import access_project_or_space
wslib = access_project_or_space()
token = 'Bearer {}'.format(wslib.auth.get_current_token())

authHandler = TokenClientAuthHandler(token)

Using generated code for the Flight service

When you use the code that is generated for you to load data from a file or a connection from the Code snippets pane in a notebook, the generated code uses pyarrow to invoke the Flight service. However, the function calls to pyarrow are not visible because they are wrapped in higher level functions provided by another library called itc_utils.

The itc_utils library is pre-installed in all notebook runtime environments provided by IBM. Its purpose is to reduce code size and make the code more readable. To achieve this goal, the itc_utils library leverages information from the runtime environment and from the ibm_watson_studio_lib library.

Another significant advantage of the generated code is that the data request has special properties, namely 'connection_name', 'connected_data_name', or 'data_name', depending on the kind of asset for which you generate the code. itc_utils converts these properties into an asset_id, plus a project_id or space_id, before creating a flight descriptor.

Example of code that is generated for you to load data:

import itc_utils.flight_service as itcfs

readClient = itcfs.get_flight_client()

MyConnection_data_request = {
    'connection_name': """MyConnection""",
    'interaction_properties': {
        'row_limit': 5000,
        'schema_name': '<schema>',
        'table_name': '<table>'
    }
}

flightInfo = itcfs.get_flight_info(readClient, nb_data_request=MyConnection_data_request)

data_df_1 = itcfs.read_pandas_and_concat(readClient, flightInfo, timeout=240)
data_df_1.head(10)

Various functions of the itc_utils library are described in the following section.

Using itc_utils with your own code

You can use the functions provided by itc_utils to extend code that uses the pyarrow library. Because itc_utils is based on pyarrow, you can pick specific functions of itc_utils and combine them with your pyarrow code.

Note:

The itc_utils library provides helper functions that you can use to make your programs easier to read. This library can be removed by IBM at any time, if deemed necessary, and functions can be changed without prior notice.

The following functions can simplify code development.

  • Functions for Flight descriptor creation:

    • get_data_request(): transforms a Python dictionary with named assets into the request required by the Flight service
    • get_flight_cmd(): serializes a Python dictionary into a JSON string
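    • get_flight_descriptor(): creates a ready-to-use pyarrow.flight.FlightDescriptor from a Python dictionary
  • Functions for client instantiation and authentication:

    • get_flight_service_url(): returns the configured endpoint of the Flight service
    • get_flight_client(): returns a ready-to-use, authenticated flight client
    • TokenClientAuthHandler(): creates an authentication handler for a pyarrow.flight.FlightClient
  • Functions for reading data:

    • get_flight_info(): calls the Flight service to obtain a pyarrow.flight.FlightInfo object
    • read_pandas_and_concat(): reads data from all endpoints and concatenates the partitions into a single pandas.DataFrame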

get_data_request()

The function get_data_request() transforms a Python dictionary with items connection_name, connected_data_name or data_name into a Python dictionary representing a Flight service request with items asset_id, and project_id or space_id.

The result can be used as input for other itc_utils library functions, for example get_flight_descriptor().

Examples:

  • With item connection_name:

    connection_data_request = {
        'connection_name': """MyConnection""",
        'interaction_properties': {
            'row_limit': 5000,
            'schema_name': '<schema>',
            'table_name': '<table>'
        }
    }
    flightRequest = itcfs.get_data_request(nb_data_request=connection_data_request)
    
  • With item connected_data_name:

    connected_asset_data_request = {
        'connected_data_name': """NameOfConnectedAsset""",
        'interaction_properties': {
            #'row_limit': 5000
        }
    }
    flightRequest = itcfs.get_data_request(nb_data_request=connected_asset_data_request)
    
  • With item data_name:

    CSV_data_request = {
        'data_name': """little.csv""",
        'interaction_properties': {
            #'row_limit': 500,
            'infer_schema': 'true',
            'infer_as_varchar': 'false'
        }
    }
    flightRequest = itcfs.get_data_request(nb_data_request=CSV_data_request)
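
To make the transformation concrete, the following sketch imitates the idea of resolving a named asset to IDs. It is not how itc_utils is implemented: the function resolve_named_request, the list of (name, asset_id) pairs, and all ID values are hypothetical and exist only for illustration. It also shows why the name must be unique within the project or space.

```python
NAME_KEYS = ("connection_name", "connected_data_name", "data_name")


def resolve_named_request(nb_data_request, assets, project_id):
    """Replace the name item with asset_id and project_id (illustrative only).

    assets is a hypothetical list of (name, asset_id) pairs for the project.
    """
    present = [key for key in NAME_KEYS if key in nb_data_request]
    if len(present) != 1:
        raise ValueError("expected exactly one of: " + ", ".join(NAME_KEYS))
    name = nb_data_request[present[0]]
    matches = [asset_id for asset_name, asset_id in assets if asset_name == name]
    if len(matches) != 1:
        raise LookupError(f"{name!r} matches {len(matches)} assets; it must be unique")
    # Keep everything except the name item, then add the resolved IDs
    resolved = {k: v for k, v in nb_data_request.items() if k not in NAME_KEYS}
    resolved["asset_id"] = matches[0]
    resolved["project_id"] = project_id
    return resolved


assets = [("MyConnection", "asset-123"), ("little.csv", "asset-456")]
request = {
    "connection_name": "MyConnection",
    "interaction_properties": {"row_limit": 5000},
}
flight_request = resolve_named_request(request, assets, project_id="project-789")
print(flight_request)
```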
    

get_flight_cmd()

The function get_flight_cmd() transforms a Python dictionary that represents a Flight service request into a JSON string. The function is helpful in Spark notebooks because the Spark Flight Connector expects a cmd. You can also use this function in your own pyarrow code to get a valid cmd for the Flight service.

The result of the get_flight_cmd() function can serve as input to create a pyarrow.flight.FlightDescriptor.

You can use this command with named parameters nb_data_request or data_request.

If you use nb_data_request, the dictionary items connection_name, connected_data_name or data_name are resolved to an asset_id, and project_id or space_id before the dictionary is transformed into a JSON string.

If you use data_request, the dictionary is transformed to a JSON string as is.

Examples:

  • Named parameter nb_data_request with dictionary item connection_name:

    MyConnection_data_request = {
        'connection_name': """MyConnection""",
        'interaction_properties': {
            'row_limit': 5000,
            'schema_name': '<schema>',
            'table_name': '<table>'
        }
    }
    cmd = itcfs.get_flight_cmd(nb_data_request=MyConnection_data_request)
    
  • Named parameter data_request:

    My_flight_request = {
        'asset_id': '<asset_id>',
        'project_id': '<project_id>',
        'interaction_properties': {
            'row_limit': 5000,
            'schema_name': '<schema>',
            'table_name': '<table>'
        }
    }
    cmd = itcfs.get_flight_cmd(data_request=My_flight_request)
    

get_flight_descriptor()

The function get_flight_descriptor() transforms a Python dictionary that represents a Flight service request into a pyarrow.flight.FlightDescriptor.

As described in get_flight_cmd(), you can use this command with the named parameters nb_data_request or data_request.

If you use nb_data_request, the dictionary items connection_name, connected_data_name or data_name are resolved to an asset_id, and project_id or space_id before the dictionary is transformed into a flight descriptor, provided the specified names are unique in the project or deployment space.

If you use data_request, the dictionary is transformed to a flight descriptor as is.

Example:

MyConnection_data_request = {
    'connection_name': """MyConnection""",
    'interaction_properties': {
        'row_limit': 5000,
        'schema_name': '<schema>',
        'table_name': '<table>'
    }
}

flightDescriptor = itcfs.get_flight_descriptor(nb_data_request=MyConnection_data_request, wslib=None)

get_flight_service_url()

The function get_flight_service_url() returns the configured endpoint of the Flight service.

Example:

itcfs.get_flight_service_url()

get_flight_client()

The function get_flight_client() returns a ready-to-use flight client pointing to the configured or provided Flight service URL. This function also authenticates with the Flight service by using the default or a provided user token.

Example:

flightClient = itcfs.get_flight_client()

You can use this command with named parameter url to specify an alternative Flight service endpoint, and with named parameter bearer_token to use an existing bearer token.

Example:

flightClient = itcfs.get_flight_client(url='<flight_service_url>', bearer_token='<bearer_token>')

TokenClientAuthHandler()

You can use an instance of TokenClientAuthHandler class to authenticate with a pyarrow.flight.FlightClient instance. The Flight service expects a bearer token for authentication. To create an instance of the TokenClientAuthHandler, you can use an existing user token, or a bearer token.

  • Example of a default TokenClientAuthHandler without any parameters that uses the authentication token from the environment:

    authHandler = itcfs.TokenClientAuthHandler()
    
  • Example of a TokenClientAuthHandler that uses a token created by the authentication service:

    userToken = '<your_user_token>'
    authHandler = itcfs.TokenClientAuthHandler(token=userToken)
    
  • Example of a TokenClientAuthHandler that uses an existing bearer token:

    bearerToken = 'Bearer <your_bearer_token>'
    authHandler = itcfs.TokenClientAuthHandler(bearer_token=bearerToken)
    

get_flight_info()

The function get_flight_info() calls the Flight service to receive a pyarrow.flight.FlightInfo object that contains the information required to read data.

get_flight_info() expects a positional parameter, an instance of a pyarrow.flight.FlightClient, and one of the named parameters nb_data_request or data_request. The difference between these parameters and the expected input is described in the sections about get_flight_cmd() and get_flight_descriptor().

Example:

My_flight_request = {
    'asset_id': '<asset_id>',
    'project_id': '<project_id>',
    'interaction_properties': {
        'row_limit': 5000,
        'schema_name': '<schema>',
        'table_name': '<table>'
    }
}
flightClient = itcfs.get_flight_client()
flightInfo = itcfs.get_flight_info(flightClient, data_request=My_flight_request)

read_pandas_and_concat()

The function read_pandas_and_concat() reads data from all endpoints in the flight info object and concatenates the data partitions into a single pandas.DataFrame.

read_pandas_and_concat() expects two positional parameters, namely an instance of a pyarrow.flight.FlightClient and a pyarrow.flight.FlightInfo object.

Example:

df = itcfs.read_pandas_and_concat(flightClient, flightInfo)

Hints and tips around timeouts

The Flight service uses different types of timeout options for various reasons. This section describes these timeout options and how to increase the timeout setting if you run into errors.

Request timeout

The flight client terminates a request if it cannot be completed within the timeout period. The default request timeout is 120 seconds. You can increase it by specifying a FlightCallOptions object with a timeout parameter. For example:

flightClient.do_get(endpoint.ticket, options=flight.FlightCallOptions(timeout=float(240)))

Alternatively, if you use the itc_utils library, you can specify a timeout parameter as follows:

itc_utils.flight_service.read_pandas_and_concat(readClient, flightInfo, timeout=240)

Authentication expiration

For security reasons, authentication with the Flight service expires after 10 minutes. After expiration, you have to re-authenticate with the Flight service by calling the following command:

flightClient.authenticate(authHandler)

Alternatively, if you use the itc_utils library to obtain a FlightClient, you can re-authenticate by obtaining a new client:

flightClient = itc_utils.flight_service.get_flight_client()
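
Rather than waiting for an authentication error, you can track when the client last authenticated and renew shortly before the 10 minutes are up. The following is a minimal sketch of that idea. The class ReauthenticatingClient, its 9-minute safety margin, and the stub client used for the demonstration are all assumptions made for illustration. With a real client, you would pass your flightClient and authHandler.

```python
import time


class ReauthenticatingClient:
    """Wraps a flight client and re-authenticates before the session expires."""

    def __init__(self, client, auth_handler, max_age_seconds=9 * 60, clock=time.monotonic):
        self._client = client
        self._auth_handler = auth_handler
        self._max_age = max_age_seconds
        self._clock = clock
        self._authenticated_at = None

    def get(self):
        """Return the wrapped client, authenticating first if needed."""
        now = self._clock()
        if self._authenticated_at is None or now - self._authenticated_at >= self._max_age:
            self._client.authenticate(self._auth_handler)
            self._authenticated_at = now
        return self._client


class _StubClient:
    """Stand-in for a flight client; only counts authenticate() calls."""

    def __init__(self):
        self.auth_calls = 0

    def authenticate(self, handler):
        self.auth_calls += 1


fake_now = [0.0]  # controllable clock so the demo does not have to wait
stub = _StubClient()
session = ReauthenticatingClient(stub, auth_handler=None, clock=lambda: fake_now[0])
session.get()        # first use authenticates
fake_now[0] = 300.0
session.get()        # 5 minutes later: still within the margin
fake_now[0] = 600.0
session.get()        # 10 minutes later: re-authenticates
print(stub.auth_calls)
```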

Ticket expiration

Tickets for endpoints that you obtain as part of a FlightInfo object expire after 10 minutes to free resources within the Flight service.

Ticket timeouts can occur if you try to read several large partitions of data sequentially and not all partitions can be processed within the given 10 minutes. You can avoid this kind of timeout by reading the partitions in parallel.
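
One way to read partitions in parallel is a thread pool with one task per endpoint. The following is a minimal sketch. The helper read_partitions_in_parallel is hypothetical, and the demonstration uses a stand-in function instead of a real Flight call. Whether a single flight client can safely be shared across threads is an assumption you should verify for your environment; if in doubt, create one client per task inside the function that reads an endpoint.

```python
from concurrent.futures import ThreadPoolExecutor


def read_partitions_in_parallel(endpoints, read_one, max_workers=4):
    """Call read_one(endpoint) for every endpoint concurrently, keeping order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(read_one, endpoints))


# Stand-in for something like:
#   lambda endpoint: flightClient.do_get(endpoint.ticket).read_all()
def fake_read(endpoint):
    return f"rows of {endpoint}"


results = read_partitions_in_parallel(["part-0", "part-1", "part-2"], fake_read)
print(results)
```

Because the results come back in endpoint order, they can be concatenated afterward in the same way as with sequential reads.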

If tickets do expire, you can use the following command to get new tickets:

flightInfo = flightClient.get_flight_info(flightDescriptor)

Alternatively, if you use the itc_utils library to obtain a FlightInfo object, you can use the following command:

flightInfo = itc_utils.flight_service.get_flight_info(flightClient, data_request=My_flight_request)

Using Flight with Apache Spark

If you are using Spark in your notebook, you can use the Spark data source com.ibm.connect.spark.flight to load data into a Spark data frame:

options = {
    "flight.location": "<flight_service_url>",
    "flight.command": "<flight_cmd>",
    "flight.timeout.default": "30S",
    "flight.authToken": "<authentication_token>"
}

df = spark.read.format("com.ibm.connect.spark.flight").options(**options).load()

You can define the values for options in your environment by using functions from the itc_utils library that were described in the previous sections. For example:


import itc_utils.flight_service as itcfs

MyConnection_data_request = {
    'connection_name': """MyConnection""",
    'interaction_properties': {
        'row_limit': 5000,
        'schema_name': '<schema>',
        'table_name': '<table>'
    }
}

options = {
    "flight.location": itcfs.get_flight_service_url(),
    "flight.command": itcfs.get_flight_cmd(nb_data_request=MyConnection_data_request),
    "flight.timeout.default": "30S",
    "flight.authToken": itcfs.get_bearer_token()
}

df = spark.read.format("com.ibm.connect.spark.flight").options(**options).load()

For details about using the itc_utils library, see Using itc_utils with your own code.

For details about flight requests, see Flight data requests.

Alternatively, if your connection type is supported, you can generate code for the Flight data source from the Code snippets pane. The following example shows loading from a Db2 Warehouse connection. The read options are specified one by one, which is equivalent to passing a dictionary, as in .options(**options).

import itc_utils.flight_service as itcfs

from pyspark.sql import SparkSession
sparkSession = SparkSession.builder.getOrCreate()

# Edit select_statement to change or disable the row limit.
Db2Warehouse_data_request = {
    'connection_name': """MyConnection""",
    'interaction_properties': {
        'select_statement': 'SELECT * FROM "MYSCHEMA"."MYTABLE" FETCH FIRST 5000 ROWS ONLY'
    }
}
flight_request = itcfs.get_data_request(nb_data_request=Db2Warehouse_data_request)

sp_df_1 = sparkSession.read.format("com.ibm.connect.spark.flight") \
    .option("flight.location", itcfs.get_flight_service_url()) \
    .option("flight.command", itcfs.get_flight_cmd(data_request=flight_request)) \
    .option("flight.authToken", itcfs.get_bearer_token()) \
    .load()
sp_df_1.show(10)

Timeouts when using Spark

In addition to the default timeout for all Flight calls, the Spark Flight data source can accept configurations for the following timeouts:

  • flight.timeout.auth: Timeout to complete client authentication
  • flight.timeout.get: Timeout to complete a DoGet call to read a Flight stream
  • flight.timeout.put: Timeout to complete a DoPut call to write a Flight stream

If any of these timeouts is not set, it falls back to the value set in flight.timeout.default. If that value is also not set, there is no timeout.

You can enter the string values for these timeouts in seconds, for example "30S", in minutes, for example "5M", or in hours, for example "1H".
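
To compare such values or check them before passing them to Spark, you can convert them to seconds. The following is a minimal sketch that assumes the format is a whole number followed by an uppercase S, M, or H, as in the examples above. The data source might accept other forms, and the helper timeout_to_seconds is hypothetical.

```python
import re

_UNIT_SECONDS = {"S": 1, "M": 60, "H": 3600}


def timeout_to_seconds(value):
    """Convert a timeout string such as "30S", "5M" or "1H" to seconds."""
    match = re.fullmatch(r"(\d+)([SMH])", value)
    if match is None:
        raise ValueError(f"unsupported timeout string: {value!r}")
    amount, unit = match.groups()
    return int(amount) * _UNIT_SECONDS[unit]


# Option names are taken from the list above; the values are examples only
timeouts = {
    "flight.timeout.default": "30S",
    "flight.timeout.auth": "30S",
    "flight.timeout.get": "5M",
    "flight.timeout.put": "1H",
}
in_seconds = {key: timeout_to_seconds(value) for key, value in timeouts.items()}
print(in_seconds)
```

The timeouts dictionary can be merged into the options dictionary that is passed to spark.read, for example with options.update(timeouts).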
