Examples of reading and writing data with the Flight service, itc_utils, and pandas

You can use the Flight service and the Apache Arrow Flight protocol to read and write data by using itc_utils and pandas in a project or space.

The following examples show how to specify the data source and the interaction properties for read and write requests that use itc_utils and pandas. The available properties vary by data source.

Example of reading from Db2

import itc_utils.flight_service as fs

# NOTE: Named schemas, tables, and columns must match the case of your database.
#       You will see errors if you do not match case.

# reads data from the named schema and table; row_limit is optional
db2_read = {
    'connection_name': 'your_connection',
    'interaction_properties': {
        'schema_name': 'YOUR_SCHEMA',
        'table_name': 'YOUR_TABLE',
        'row_limit': 1000,
    }
}

# retrieves data from an SQL statement
# unquoted identifiers in SQL statements are not case-sensitive (Db2 folds them to uppercase)
db2_query = {
    'connection_name': 'your_connection',
    'interaction_properties': {
        'select_statement': 'select * from schema.table where a = 6 and b is null',
    }
}
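Because Db2 folds unquoted identifiers to uppercase, a schema, table, or column whose name is lowercase or mixed case must be written in double quotes inside a select_statement. The following sketch uses a hypothetical helper, quote_identifier, that applies standard SQL double-quote escaping; the names MySchema, Orders, and Status are placeholders.

```python
def quote_identifier(name: str) -> str:
    """Wrap an SQL identifier in double quotes, doubling any embedded quotes."""
    return '"' + name.replace('"', '""') + '"'


def build_select(schema: str, table: str, where: str = "") -> str:
    """Build a SELECT * statement that preserves the case of schema and table names."""
    statement = f"SELECT * FROM {quote_identifier(schema)}.{quote_identifier(table)}"
    if where:
        statement += f" WHERE {where}"
    return statement


# Hypothetical mixed-case names; replace them with your own.
db2_query = {
    'connection_name': 'your_connection',
    'interaction_properties': {
        'select_statement': build_select('MySchema', 'Orders', '"Status" = \'OPEN\''),
    }
}
print(db2_query['interaction_properties']['select_statement'])
# SELECT * FROM "MySchema"."Orders" WHERE "Status" = 'OPEN'
```

The helper quotes only identifiers. Values in the WHERE clause are still passed as literal SQL text, so do not build them from untrusted input.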

readClient = fs.get_flight_client()

# replace db2_xxxxx with one of the request options above
flightInfo = fs.get_flight_info(readClient, nb_data_request=db2_xxxxx)

df = fs.read_pandas_and_concat(readClient, flightInfo, timeout=240)
df.info()

Example of writing to Db2

For write requests, the defaults are 'table_action': 'append' and 'write_mode': 'insert'. You can omit these properties or set them explicitly.

# inserts data into an existing table, or creates the table if it does not exist
db2_insert = {
    'connection_name': 'your_connection',
    'interaction_properties': {
        'schema_name': 'YOUR_SCHEMA',
        'table_name': 'YOUR_TABLE',
    }
}
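The db2_insert request above relies on the defaults. If you prefer to see the effective values in your own code, you might fill them in before sending the request. The helper with_write_defaults below is hypothetical; it copies the request and adds 'table_action': 'append' and 'write_mode': 'insert' only when they are absent, which mirrors the defaults described above.

```python
import copy

# Defaults described above for write requests.
WRITE_DEFAULTS = {'table_action': 'append', 'write_mode': 'insert'}


def with_write_defaults(request: dict) -> dict:
    """Return a copy of a write request with default interaction properties filled in."""
    result = copy.deepcopy(request)
    props = result.setdefault('interaction_properties', {})
    for key, value in WRITE_DEFAULTS.items():
        # Keep any value that the caller set explicitly.
        props.setdefault(key, value)
    return result


request = with_write_defaults({
    'connection_name': 'your_connection',
    'interaction_properties': {'schema_name': 'YOUR_SCHEMA', 'table_name': 'YOUR_TABLE'},
})
print(request['interaction_properties'])
```

Because the helper works on a copy, the original request dictionary is left unchanged.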

# replaces all data in an existing table
db2_replace = {
    'connection_name': 'your_connection',
    'interaction_properties': {
        'schema_name': 'YOUR_SCHEMA',
        'table_name': 'YOUR_TABLE',
        'table_action': 'truncate',
        'write_mode': 'insert',
    }
}

# inserts new data or updates existing data depending on matching key(s)
db2_upsert = {
    'connection_name': 'your_connection',
    'interaction_properties': {
        'schema_name': 'YOUR_SCHEMA',
        'table_name': 'YOUR_TABLE',
        'table_action': 'append',
        'write_mode': 'merge',
        'key_column_names': 'COLUMN_A,COLUMN_B',
    }
}
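Conceptually, 'write_mode': 'merge' behaves like an upsert keyed on key_column_names: an incoming row whose key values match an existing row updates that row, and any other row is inserted. The following pure-Python sketch models that behavior on lists of dictionaries to make the semantics concrete. It is an illustration only, not how the connector executes the merge, and merge_rows is a hypothetical name.

```python
def merge_rows(existing, incoming, key_column_names):
    """Model an upsert: update rows whose key values match, insert the rest."""
    keys = [name.strip() for name in key_column_names.split(',')]
    # Index existing rows by the tuple of their key column values.
    index = {tuple(row[k] for k in keys): dict(row) for row in existing}
    for row in incoming:
        key = tuple(row[k] for k in keys)
        if key in index:
            index[key].update(row)   # matching key: update the existing values
        else:
            index[key] = dict(row)   # no match: insert as a new row
    return list(index.values())


existing = [
    {'COLUMN_A': 1, 'COLUMN_B': 'x', 'VALUE': 10},
    {'COLUMN_A': 2, 'COLUMN_B': 'y', 'VALUE': 20},
]
incoming = [
    {'COLUMN_A': 2, 'COLUMN_B': 'y', 'VALUE': 99},  # matches an existing key
    {'COLUMN_A': 3, 'COLUMN_B': 'z', 'VALUE': 30},  # new key
]
for row in merge_rows(existing, incoming, 'COLUMN_A,COLUMN_B'):
    print(row)
```

Note that key_column_names is a single comma-separated string, as in db2_upsert, and that rows match only when all key columns are equal.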

# write a pandas DataFrame to your Db2 table
# be sure your column case matches the database
df.columns = df.columns.str.upper()

# replace db2_xxxxx with one of the three write options above
fs.write_dataframe(df, data_request=fs.get_data_request(nb_data_request=db2_xxxxx))

Example of using do_action

Calling do_action executes an action immediately. The returned generator receives the results of the action and is populated after the action is complete. The client must consume the entire generator to complete the action. Results might vary depending on the action. The action is successful if no error is returned. You can call list_actions() to get a list of available actions.
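The following stdlib-only sketch illustrates why partial consumption matters. The generator fake_do_action is a hypothetical stand-in for the iterator that do_action returns, not the Flight implementation: the code after its last yield, which here marks the action as complete, runs only when the caller exhausts the generator.

```python
def fake_do_action(status):
    """Toy stand-in for do_action: yields results, then finalizes the action."""
    yield b'result 1'
    yield b'result 2'
    # This line runs only after the caller asks for a result past the last one.
    status['complete'] = True


status = {'complete': False}
gen = fake_do_action(status)
first = next(gen)              # read one result only
print(status['complete'])      # False: the generator is suspended mid-action

status = {'complete': False}
results = list(fake_do_action(status))   # list() drives the generator to the end
print(results, status['complete'])
```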

Calling flight_client.list_actions() returns the following available action types:

discovery

Performs discovery for the given connector request.

Returns the metadata that was discovered for the requested data source.

setup_phase

Runs the setup phase for the given connector request, which is used to prepare partitions for writing data or to run a static SQL statement.

Returns a setup phase response message that contains a resource ID, partition information, and schema for the requested data target.

wrapup_phase

Runs the wrap-up phase for the given connector request, which is used after writing all partitions to the connector.

Returns an empty result.

test

Tests the resource, properties, and connection of the given connector request.

Returns a connector resource key.

validate

Validates the resource and properties of the given connector request without making a connection.

Returns a connector resource key.

health_check

Performs a health check for Flight.

Returns a Flight service status and library build information.
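To check which action types your Flight service exposes, you can iterate over list_actions(). In pyarrow, each item is an ActionType with type and description attributes. The helper describe_actions below is hypothetical; it accepts any object with a list_actions() method, and the FakeClient class exists only so that the sketch runs without a live server.

```python
from types import SimpleNamespace


def describe_actions(client):
    """Return a {type: description} mapping of the actions a Flight client reports."""
    return {action.type: action.description for action in client.list_actions()}


# Offline stand-in; with the Flight service, pass the client returned by
# itc_utils.flight_service.get_flight_client() instead.
class FakeClient:
    def list_actions(self):
        return [
            SimpleNamespace(type='health_check',
                            description='Performs a health check for Flight.'),
            SimpleNamespace(type='test',
                            description='Tests the resource, properties, and connection '
                                        'of the given connector request.'),
        ]


actions = describe_actions(FakeClient())
for name in sorted(actions):
    print(f'{name}: {actions[name]}')
```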

Because the generator must be fully consumed before the action is complete, a simple approach is to convert it to a list, which reads every result. For example:

result = list(client.do_action(action))
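Each item that the generator yields is a pyarrow.flight.Result whose body attribute is a pyarrow Buffer. A minimal sketch, assuming you want the raw bytes of every result, reads them with Buffer.to_pybytes() while fully consuming the generator. The helper name run_action is hypothetical, the fake classes only make the sketch runnable offline, and the format of each body depends on the action.

```python
from types import SimpleNamespace


def run_action(client, action):
    """Run a Flight action, consume every result, and return the bodies as bytes."""
    # The list comprehension reads the generator to the end, completing the action.
    return [result.body.to_pybytes() for result in client.do_action(action)]


class FakeBuffer:
    def __init__(self, data):
        self._data = data

    def to_pybytes(self):
        return self._data


class FakeClient:
    def do_action(self, action):
        yield SimpleNamespace(body=FakeBuffer(b'example body'))


bodies = run_action(FakeClient(), action=None)
print(bodies)
```

With the Flight service, you would call it as run_action(flightClient, flight.Action('setup_phase', flight_cmd.encode('utf-8'))).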

The following example uses the setup_phase action to run a static SQL statement.

import itc_utils.flight_service as itcfs
from ibm_watson_studio_lib import access_project_or_space
import pyarrow.flight as flight

flightClient = itcfs.get_flight_client()

nb_data_request = {
    'connection_name': """PostgreSQL""",
    'interaction_properties': {
        'static_statement': 'CREATE TABLE IF NOT EXISTS public.uno2 (id serial, x varchar (32))',
        'write_mode': 'static_statement'
    },
}
flight_request = itcfs.get_data_request(nb_data_request=nb_data_request)
# setup_phase applies to a data target, so set the request context to 'target'
flight_request['context'] = 'target'

flight_cmd = itcfs.get_flight_cmd(data_request=flight_request)
print(flight_cmd)

action = flight.Action('setup_phase', flight_cmd.encode('utf-8'))
gen = flightClient.do_action(action)
result = list(gen)  # fully consume the generator to complete the action

Example of a merge

import itc_utils.flight_service as itcfs

# df_out is the pandas DataFrame that contains the rows to merge
table = itcfs.pa.Table.from_pandas(df_out)

write_data_request = {
    'connection_name': """Generic JDBC DB2-Mak""",
    'interaction_properties': {
        'schema_name': 'MPS38401',
        'table_name': 'BOFA_OUT',
        'table_action': 'append',
        'write_mode': 'merge',
        'key_column_names': 'ID',
    }
}

client = itcfs.get_flight_client()
flight_request = itcfs.get_data_request(nb_data_request=write_data_request)
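# serialize the request into a Flight command, as in the setup_phase example
flight_cmd = itcfs.get_flight_cmd(data_request=flight_request)
flight_descriptor = itcfs.flight.FlightDescriptor.for_command(flight_cmd)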
writer, _ = client.do_put(flight_descriptor, table.schema)
writer.write_table(table)
writer.close()
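Because a merge matches rows on key_column_names, and column names must match the case of your database, it can help to confirm that every key column is present in the DataFrame before you call do_put. The helper check_key_columns below is hypothetical; it works on any sequence of column names, such as df_out.columns.

```python
def check_key_columns(columns, key_column_names):
    """Raise ValueError if any comma-separated key column is missing from columns."""
    keys = [name.strip() for name in key_column_names.split(',') if name.strip()]
    available = set(columns)
    missing = [key for key in keys if key not in available]
    if missing:
        raise ValueError(f'Key columns not found (check case): {missing}')
    return keys


print(check_key_columns(['ID', 'NAME', 'AMOUNT'], 'ID'))

try:
    check_key_columns(['id', 'name'], 'ID')   # lowercase columns do not match 'ID'
except ValueError as err:
    print(err)
```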

Example of a lookup

The following example shows how to use the Flight do_exchange method to perform a lookup. You can use a lookup even when your source data and your lookup table are in different data sources. For example, the lookup keys might come from a CSV or Excel file in Cloud Object Storage while the lookup table is in Db2.

The source_command input takes the same form, and supports the same properties, as the command that you send for a read request. Unlike a read request, do_exchange does not return a FlightInfo object: you can send only a single stream and receive only a single stream, and partitioned results are not supported. For more information, see the Apache Arrow Flight documentation.

Note: For `do_exchange` to perform the lookup successfully, the writer must not be closed until the reader has received all rows.

Data sources that support parameterized queries can support lookup. The following data sources support lookup:

  • Amazon RDS for MySQL
  • Amazon RDS for Oracle
  • Amazon RDS for PostgreSQL
  • Amazon Redshift
  • Apache Cassandra
  • Apache Derby
  • Apache Hive
  • Cloudera Impala
  • Data Virtualization Manager for z/OS
  • Databases for DataStax
  • Databases for MongoDB
  • Databases for MySQL
  • Databases for PostgreSQL
  • Db2
  • Db2 on Cloud
  • Db2 for i
  • Db2 for z/OS
  • Db2 Big SQL
  • Db2 Warehouse
  • Informix
  • MariaDB
  • Microsoft Azure SQL Database
  • Microsoft SQL Server
  • MongoDB
  • MySQL
  • Netezza (PureData System for Analytics)
  • Oracle
  • PostgreSQL
  • Salesforce.com
  • SAP ASE (formerly Sybase)
  • SAP HANA
  • SingleStoreDB
  • Snowflake
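  • Teradata

The following example reads a set of order numbers and then uses do_exchange to look up the matching order details. One thread writes the lookup keys to the exchange while another thread reads the results. Replace <namespace> with your Cloud Pak for Data instance namespace, and replace the asset_id and project_id values with your own.

from pyarrow import flight
from ibm_watson_studio_lib import access_project_or_space
import threading
import time

# wslib provides the authorization token for the current project or space
wslib = access_project_or_space()
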
class TokenClientAuthHandler(flight.ClientAuthHandler):
    """Sends a bearer token to the Flight server and stores the token that it returns."""

    def __init__(self, token):
        super().__init__()
        self.token = str(token).encode('utf-8')

    def authenticate(self, outgoing, incoming):
        outgoing.write(self.token)
        self.token = incoming.read()

    def get_token(self):
        return self.token

# Define a function that sends a given authorization token to the FlightClient for authentication.
def authenticateClient(token):
    readClient.authenticate(TokenClientAuthHandler(token),
                            options=flight.FlightCallOptions(timeout=5.0))

# Define a function that retrieves the order numbers for any orders placed by fax or telephone, i.e. WHERE ORDER_METHOD_CODE is 1 or 2.
def getLookupKeys():
    txt = '{ \
        "asset_id": "048fa8bf-80ef-417a-94ef-abcfb248fd69", \
        "interaction_properties": {"select_statement":"SELECT ORDER_NUMBER FROM GOSALES_1021.ORDER_HEADER WHERE ORDER_METHOD_CODE IN (1,2)"}, \
        "project_id": "f1c26626-b68a-47ee-a9ba-4d9f5bc28dc3" \
    }'
    print(txt)
    keyInfo = readClient.get_flight_info(flight.FlightDescriptor.for_command(txt))
    print(keyInfo.schema)
    keyTables = []
    for endpoint in keyInfo.endpoints:
        print(endpoint.ticket)
        reader = readClient.do_get(endpoint.ticket)
        keyTable = reader.read_all()
        print(keyTable.num_rows)
        keyTables.append(keyTable)
    return keyInfo, keyTables

# Define a function that starts an exchange to lookup order details and returns a FlightStreamWriter and a FlightStreamReader.
def startLookup():
    txt = '{ \
        "source_command": { \
            "asset_id": "048fa8bf-80ef-417a-94ef-abcfb248fd69", \
            "interaction_properties": {"schema_name":"GOSALES_1021","table_name":"ORDER_DETAILS"}, \
            "project_id": "f1c26626-b68a-47ee-a9ba-4d9f5bc28dc3" \
         } \
    }'
    print(txt)
    return readClient.do_exchange(flight.FlightDescriptor.for_command(txt))

# Define a function that sends tables of lookup keys to the given exchange writer.
# The writer is not closed here: it must stay open until the reader has received
# all rows, so the main thread closes it after the read thread finishes.
def sendLookupKeys(keyInfo, writer, keyTables):
    print("Sending lookup keys...")
    writer.begin(keyInfo.schema)
    for keyTable in keyTables:
        writer.write_table(keyTable)
    writer.done_writing()  # signal the end of the keys without closing the exchange
    print("Lookup keys sent")

# Define a function that reads the lookup results and prints the total number of result rows.
def getLookupResults():
    resultsTable = lookupReader.read_all()
    print(resultsTable.num_rows)

# Start a timer
time_start = time.perf_counter()

# Connect to the Flight server, where <namespace> is your Cloud Pak for Data instance namespace
host = 'internal-nginx-svc.<namespace>.svc'
port = '12443'
readClient = flight.FlightClient(
    "grpc+tls://" + host + ":" + port,
    override_hostname=host,
    disable_server_verification=True)
token = 'Bearer {}'.format(wslib.auth.get_current_token())
authenticateClient(token)

# Get the lookup keys
keyInfo, keyTables = getLookupKeys()

# Start a lookup exchange
keyWriter, lookupReader = startLookup()
print("Received writer and reader")

# Start a thread to write the lookup keys
writeThread = threading.Thread(target=sendLookupKeys, args=(keyInfo, keyWriter, keyTables))
writeThread.start()

# Start a thread to read the lookup results
readThread = threading.Thread(target=getLookupResults)
readThread.start()

# Wait for both threads to finish, then close the writer now that all rows have been read
writeThread.join()
readThread.join()
keyWriter.close()

# Stop the timer
time_stop = time.perf_counter()
print(f"Total time {time_stop - time_start:0.4f} seconds")
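The lookup example writes its JSON commands as string literals with backslash line continuations, which are easy to break when you edit them. A sketch of an alternative, assuming the same asset_id, project_id, and interaction properties as above, builds the same commands from dictionaries with json.dumps. The resulting strings can be passed to flight.FlightDescriptor.for_command() in the same way; ASSET_ID and PROJECT_ID are illustrative variable names.

```python
import json

ASSET_ID = '048fa8bf-80ef-417a-94ef-abcfb248fd69'
PROJECT_ID = 'f1c26626-b68a-47ee-a9ba-4d9f5bc28dc3'

# Command for reading the lookup keys (same content as in getLookupKeys).
key_command = json.dumps({
    'asset_id': ASSET_ID,
    'interaction_properties': {
        'select_statement': 'SELECT ORDER_NUMBER FROM GOSALES_1021.ORDER_HEADER '
                            'WHERE ORDER_METHOD_CODE IN (1,2)',
    },
    'project_id': PROJECT_ID,
})

# Command for the lookup exchange (same content as in startLookup).
lookup_command = json.dumps({
    'source_command': {
        'asset_id': ASSET_ID,
        'interaction_properties': {
            'schema_name': 'GOSALES_1021',
            'table_name': 'ORDER_DETAILS',
        },
        'project_id': PROJECT_ID,
    },
})
print(key_command)
print(lookup_command)
```

json.dumps always produces valid JSON with double-quoted strings, so quoting mistakes in hand-written command text cannot occur.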
