IBM Cloud Pak® for Data version 4.8 reaches end of support (EOS) on 31 July 2025. For more information, see the end of service announcement for IBM Cloud Pak for Data version 4.X.
Upgrade to IBM Software Hub version 5.1 before IBM Cloud Pak for Data version 4.8 reaches end of support. For more information, see Upgrading from IBM Cloud Pak for Data version 4.8 to IBM Software Hub version 5.1.
Examples of reading and writing data with the Flight service by using itc_utils and pandas
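In a project or space, you can use itc_utils and pandas to read and write data through the Flight service and the Apache Arrow Flight protocol.
Use the following examples to learn how to specify the data source and the interaction properties of a read or write request with itc_utils and pandas. These properties vary by data source.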
Example of reading from Db2
import itc_utils.flight_service as fs
# NOTE: Named schemas, tables, and columns must match the case of your database.
# You will see errors if you do not match case.
# Reads data from the named schema and table; row_limit is optional
db2_read = {
    'connection_name': 'your_connection',
    'interaction_properties': {
        'schema_name': 'YOUR_SCHEMA',
        'table_name': 'YOUR_TABLE',
        'row_limit': 1000,
    }
}
# Retrieves data by running an SQL statement
# Unquoted identifiers in SQL statements are case-insensitive (Db2 folds them to uppercase)
db2_query = {
    'connection_name': 'your_connection',
    'interaction_properties': {
        'select_statement': 'select * from schema.table where a = 6 and b is null',
    }
}
readClient = fs.get_flight_client()
# Pass db2_read or db2_query (defined above) as nb_data_request
flightInfo = fs.get_flight_info(readClient, nb_data_request=db2_read)
df = fs.read_pandas_and_concat(readClient, flightInfo, timeout=240)
df.info()
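If you build read requests in several places, a small helper can assemble the same dictionary shape that db2_read and db2_query use. The following is a minimal sketch; build_read_request is a hypothetical helper, not part of itc_utils, and the rule that a request names either a table or a select_statement (but not both) is an assumption made only for this sketch.

```python
from typing import Optional


def build_read_request(connection_name: str,
                       schema_name: Optional[str] = None,
                       table_name: Optional[str] = None,
                       select_statement: Optional[str] = None,
                       row_limit: Optional[int] = None) -> dict:
    """Assemble an nb_data_request dict for a read (hypothetical helper)."""
    by_table = schema_name is not None and table_name is not None
    by_query = select_statement is not None
    # Assumption for this sketch: read a named table or run a query, not both.
    if by_table == by_query:
        raise ValueError("pass schema_name and table_name, or select_statement")
    props = {}
    if by_table:
        # Passed through unchanged, so the case must already match the database.
        props["schema_name"] = schema_name
        props["table_name"] = table_name
    else:
        props["select_statement"] = select_statement
    if row_limit is not None:
        props["row_limit"] = row_limit  # optional, as in db2_read
    return {"connection_name": connection_name, "interaction_properties": props}
```

For example, passing build_read_request('your_connection', 'YOUR_SCHEMA', 'YOUR_TABLE', row_limit=1000) as nb_data_request to fs.get_flight_info produces the same request as db2_read.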
Example of writing to Db2
For the write function, the defaults are 'table_action': 'append' and 'write_mode': 'insert'. Adding them explicitly is optional.
# Inserts data into an existing table, or creates a new table if none exists
db2_insert = {
    'connection_name': 'your_connection',
    'interaction_properties': {
        'schema_name': 'YOUR_SCHEMA',
        'table_name': 'YOUR_TABLE',
    }
}
# Replaces all data in an existing table
db2_replace = {
    'connection_name': 'your_connection',
    'interaction_properties': {
        'schema_name': 'YOUR_SCHEMA',
        'table_name': 'YOUR_TABLE',
        'table_action': 'truncate',
        'write_mode': 'insert',
    }
}
# Inserts new data or updates existing data, depending on matching key column(s)
db2_upsert = {
    'connection_name': 'your_connection',
    'interaction_properties': {
        'schema_name': 'YOUR_SCHEMA',
        'table_name': 'YOUR_TABLE',
        'table_action': 'append',
        'write_mode': 'merge',
        'key_column_names': 'COLUMN_A,COLUMN_B',
    }
}
# Write a pandas DataFrame (df) to Db2
# Make sure the column case matches the database
df.columns = df.columns.str.upper()
# Pass db2_insert, db2_replace, or db2_upsert (defined above) as nb_data_request
fs.write_dataframe(df, data_request=fs.get_data_request(nb_data_request=db2_insert))
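Because table_action and write_mode fall back to 'append' and 'insert', and a merge depends on key_column_names, it can help to normalize a request before writing. A minimal sketch follows; normalize_write_request is hypothetical, and the merge checks (key columns listed, and present in the DataFrame with matching case) are our own precautions rather than documented Flight service behavior.

```python
import copy

# Defaults that the write function applies when these keys are omitted.
WRITE_DEFAULTS = {"table_action": "append", "write_mode": "insert"}


def normalize_write_request(request, df_columns):
    """Return a copy of a write request with the defaults filled in.

    Hypothetical helper. For write_mode 'merge' it also checks that
    key_column_names is set and that every key column exists in the
    DataFrame with exactly the same case.
    """
    result = copy.deepcopy(request)  # never modify the caller's dict
    props = result.setdefault("interaction_properties", {})
    for key, value in WRITE_DEFAULTS.items():
        props.setdefault(key, value)
    if props["write_mode"] == "merge":
        keys = [k.strip() for k in props.get("key_column_names", "").split(",") if k.strip()]
        if not keys:
            raise ValueError("write_mode 'merge' requires key_column_names")
        columns = set(df_columns)  # case-sensitive comparison
        missing = [k for k in keys if k not in columns]
        if missing:
            raise ValueError(f"key columns missing from DataFrame: {missing}")
    return result
```

For example, fs.get_data_request(nb_data_request=normalize_write_request(db2_upsert, df.columns)) fails early if COLUMN_A or COLUMN_B is missing from df.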
Example of using do_action
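Calling do_action runs the action immediately. The returned generator receives the results of the action and is populated when the action completes. The client must consume the entire generator for the action to complete. Results vary by action. If no error is returned, the action succeeded. You can call list_actions() to get a list of the available actions.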
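Calling flight_client.list_actions() returns the following action types:
- discovery: Runs discovery for the given connector request. Returns the metadata discovered for the requested data source.
- setup_phase: Runs the setup phase for the given connector request, which prepares partitions for writing data or runs a static SQL statement. Returns a setup-phase response message that contains the resource ID, partition information, and schema of the requested data target.
- wrapup_phase: Runs the wrap-up phase for the given connector request; use it after all partitions have been written to the connector. Returns an empty result.
- test: Tests the resources, properties, and connection of the given connector request. Returns the connector resource key.
- validate: Validates the resources and properties of the given connector request without establishing a connection. Returns the connector resource key.
- health_check: Runs a health check of the Flight service. Returns the Flight service status and library build information.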
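Which action types a server actually offers is reported by list_actions(). The sketch below turns that list into a dictionary and checks for an action before you call it. It assumes a client such as pyarrow.flight.FlightClient, whose list_actions() returns ActionType entries with type and description fields; supported_actions and require_action are hypothetical helpers.

```python
def supported_actions(client):
    """Map each advertised action type to its description.

    `client` is any object whose list_actions() returns entries with
    .type and .description, such as pyarrow.flight.FlightClient.
    """
    return {entry.type: entry.description for entry in client.list_actions()}


def require_action(client, action_type):
    """Raise LookupError if the server does not advertise action_type."""
    available = supported_actions(client)
    if action_type not in available:
        raise LookupError(f"{action_type!r} is not offered; available: {sorted(available)}")
```

For example, call require_action(flightClient, 'setup_phase') before sending a setup_phase action.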
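The return value of do_action is a generator that must be fully consumed for the action to complete. Converting the generator to a list consumes it. For example: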
result = list(client.do_action(action))
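To inspect the results rather than only drain them, you can decode each result body while consuming the generator. This minimal sketch assumes each item behaves like pyarrow.flight.Result (a body buffer with to_pybytes()) and that bodies are UTF-8 text; drain_action is a hypothetical helper. Whether a given action returns JSON is not stated here, so bodies that are not JSON are kept as plain text.

```python
import json


def drain_action(results):
    """Fully consume a do_action generator and decode each result body.

    Items are expected to look like pyarrow.flight.Result: .body is a
    buffer with to_pybytes(). Assumption: bodies are UTF-8 encoded.
    """
    decoded = []
    for result in results:  # iterating to the end lets the action complete
        text = result.body.to_pybytes().decode("utf-8")
        try:
            decoded.append(json.loads(text))  # JSON bodies become Python objects
        except json.JSONDecodeError:
            decoded.append(text)  # anything else is kept as text
    return decoded
```

Usage: result = drain_action(flightClient.do_action(action)).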
The following example uses the setup_phase action to run a static SQL statement.
import itc_utils.flight_service as itcfs
from ibm_watson_studio_lib import access_project_or_space
import pyarrow.flight as flight
flightClient = itcfs.get_flight_client()
nb_data_request = {
    'connection_name': 'PostgreSQL',
    'interaction_properties': {
        'static_statement': 'CREATE TABLE IF NOT EXISTS public.uno2 (id serial, x varchar (32))',
        'write_mode': 'static_statement'
    },
}
flight_request = itcfs.get_data_request(nb_data_request=nb_data_request)
flight_request['context'] = 'target'
flight_cmd = itcfs.get_flight_cmd(data_request=flight_request)
print(flight_cmd)
action = flight.Action('setup_phase', flight_cmd.encode('utf-8'))
gen = flightClient.do_action(action)
result = list(gen) # fully consume iterator
Merge example
import itc_utils.flight_service as itcfs
# df_out is a pandas DataFrame created earlier in your notebook
table = itcfs.pa.Table.from_pandas(df_out)
write_data_request = {
    'connection_name': 'Generic JDBC DB2-Mak',
    'interaction_properties': {
        'schema_name': 'MPS38401',
        'table_name': 'BOFA_OUT',
        'table_action': 'append',
        'write_mode': 'merge',
        'key_column_names': 'ID',
    }
}
client = itcfs.get_flight_client()
flight_request = itcfs.get_data_request(nb_data_request=write_data_request)
flight_descriptor = itcfs.flight.FlightDescriptor.for_command(str(flight_request))
# do_put returns a writer and a metadata reader; only the writer is needed here
writer, _ = client.do_put(flight_descriptor, table.schema)
writer.write_table(table)
writer.close()
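In the merge example, write_mode 'merge' with key_column_names 'ID' inserts rows whose ID is new and updates rows whose ID already exists. To see those semantics without a Db2 connection, the following local sketch uses the upsert syntax of Python's built-in sqlite3 module (SQLite 3.24 or later). It only illustrates the behavior; the table bofa_out and the column value are hypothetical, and this is not how the Flight service carries out a merge.

```python
import sqlite3


def merge_rows(conn, rows):
    """Upsert (id, value) rows: insert new ids, update existing ones."""
    conn.executemany(
        "INSERT INTO bofa_out (id, value) VALUES (?, ?) "
        "ON CONFLICT(id) DO UPDATE SET value = excluded.value",
        rows,
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE bofa_out (id INTEGER PRIMARY KEY, value TEXT)")
merge_rows(conn, [(1, "a"), (2, "b")])
merge_rows(conn, [(2, "B"), (3, "c")])  # id 2 is updated, id 3 is inserted
print(conn.execute("SELECT id, value FROM bofa_out ORDER BY id").fetchall())
# [(1, 'a'), (2, 'B'), (3, 'c')]
```

The key column plays the same role as key_column_names: it decides whether an incoming row updates an existing row or is added as a new one.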
Lookup example
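The following example shows how to perform a lookup with the Flight do_exchange method. Use a lookup when the source table and the lookup table come from two different data sources, for example, when the lookup keys come from a CSV or Excel file stored in Cloud Object Storage but the lookup table is in Db2.
The source_command input is the same command that you send to do_get and supports the same properties. The do_exchange method does not return a FlightInfo object. You can send only a single stream and receive a single stream; partitioned results are not supported. For more information, see the Apache Arrow Flight documentation.
Lookup is available for data sources that support parameterized queries. The following data sources support lookup: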
- Amazon RDS for MySQL
- Amazon RDS for Oracle
- Amazon RDS for PostgreSQL
- Amazon Redshift
- Apache Cassandra
- Apache Derby
- Apache Hive
- Cloudera Impala
- Data Virtualization Manager for z/OS
- Databases for DataStax
- Databases for MongoDB
- Databases for MySQL
- Databases for PostgreSQL
- Db2
- Db2 on Cloud
- Db2 for i
- Db2 for z/OS
- Db2 Big SQL
- Db2 Warehouse
- Informix
- MariaDB
- Microsoft Azure SQL Database
- Microsoft SQL Server
- MongoDB
- MySQL
- Netezza (PureData System for Analytics)
- Oracle
- PostgreSQL
- Salesforce.com
- SAP ASE (formerly Sybase)
- SAP HANA
- SingleStoreDB
- Snowflake
- Teradata
from pyarrow import flight
import json
import threading
import time
from ibm_watson_studio_lib import access_project_or_space
# wslib supplies the current user's token for authentication
wslib = access_project_or_space()
class TokenClientAuthHandler(flight.ClientAuthHandler):
    def __init__(self, token):
        super().__init__()
        strToken = str(token)
        self.token = strToken.encode('utf-8')
    def authenticate(self, outgoing, incoming):
        # Send the bearer token and keep the token that the server returns
        outgoing.write(self.token)
        self.token = incoming.read()
    def get_token(self):
        return self.token
# Define a function that sends a given authorization token to the FlightClient for authentication.
def authenticateClient(token):
    readClient.authenticate(TokenClientAuthHandler(token),
                            options=flight.FlightCallOptions(timeout=5.0))
# Define a function that retrieves the order numbers for any orders placed by fax or telephone, i.e. WHERE ORDER_METHOD_CODE is 1 or 2.
def getLookupKeys():
    txt = json.dumps({
        "asset_id": "048fa8bf-80ef-417a-94ef-abcfb248fd69",
        "interaction_properties": {"select_statement": "SELECT ORDER_NUMBER FROM GOSALES_1021.ORDER_HEADER WHERE ORDER_METHOD_CODE IN (1,2)"},
        "project_id": "f1c26626-b68a-47ee-a9ba-4d9f5bc28dc3"
    })
    print(txt)
    keyInfo = readClient.get_flight_info(flight.FlightDescriptor.for_command(txt))
    print(keyInfo.schema)
    # Read the key table from every endpoint
    keyTables = []
    for endpoint in keyInfo.endpoints:
        print(endpoint.ticket)
        reader = readClient.do_get(endpoint.ticket)
        keyTable = reader.read_all()
        print(keyTable.num_rows)
        keyTables.append(keyTable)
    return keyInfo, keyTables
# Define a function that starts an exchange to lookup order details and returns a FlightStreamWriter and a FlightStreamReader.
def startLookup():
    txt = json.dumps({
        "source_command": {
            "asset_id": "048fa8bf-80ef-417a-94ef-abcfb248fd69",
            "interaction_properties": {"schema_name": "GOSALES_1021", "table_name": "ORDER_DETAILS"},
            "project_id": "f1c26626-b68a-47ee-a9ba-4d9f5bc28dc3"
        }
    })
    print(txt)
    return readClient.do_exchange(flight.FlightDescriptor.for_command(txt))
# Define a function that sends tables of lookup keys to the given exchange writer.
def sendLookupKeys(keyInfo, writer, keyTables):
    print("Sending lookup keys...")
    with writer:
        writer.begin(keyInfo.schema)
        # All key tables go out on the single exchange stream
        for keyTable in keyTables:
            writer.write_table(keyTable)
        writer.done_writing()
    print("Lookup keys sent")
# Define a function that reads the lookup results and prints the total number of result rows.
def getLookupResults():
    resultsTable = lookupReader.read_all()
    print(resultsTable.num_rows)
# Start a timer
time_start = time.perf_counter()
# Connect to the Flight server, where <namespace> is your Cloud Pak for Data instance namespace
host = 'internal-nginx-svc.<namespace>.svc'
port = '12443'
readClient = flight.FlightClient(
    "grpc+tls://" + host + ":" + port,
    override_hostname=host,
    disable_server_verification=True)
token = 'Bearer {}'.format(wslib.auth.get_current_token())
authenticateClient(token)
# Get the lookup keys
keyInfo, keyTables = getLookupKeys()
# Start a lookup exchange
keyWriter, lookupReader = startLookup()
print("Received writer and reader")
# Start a thread to write the lookup keys
writeThread = threading.Thread(target=sendLookupKeys, args=(keyInfo, keyWriter, keyTables))
writeThread.start()
# Start a thread to read the lookup results
readThread = threading.Thread(target=getLookupResults, args=())
readThread.start()
# Wait for the threads to finish
writeThread.join()
readThread.join()
# Stop the timer
time_stop = time.perf_counter()
print(f"Total time {time_stop - time_start:0.4f} seconds")
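Conceptually, the lookup above sends keys taken from one data source and receives the matching rows from another, and lookup is limited to data sources that support parameterized queries. The following self-contained analogy reads keys from CSV text with the csv module and matches them against an in-memory SQLite table with one parameterized query per key. The data, table, and column names are hypothetical, and this is not how the Flight service executes a lookup internally.

```python
import csv
import io
import sqlite3

# Lookup keys as they might arrive from a CSV file (hypothetical data).
KEYS_CSV = "ORDER_NUMBER\n100\n102\n"

# Lookup table in a different data source; here an in-memory SQLite database.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE order_details (order_number INTEGER, product TEXT)")
conn.executemany(
    "INSERT INTO order_details VALUES (?, ?)",
    [(100, "tent"), (101, "lamp"), (102, "stove"), (102, "pan")],
)


def lookup(conn, keys):
    """Run one parameterized query per key and collect all matching rows."""
    matches = []
    for key in keys:
        cur = conn.execute(
            "SELECT order_number, product FROM order_details "
            "WHERE order_number = ? ORDER BY product",
            (key,),
        )
        matches.extend(cur.fetchall())
    return matches


keys = [int(row["ORDER_NUMBER"]) for row in csv.DictReader(io.StringIO(KEYS_CSV))]
print(lookup(conn, keys))
# [(100, 'tent'), (102, 'pan'), (102, 'stove')]
```

Order 101 is never requested, so its row is not returned; one key can match several rows, as order 102 does.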