Connecting to Flight service manually in Python notebooks
You can use the open source pyarrow library to invoke Flight service. If you want to write your own code to read from and write data to data assets in a project, you can use the itc_utils library.
If you use the code that is generated for you to load data from a file or a connection from the Code snippets pane in a notebook, that code also uses pyarrow to invoke Flight service, together with the itc_utils library. The itc_utils library wraps calls to the open source pyarrow library, which improves code readability and reduces code size with minimal additional programming effort.
Basic Flight service interaction
The following code snippets illustrate the basic interactions with Flight service to read from and write data to a data source. This section also includes a code snippet that shows you how to authenticate with the Flight Server.
Reading data
The steps to read data from a data source include:
- Creating a flight descriptor with the metadata to access a data source
- Creating an instance of a flight client
- Authenticating with Flight service
- Sending the flight descriptor to Flight service to obtain a flight info object
- Reading data from a data source. The code snippet shows how to read into a pyarrow.Table or a pandas.DataFrame, and how to read data in chunks.
Explanatory sample code snippet to read data. If you are working with large data sets, see Best practices when loading large volumes of data from a file or connection.
import pyarrow.flight
# create a flight descriptor specifying the data source or target
# content and structure of cmd are specific to IBM CP4D's flight service.
flightDescriptor = pyarrow.flight.FlightDescriptor.for_command(cmd)
# create an instance of a flight client
flightClient = pyarrow.flight.FlightClient(url, **opts)
# authenticate with the flight service (for authHandler, see code snippet at the end of this section).
flightClient.authenticate(authHandler)
# send the flight descriptor to the flight service to obtain a FlightInfo object
# which provides information for reading data from one or more endpoints.
flightInfo = flightClient.get_flight_info(flightDescriptor)
# read from all endpoints
for endpoint in flightInfo.endpoints:
    reader = flightClient.do_get(endpoint.ticket)
    # from an endpoint (or reader, or stream), you can read in several ways.
    # the options are alternatives: a reader can be consumed only once, so pick one.
    # 1) read a pyarrow.Table
    table = reader.read_all()
    # 2) read a pandas.DataFrame
    df = reader.read_pandas()
    # 3) read in chunks; each chunk holds a pyarrow.RecordBatch in chunk.data
    while True:
        try:
            chunk = reader.read_chunk()
            recordBatch = chunk.data
        except StopIteration:
            break
Writing data
The steps to write data to a data source include:
- Creating a flight descriptor with the metadata to access a data source
- Creating an instance of a flight client
- Authenticating with Flight service
- Obtaining a flight write stream
- Writing data to a data target
Sample code snippet to write data:
import pyarrow as pa
import pyarrow.flight
# create a flight descriptor specifying the data source or target
# content and structure of cmd are specific to IBM CP4D's flight service.
flightDescriptor = pyarrow.flight.FlightDescriptor.for_command(cmd)
# create an instance of a flight client
flightClient = pyarrow.flight.FlightClient(url, **opts)
# authenticate with Flight service (for authHandler, see code snippet at the end of this section).
flightClient.authenticate(authHandler)
# obtain a flight write stream
schema = pa.Schema.from_pandas(df, preserve_index=False)
writer, reader = flightClient.do_put(flightDescriptor, schema)
# write data to a data target
writer.write_table(pa.Table.from_pandas(df, schema))
writer.close()
Flight descriptor
A central piece of the interaction with Flight service is the flight descriptor, which specifies how to access the data source. It includes a data source specification in the form of connection properties, such as host and port, and interaction properties, such as a table name or SQL statement. Instead of connection properties, you can also specify the IDs of the asset and of the project or deployment space.
Technically, Flight service expects a flight descriptor in the form of a JSON string. In Python notebooks, you can use Python dictionaries to construct a flight descriptor:
import json
import pyarrow.flight
flight_request = {
    "asset_id": "<asset_id>",
    "project_id": "<project_id>",
    "interaction_properties": {
        "schema_name": "<schema>",
        "table_name": "<table>",
        "row_limit": 5000
    }
}
# create a flight descriptor
cmd = json.dumps(flight_request)
flightDescriptor = pyarrow.flight.FlightDescriptor.for_command(cmd)
The asset_id can be the ID of a connected data asset or a connection asset.
Flight descriptors for reading or writing data differ only slightly with regard to the interaction_properties. Read requests typically have interaction properties like 'sql_statement', 'file_name', or 'table_name', while write requests have additional interaction properties, such as 'existing_table_action' or 'file_format'.
For details about the data request syntax, see Flight data requests.
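To make the difference between read and write requests concrete, the following sketch builds both command strings with one helper. It uses only the standard json module. The helper name build_flight_command is hypothetical, the IDs are placeholders, and the value passed for existing_table_action is an illustrative assumption; check Flight data requests for the values that your data source accepts.

```python
import json


def build_flight_command(asset_id, project_id, **interaction_properties):
    """Return the JSON command string for a flight descriptor (hypothetical helper)."""
    request = {
        "asset_id": asset_id,
        "project_id": project_id,
        "interaction_properties": interaction_properties,
    }
    return json.dumps(request)


# read request: select a table and limit the number of rows
read_cmd = build_flight_command(
    "<asset_id>", "<project_id>",
    schema_name="<schema>", table_name="<table>", row_limit=5000,
)

# write request: same structure plus a write-specific property
# ("replace" is an assumed example value, not confirmed by this topic)
write_cmd = build_flight_command(
    "<asset_id>", "<project_id>",
    schema_name="<schema>", table_name="<table>", existing_table_action="replace",
)

print(read_cmd)
print(write_cmd)
```

Either string can then be passed to pyarrow.flight.FlightDescriptor.for_command in the same way as cmd in the snippet above.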
Authenticating with the Flight Server
You need to authenticate with Flight service by using a valid bearer token. To do so, you can use the following code snippet to write a custom authentication handler class and create an instance of this class.
import pyarrow.flight as flight
class TokenClientAuthHandler(flight.ClientAuthHandler):
    """An example implementation of authentication with a user token."""
    def __init__(self, token):
        super().__init__()
        strToken = str(token)
        self.token = strToken.encode('utf-8')
    def authenticate(self, outgoing, incoming):
        outgoing.write(self.token)
        self.token = incoming.read()
    def get_token(self):
        return self.token
# create an instance of the authentication handler by using IBM Watson Studio Lib
from ibm_watson_studio_lib import access_project_or_space
wslib = access_project_or_space()
token = 'Bearer {}'.format(wslib.auth.get_current_token())
authHandler = TokenClientAuthHandler(token)
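In the snippet above, the Bearer prefix is added before the handler is created, and the handler then sends the token as UTF-8 bytes. The sketch below isolates that preparation step so that it can be checked without a Flight server. The function name to_bearer_bytes is hypothetical, and leaving an already prefixed token unchanged is a design choice made for this example, not a documented requirement of the service.

```python
def to_bearer_bytes(raw_token):
    """Return the token as UTF-8 bytes with a single 'Bearer ' prefix."""
    text = str(raw_token).strip()
    if not text.startswith("Bearer "):
        text = "Bearer {}".format(text)
    return text.encode("utf-8")


print(to_bearer_bytes("abc123"))
print(to_bearer_bytes("Bearer abc123"))
```

Both calls produce the same bytes, which avoids sending a doubled prefix when the token value has already been formatted elsewhere in the notebook.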