Examples of reading and writing data with the Flight service and itc_utils and pandas
You can use the Flight service and the Apache Arrow Flight protocol to read and write data in a project or space by using itc_utils and pandas.
You can use the following examples to learn how to specify the data source and interaction properties for data read or write requests with itc_utils and pandas. The properties vary depending on the data source.
Example of reading from Db2
import itc_utils.flight_service as fs
# NOTE: Named schemas, tables, and columns must match the case of your database.
# You will see errors if you do not match case.
# reads data from named schema and table, row_limit is optional
db2_read = {
    'connection_name': 'your_connection',
    'interaction_properties': {
        'schema_name': 'YOUR_SCHEMA',
        'table_name': 'YOUR_TABLE',
        'row_limit': 1000,
    }
}
# retrieves data from an SQL statement
# unquoted identifiers in SQL statements are not case-sensitive
db2_query = {
    'connection_name': 'your_connection',
    'interaction_properties': {
        'select_statement': 'select * from schema.table where a = 6 and b is null',
    }
}
readClient = fs.get_flight_client()
# replace db2_xxxxx with one of the request options above
flightInfo = fs.get_flight_info(readClient, nb_data_request=db2_xxxxx)
df = fs.read_pandas_and_concat(readClient, flightInfo, timeout=240)
df.info()
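The two read requests differ only in their interaction properties: one names a schema and table, the other carries a select_statement. As a minimal sketch, a hypothetical helper named `make_read_request` (not part of itc_utils) can build either form and reject requests that mix the two. Whether the Flight service itself rejects mixed requests is not stated in this topic, so treat the check as a local convention.

```python
def make_read_request(connection_name, schema_name=None, table_name=None,
                      select_statement=None, row_limit=None):
    """Build an nb_data_request dict for a read (hypothetical helper)."""
    props = {}
    if select_statement is not None:
        # Query form: the SQL statement selects the data.
        if schema_name or table_name:
            raise ValueError("use either select_statement or schema/table, not both")
        props['select_statement'] = select_statement
    elif schema_name and table_name:
        # Table form: names must match the case used in the database.
        props['schema_name'] = schema_name
        props['table_name'] = table_name
    else:
        raise ValueError("provide select_statement or both schema_name and table_name")
    if row_limit is not None:
        # row_limit is optional; it is only added when the caller sets it.
        props['row_limit'] = row_limit
    return {'connection_name': connection_name, 'interaction_properties': props}


db2_read = make_read_request('your_connection', 'YOUR_SCHEMA', 'YOUR_TABLE', row_limit=1000)
print(db2_read)
```

The returned dictionary has the same shape as db2_read and db2_query, so it can be passed as nb_data_request to fs.get_flight_info.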
Example of writing to Db2
For write requests, the defaults are 'table_action': 'append' and 'write_mode': 'insert'. You can omit these properties or set them explicitly.
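To see which properties a write request ends up with, a hypothetical helper named `with_write_defaults` (not an itc_utils function) can make the documented defaults explicit while leaving any value you set yourself untouched. Given the defaults stated above, sending the expanded request should be equivalent to sending the original one.

```python
# Documented defaults for write requests.
WRITE_DEFAULTS = {'table_action': 'append', 'write_mode': 'insert'}


def with_write_defaults(nb_data_request):
    """Return a copy of the request with the documented write defaults made explicit."""
    props = dict(nb_data_request.get('interaction_properties', {}))
    for key, value in WRITE_DEFAULTS.items():
        # setdefault keeps any value the caller already chose.
        props.setdefault(key, value)
    return {**nb_data_request, 'interaction_properties': props}


db2_insert = {
    'connection_name': 'your_connection',
    'interaction_properties': {'schema_name': 'YOUR_SCHEMA', 'table_name': 'YOUR_TABLE'},
}
print(with_write_defaults(db2_insert)['interaction_properties'])
```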
# inserts data into an existing table, or creates a new table if none exists
db2_insert = {
    'connection_name': 'your_connection',
    'interaction_properties': {
        'schema_name': 'YOUR_SCHEMA',
        'table_name': 'YOUR_TABLE',
    }
}
# replaces all data in an existing table
db2_replace = {
    'connection_name': 'your_connection',
    'interaction_properties': {
        'schema_name': 'YOUR_SCHEMA',
        'table_name': 'YOUR_TABLE',
        'table_action': 'truncate',
        'write_mode': 'insert',
    }
}
# inserts new data or updates existing data depending on matching key(s)
db2_upsert = {
    'connection_name': 'your_connection',
    'interaction_properties': {
        'schema_name': 'YOUR_SCHEMA',
        'table_name': 'YOUR_TABLE',
        'table_action': 'append',
        'write_mode': 'merge',
        'key_column_names': 'COLUMN_A,COLUMN_B',  # comma-separated key columns
    }
}
# write a pandas DataFrame to your Db2 table
# make sure the case of your column names matches the database
df.columns = df.columns.str.upper()
# replace db2_xxxxx with one of the three write options above
fs.write_dataframe(df, data_request=fs.get_data_request(nb_data_request=db2_xxxxx))
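Upper-casing every column name, as df.columns.str.upper() does, can silently produce duplicate names if the DataFrame has columns such as id and ID. A minimal standard-library sketch of a pre-write check follows; the helper name `upper_columns_checked` is hypothetical, and it works on a plain list of names so it can be applied to list(df.columns).

```python
from collections import Counter


def upper_columns_checked(columns):
    """Upper-case column names and fail if two names collapse into one."""
    upper = [str(name).upper() for name in columns]
    # Count each upper-cased name to find collisions.
    duplicates = sorted(name for name, count in Counter(upper).items() if count > 1)
    if duplicates:
        raise ValueError(f"column names collide after upper-casing: {duplicates}")
    return upper


print(upper_columns_checked(['id', 'Name', 'amount']))
```

With a DataFrame, the result can be assigned back with df.columns = upper_columns_checked(list(df.columns)) before calling fs.write_dataframe.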
Example of using do_action
Calling do_action executes an action immediately. The returned generator receives the results of the action and is populated after the action is complete. The client must consume the entire generator to complete the action. Results might vary depending on the action. The action is successful if no error is returned. You can call list_actions() to get a list of available actions.
Calling flight_client.list_actions() returns the following available action types:
- discovery: Performs discovery for the given connector request. Returns the metadata that was discovered for the requested data source.
- setup_phase: Runs the setup phase for the given connector request, which is used to prepare partitions for writing data or to run a static SQL statement. Returns a setup phase response message that contains a resource ID, partition information, and schema for the requested data target.
- wrapup_phase: Runs the wrap-up phase for the given connector request, which is used after writing all partitions to the connector. Returns an empty result.
- test: Tests the resource, properties, and connection of the given connector request. Returns a connector resource key.
- validate: Validates the resource and properties of the given connector request without making a connection. Returns a connector resource key.
- health_check: Performs a health check for Flight. Returns a Flight service status and library build information.
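In pyarrow, each entry returned by list_actions() is an ActionType with type and description fields. The standard-library sketch below mimics that shape with a namedtuple (a stand-in, not the pyarrow class) and the action names listed above, to show how a client might check that an action is offered before calling do_action. The helper name `require_action` is hypothetical.

```python
from collections import namedtuple

# Stand-in for pyarrow.flight.ActionType, which exposes .type and .description.
ActionType = namedtuple('ActionType', ['type', 'description'])

# Action names as documented for the Flight service; descriptions are shortened.
documented_actions = [
    ActionType('discovery', 'Performs discovery for the given connector request.'),
    ActionType('setup_phase', 'Runs the setup phase for the given connector request.'),
    ActionType('wrapup_phase', 'Runs the wrap-up phase for the given connector request.'),
    ActionType('test', 'Tests the resource, properties, and connection.'),
    ActionType('validate', 'Validates the resource and properties without connecting.'),
    ActionType('health_check', 'Performs a health check for Flight.'),
]


def require_action(actions, name):
    """Return the matching ActionType or raise if the server does not offer it."""
    for action in actions:
        if action.type == name:
            return action
    raise LookupError(f"action {name!r} is not available")


print(require_action(documented_actions, 'setup_phase').description)
```

Against a live service, the list would come from list(flight_client.list_actions()) instead of the hard-coded stand-ins.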
The return value of do_action is a generator that must be fully consumed to complete the action. Converting the generator to a list consumes it. For example:
result = list(client.do_action(action))
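A plain Python generator gives a rough analogy for why the generator must be drained: code after the last yield runs only when the consumer asks for the next item past the final result. The stand-in `fake_do_action` below is hypothetical, does not talk to a server, and is not how pyarrow implements do_action; it only records when its final step runs.

```python
def fake_do_action(log):
    """Hypothetical stand-in for do_action: yields results, then finishes up."""
    log.append('started')
    yield b'result-1'
    yield b'result-2'
    # This line runs only after the consumer requests past the last result.
    log.append('completed')


log = []
gen = fake_do_action(log)
next(gen)   # read only the first result
print(log)  # 'completed' has not been recorded yet

log = []
results = list(fake_do_action(log))  # list() drains the generator
print(results, log)
```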
The following example uses the setup_phase action to run a static SQL statement.
import itc_utils.flight_service as itcfs
from ibm_watson_studio_lib import access_project_or_space
import pyarrow.flight as flight
flightClient = itcfs.get_flight_client()
nb_data_request = {
    'connection_name': 'PostgreSQL',
    'interaction_properties': {
        'static_statement': 'CREATE TABLE IF NOT EXISTS public.uno2 (id serial, x varchar (32))',
        'write_mode': 'static_statement'
    },
}
flight_request = itcfs.get_data_request(nb_data_request=nb_data_request)
flight_request['context'] = 'target'
flight_cmd = itcfs.get_flight_cmd(data_request=flight_request)
print(flight_cmd)
action = flight.Action('setup_phase', flight_cmd.encode('utf-8'))
gen = flightClient.do_action(action)
result = list(gen)  # fully consume the iterator
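The body of a flight.Action is bytes, which is why the example encodes the command with UTF-8. Assuming the command is a JSON document, as the hand-written command strings in the lookup example later in this topic are, the sketch below shows the serialization step with the standard json module and why str() on a dictionary is not a substitute: Python's repr uses single quotes, which JSON does not allow. It does not reproduce whatever else get_data_request and get_flight_cmd add to the request.

```python
import json

nb_data_request = {
    'connection_name': 'PostgreSQL',
    'interaction_properties': {
        'static_statement': 'CREATE TABLE IF NOT EXISTS public.uno2 (id serial, x varchar (32))',
        'write_mode': 'static_statement',
    },
}

# json.dumps produces a JSON document with double-quoted keys and strings.
command = json.dumps(nb_data_request)
body = command.encode('utf-8')  # Action bodies are bytes

# str() produces a Python repr with single quotes, which is not valid JSON.
try:
    json.loads(str(nb_data_request))
    repr_is_json = True
except json.JSONDecodeError:
    repr_is_json = False

print(json.loads(body.decode('utf-8')) == nb_data_request, repr_is_json)
```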
Example of a merge
import itc_utils.flight_service as itcfs
# df_out is an existing pandas DataFrame that holds the rows to merge
table = itcfs.pa.Table.from_pandas(df_out)
write_data_request = {
    'connection_name': 'Generic JDBC DB2-Mak',
    'interaction_properties': {
        'schema_name': 'MPS38401',
        'table_name': 'BOFA_OUT',
        'table_action': 'append',
        'write_mode': 'merge',
        'key_column_names': 'ID',
    }
}
client = itcfs.get_flight_client()
flight_request = itcfs.get_data_request(nb_data_request=write_data_request)
flight_descriptor = itcfs.flight.FlightDescriptor.for_command(itcfs.get_flight_cmd(data_request=flight_request))
writer, _ = client.do_put(flight_descriptor, table.schema)
writer.write_table(table)
writer.close()
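With 'write_mode': 'merge' and key_column_names, rows whose key values match an existing row update that row, and all other rows are inserted, which is the upsert behavior described for db2_upsert. The in-memory sketch below models that rule with plain dictionaries. It illustrates the semantics only, not how the connector or the database performs the merge, and the helper name `merge_rows` is hypothetical.

```python
def merge_rows(existing, incoming, key_column_names):
    """Upsert incoming rows into existing rows, matching on the key columns."""
    keys = [name.strip() for name in key_column_names.split(',')]
    # Index the existing rows by their key values (assumes keys are unique).
    merged = {tuple(row[k] for k in keys): dict(row) for row in existing}
    for row in incoming:
        key = tuple(row[k] for k in keys)
        if key in merged:
            merged[key].update(row)  # matching key: update the existing row
        else:
            merged[key] = dict(row)  # no match: insert a new row
    return list(merged.values())


existing = [{'ID': 1, 'STATUS': 'open'}, {'ID': 2, 'STATUS': 'open'}]
incoming = [{'ID': 2, 'STATUS': 'closed'}, {'ID': 3, 'STATUS': 'new'}]
print(merge_rows(existing, incoming, 'ID'))
```

The key_column_names value is split on commas, matching the comma-separated form used in db2_upsert.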
Example of a lookup
The following example shows how to use the Flight do_exchange method to perform a lookup. You can use a lookup when your lookup keys and your lookup table come from two different data sources, for example, when the lookup keys are in a CSV or Excel file in Cloud Object Storage and the lookup table is in Db2.
The source_command input is the same as the command sent to do_get and supports the same properties. The do_exchange method does not return a flight-info object. You can send only a single stream and receive only a single stream; partitioned results are not supported. For more information, see the Apache Arrow Flight documentation.
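Conceptually, the exchange receives one stream of key rows and returns one stream of matching rows from the lookup table. The sketch below models that result in memory with sqlite3 and one parameterized query per key, in the spirit of a SELECT ... WHERE key = ? lookup. It is a model of the result under that assumption, not of how the Flight service executes a lookup; the rows are made-up sample data and `lookup` is a hypothetical name.

```python
import sqlite3

# In-memory stand-in for the lookup table (ORDER_DETAILS in the full example).
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE ORDER_DETAILS (ORDER_NUMBER INTEGER, PRODUCT TEXT, QUANTITY INTEGER)')
conn.executemany(
    'INSERT INTO ORDER_DETAILS VALUES (?, ?, ?)',
    [(100, 'tent', 2), (100, 'lantern', 1), (101, 'stove', 4), (102, 'rope', 3)],
)


def lookup(connection, key_rows):
    """Return every lookup-table row whose ORDER_NUMBER matches an incoming key."""
    results = []
    for (order_number,) in key_rows:
        # One parameterized query per key; the key value is bound, not concatenated.
        cursor = connection.execute(
            'SELECT ORDER_NUMBER, PRODUCT, QUANTITY FROM ORDER_DETAILS '
            'WHERE ORDER_NUMBER = ? ORDER BY PRODUCT',
            (order_number,),
        )
        results.extend(cursor.fetchall())
    return results


# Keys from a different source (for example, a CSV file); 999 has no match.
print(lookup(conn, [(100,), (102,), (999,)]))
```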
Data sources that support parameterized queries can support lookup. The following data sources support lookup:
- Amazon RDS for MySQL
- Amazon RDS for Oracle
- Amazon RDS for PostgreSQL
- Amazon Redshift
- Apache Cassandra
- Apache Derby
- Apache Hive
- Cloudera Impala
- Data Virtualization Manager for z/OS
- Databases for DataStax
- Databases for MongoDB
- Databases for MySQL
- Databases for PostgreSQL
- Db2
- Db2 on Cloud
- Db2 for i
- Db2 for z/OS
- Db2 Big SQL
- Db2 Warehouse
- Informix
- MariaDB
- Microsoft Azure SQL Database
- Microsoft SQL Server
- MongoDB
- MySQL
- Netezza (PureData System for Analytics)
- Oracle
- PostgreSQL
- Salesforce.com
- SAP ASE (formerly Sybase)
- SAP HANA
- SingleStoreDB
- Snowflake
- Teradata
from pyarrow import flight
import threading
import time
# Authentication handler that sends a bearer token during the Flight handshake.
class TokenClientAuthHandler(flight.ClientAuthHandler):
    def __init__(self, token):
        super().__init__()
        strToken = str(token)
        self.token = strToken.encode('utf-8')
    def authenticate(self, outgoing, incoming):
        # Send the token, then keep the token that the server returns.
        outgoing.write(self.token)
        self.token = incoming.read()
    def get_token(self):
        return self.token
# Define a function that sends a given authorization token to the FlightClient for authentication.
def authenticateClient(token):
    readClient.authenticate(TokenClientAuthHandler(token),
                            options=flight.FlightCallOptions(timeout=5.0))
# Define a function that retrieves the order numbers for any orders placed by fax or telephone, that is, WHERE ORDER_METHOD_CODE is 1 or 2.
# Replace asset_id and project_id with the IDs of your own connection asset and project.
def getLookupKeys():
    txt = '{ \
        "asset_id": "048fa8bf-80ef-417a-94ef-abcfb248fd69", \
        "interaction_properties": {"select_statement":"SELECT ORDER_NUMBER FROM GOSALES_1021.ORDER_HEADER WHERE ORDER_METHOD_CODE IN (1,2)"}, \
        "project_id": "f1c26626-b68a-47ee-a9ba-4d9f5bc28dc3" \
    }'
    print(txt)
    keyInfo = readClient.get_flight_info(flight.FlightDescriptor.for_command(txt))
    print(keyInfo.schema)
    keyTables = []
    for endpoint in keyInfo.endpoints:
        print(endpoint.ticket)
        reader = readClient.do_get(endpoint.ticket)
        keyTable = reader.read_all()
        print(keyTable.num_rows)
        keyTables.append(keyTable)
    return keyInfo, keyTables
# Define a function that starts an exchange to lookup order details and returns a FlightStreamWriter and a FlightStreamReader.
def startLookup():
    txt = '{ \
        "source_command": { \
            "asset_id": "048fa8bf-80ef-417a-94ef-abcfb248fd69", \
            "interaction_properties": {"schema_name":"GOSALES_1021","table_name":"ORDER_DETAILS"}, \
            "project_id": "f1c26626-b68a-47ee-a9ba-4d9f5bc28dc3" \
        } \
    }'
    print(txt)
    return readClient.do_exchange(flight.FlightDescriptor.for_command(txt))
# Define a function that sends tables of lookup keys to the given exchange writer.
def sendLookupKeys(keyInfo, writer, keyTables):
    print("Sending lookup keys...")
    with writer:
        writer.begin(keyInfo.schema)
        for keyTable in keyTables:
            writer.write_table(keyTable)
        writer.done_writing()
    print("Lookup keys sent")
# Define a function that reads the lookup results and prints the total number of result rows.
def getLookupResults():
    resultsTable = lookupReader.read_all()
    print(resultsTable.num_rows)
# Start a timer
time_start = time.perf_counter()
# Connect to the Flight server, where <namespace> is your Cloud Pak for Data instance namespace
host = 'internal-nginx-svc.<namespace>.svc'
port = '12443'
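readClient = flight.FlightClient(
    "grpc+tls://" + host + ":" + port,
    override_hostname=host,
    disable_server_verification=True)
# Get a bearer token for the current user from ibm-watson-studio-lib
from ibm_watson_studio_lib import access_project_or_space
wslib = access_project_or_space()
token = 'Bearer {}'.format(wslib.auth.get_current_token())
authenticateClient(token)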
# Get the lookup keys
keyInfo, keyTables = getLookupKeys()
# Start a lookup exchange
keyWriter, lookupReader = startLookup()
print("Received writer and reader")
# Start a thread to write the lookup keys
writeThread = threading.Thread(target=sendLookupKeys, args=(keyInfo, keyWriter, keyTables))
writeThread.start()
# Start a thread to read the lookup results
readThread = threading.Thread(target=getLookupResults, args=())
readThread.start()
# Wait for the threads to finish
writeThread.join()
readThread.join()
# Stop the timer
time_stop = time.perf_counter()
print(f"Total time {time_stop - time_start:0.4f} seconds")
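The script writes keys and reads results on separate threads. A likely reason, stated here as an assumption, is that both directions of the exchange are streams: a single thread that wrote every key before reading any result could block once buffers fill, while the server waits for its own results to be read. The standard-library sketch below reproduces that producer/consumer shape with bounded queues standing in for the two streams; the `server` function is a hypothetical lookup service, not Flight code.

```python
import queue
import threading

request_stream = queue.Queue(maxsize=2)   # client -> server stream (bounded)
response_stream = queue.Queue(maxsize=2)  # server -> client stream (bounded)
DONE = object()                           # end-of-stream marker, like done_writing()


def server(lookup_table):
    """Hypothetical server: answers each key as soon as it arrives."""
    while (key := request_stream.get()) is not DONE:
        for row in lookup_table.get(key, []):
            response_stream.put(row)  # blocks while the response buffer is full
    response_stream.put(DONE)


def send_keys(keys):
    for key in keys:
        request_stream.put(key)  # blocks while the request buffer is full
    request_stream.put(DONE)


def read_results(sink):
    while (row := response_stream.get()) is not DONE:
        sink.append(row)


table = {k: [(k, 'detail-a'), (k, 'detail-b')] for k in range(10)}
results = []
threads = [
    threading.Thread(target=server, args=(table,)),
    threading.Thread(target=send_keys, args=(list(range(10)),)),
    threading.Thread(target=read_results, args=(results,)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(results))
```

If send_keys and read_results were instead called one after the other on the same thread, the server would stop reading keys once the response queue filled, the request queue would then fill, and send_keys would block indefinitely.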