IBM Cloud Pak® for Data 4.7 版本将于 2025 年 7 月 31 日结束支持(EOS)。 欲了解更多信息,请参阅 IBM Cloud Pak for Data 版本 4.X 的停止服务公告。
在 IBM Cloud Pak for Data 4.7 版本支持结束之前,升级到 IBM Software Hub 5.1 版本。 更多信息,请参阅 IBM Software Hub 版本 5.1 文档中的升级 IBM Software Hub。
使用 Python 的 Flight service 示例
您可以使用 Flight 服务和 Apache Arrow Flight 协议,在项目或空间中使用 Python 读写数据。
要使用 Python读取或写入数据,您可以:
使用为您生成的代码从 Python Notebook 中的所选项目资产装入数据。 此生成的代码使用
itc_utils库,该库打包对开放式源代码 Pythonpyarrow库的调用,从而提高代码可读性,同时减少代码大小。使用开放式源代码 Python
pyarrow库来调用 Flight service,编写您自己的代码以在项目或空间中读写数据。 您需要在以下情况下编写自己的代码:- 生成的代码需要更改,例如在生产环境中使用
- 用于添加生成的代码的功能不可用于资产
- 该工具不支持添加生成的代码
以下部分提供了有关如何使用飞行数据请求为读请求指定数据源和交互式属性的更多详细信息。
有关数据请求语法和属性的详细信息,请参阅 飞行数据请求。
飞行数据请求示例
要访问数据,您需要使用包含要读取或写入的资产描述的 JSON 文档来创建数据请求对象。 在 Python中,可以使用字典来编写这些 JSON 文档。
您可以使用以下数据请求示例来了解如何在 Python中指定数据读取或写入请求的数据源和交互式属性。 这些属性因数据源而异。
模式和表请求的示例:
data_request = {
"asset_id": "ASSET_ID", # asset_id must point to a connection asset
"project_id|space_id|catalog_id": "ID",
"num_partitions": NUM, # Optional
"batch_size": SIZE, # Optional
"interaction_properties": {
"schema_name": "schema",
"table_name": "table"
},
"fields": "fields" # Optional
}
SQL 请求的示例:
data_request = {
"asset_id": "ASSET_ID", # asset_id must point to a connection asset
"project_id|space_id|catalog_id": "ID",
"num_partitions": NUM, # Optional
"batch_size": SIZE, # Optional
"interaction_properties": {
"select_statement": "SELECT ... FROM <schema>.<table> WHERE ..."
}
}
从数据资产文件 (例如 Parquet 文件) 中读取的请求示例:
data_request = {
"asset_id": "ASSET_ID", # asset_id must point to a connected data asset
"project_id|space_id|catalog_id": "ID",
"num_partitions": NUM, # Optional
"fields": "fields" # Optional
}
从 Parquet 文件读取请求的示例:
data_request = {
"asset_id": "ASSET_ID",
"project_id|space_id|catalog_id": "ID",
"num_partitions": NUM, # Optional
"batch_size": SIZE, # Optional
"interaction_properties": {
"folder": "FOLDER",
"file": "FILE",
"bucket": "BUCKET", # If using COS specify bucket in addition to folder/file
"file_format": "parquet"
},
"fields": "fields" # Optional
}
向连接传递连接属性的请求的示例:
data_request = {
"datasource_type": "dashdb",
"connection_properties": {
"database": "DBNAME",
"password": "PASSWORD",
"port": NUM,
"host": "HOSTNAME",
"ssl": true,
"username": "CONNUSER"
},
"num_partitions": NUM, # Optional
"batch_size": SIZE, # Optional
"interaction_properties": {
"schema_name": "schema",
"table_name": "table",
}
"fields": "fields" # Optional
}
"connection_properties" 和 "interaction_properties" 属性因连接器而异。 详见 IBM Watson Data API 数据源类型。
使用飞行数据请求的代码样本
以下代码样本显示了如何在 Python Notebook 中调用 Flight service 。 这些样本说明了数据请求之间的差异,具体取决于数据源连接。
显示如何调用 Flight service 的样本使用名为 itc_utils的 Python 库,该库由 IBM 提供以在 Notebook 中使用。 有关更多详细信息,请参阅 在 Python Notebook 中使用 Flight service。
为 HTTP 连接加载数据而生成的代码中返回的数据请求示例:
import itc_utils.flight_service as itcfs
readClient = itcfs.get_flight_client()
HTTP_data_request = {
'connection_name': """HTTP""",
'interaction_properties': {
'infer_schema': 'true'
}
}
flightInfo = itcfs.get_flight_info(readClient, nb_data_request=HTTP_data_request)
data_df_1 = itcfs.read_pandas_and_concat(readClient, flightInfo)
data_df_1.head(10)
使用连接标识和无属性的数据请求的示例:
import itc_utils.flight_service as itcfs
readClient = itcfs.get_flight_client()
data_request = {
'project_id': 'c01a0212-47cc-4cd4-8cc8-1dd92dbb4bde',
'asset_id': 'a6c0117c-e6da-4d71-9da4-659edcec7ba5',
'interaction_properties': {
'infer_schema': 'true'
}
}
flightInfo = itcfs.get_flight_info(readClient, data_request=data_request)
data_df_1 = itcfs.read_pandas_and_concat(readClient, flightInfo)
data_df_1.head(10)
使用属性的 Db2 连接的数据请求示例:
import itc_utils.flight_service as itcfs
readClient = itcfs.get_flight_client()
data_request = {'datasource_type': {'entity': {'name': 'db2'}},
'connection_properties': {'database': 'BLUDB',
'password': '****',
'username_password_security': 'clear_text',
'port': '50001',
'host': 'dashdb-123.services.dal.bluemix.net',
'inherit_access_token': 'false',
'username_password_encryption': 'default',
'ssl': 'true',
'username': 'myuser'},
'interaction_properties': {'schema_name': 'MYSCHEMA', 'table_name': 'A_TABLE'}
}
flightInfo = itcfs.get_flight_info(readClient, data_request=data_request)
data_df_1 = itcfs.read_pandas_and_concat(readClient, flightInfo)
data_df_1.head(10)