使用 R 语言进行 Flight service 数据请求的示例

您可以使用 Flight service 以及 Apache Arrow Flight 协议,在项目或空间中读取和写入数据。 在 R 中,您可以通过调用开源的 ` Pythonpyarrow ` 包,并借助 R 的 `reticulate` 包来调用 ` Flight service ` 函数。 该库为 Python 的模块、类和函数提供了R接口。

要使用 R 读取或写入数据,您可以:

  • 使用系统为您生成的代码,将选定项目资产中的数据加载到笔记本中。 此生成的代码使用了 库 itc_utils ,该库封装了对开源库 Pythonpyarrow 的调用,在提高代码可读性的同时缩减了代码体积。

  • 编写自己的代码,利用开源的 Pythonpyarrow 库调用 Flight service ,在项目或空间中读取和写入数据。 在以下情况下,您需要编写自己的代码:

    • 生成的代码需要进行修改,例如以便在生产环境中使用
    • 该资产不支持添加生成的代码
    • 该工具不支持添加生成的代码

有关从 R 调用 Python 的详细信息,请参阅:

有关开源 Flight Arrow R 中的 Flight 客户端 API 的详细信息,请参阅《 连接到 Flight RPC 服务器 》。

以下各节将详细介绍如何使用飞行数据请求为读取或写入请求指定数据源和交互属性。

有关数据请求语法和属性的详细信息,请参阅 “航班数据请求 ”。

您可以参考以下数据请求示例,了解如何在 R 中为数据读取或写入请求指定数据源和交互属性。 这些属性会因数据源的不同而有所差异。 这些示例使用 ` Pythonreticulate ` 来创建字典。

使用连接 ID 查询架构和表的示例(仅适用于未集成 Git 的项目)

data_request = dict(
    "asset_id"= "ASSET_ID",
    "project_id|space_id|catalog_id" = "ID",
    "num_partitions"= NUM, # Optional
    "batch_size"= SIZE, # Optional
    "interaction_properties"= dict(
        "schema_name"= "SCHEMA",
        "table_name"= "TABLE"
        ),
    "fields"= list("FIELDS") # Optional
    )

使用连接名称请求模式和表的示例

data_request = dict(
    "connection_name"= "CONNECTION",
    "batch_size"= NUM, # Optional
    "interaction_properties"= dict(
        "schema_name"= "SCHEMA",
        "table_name"= "TABLE",
        "row_limit"= NUM
        ),
    "fields"= list("FIELDS") # Optional
    )

使用连接 ID 的 SQL 请求示例(仅适用于未集成 Git 的项目)

data_request = dict(
    "asset_id"= "ASSET_ID",
    "project_id|space_id|catalog_id" = "ID",
    "num_partitions"= NUM, # Optional
    "batch_size"= SIZE, # Optional
    "interaction_properties"= dict(
        "select_statement"= "SQL" )
    )

使用连接名称的 SQL 请求示例

data_request = dict(
    "connection_name"= "CONNECTION",
    "num_partitions"= NUM, # Optional
    "batch_size"= SIZE, # Optional
    "interaction_properties"= dict(
        "select_statement"= "SELECT ... FROM <schema>.<table> WHERE ..."
        )
    )

使用资产 ID 请求数据文件的示例(仅适用于未集成 Git 的项目)

data_request = dict(
    "asset_id"= "ASSET_ID",
    "project_id|space_id|catalog_id" = "ID",
    "num_partitions"= NUM, # Optional
    )

使用资产名称请求数据资产的示例

data_request = dict(
    "data_name"= "DATA ASSET",
    "interaction_properties"= dict(
        "infer_schema"= "true",
        "infer_as_varchar"= "false"
        )
    )

"interaction_properties" 属性的 "connection_properties" 具体表现因连接器而异。 详情请参阅 “数据与 AI 通用核心 API —— 数据源类型列表 ”。

使用飞行数据请求的代码示例

以下代码示例展示了如何在 R 笔记本中调用 Flight service。 这些示例使用了 R 语言库 reticulate 以及 Python 上的 和 pyarrowitc_utils 库。 itc_utils 已预装在 IBM 提供的所有笔记本和 RStudio 运行时环境中,供这些环境使用。 有关更多详细信息,请参阅 《在 R 笔记本中使用 Flight service 》

示例:从连接中读取模式中某张表的数据,该连接是通过连接名访问的

# Schema and Table example using connection name
library("reticulate")
pa <- import("pyarrow")
library("arrow")

itcfs <- import("itc_utils.flight_service")
client <- itcfs$get_flight_client()

data_request = dict(
"connection_name"= "CONNECTION",
"batch_size"= NUM, # Optional
"interaction_properties"= dict(
    "schema_name"= "SCHEMA",
    "table_name"= "TABLE",
    "row_limit"= NUM
    ),
"fields"= list("FIELDS") # Optional
)

flightInfo <- itcfs$get_flight_info(client, data_request=data_request)
df <- as.data.frame(itcfs$read_tables(client, flightInfo, timeout=240))
head(df)

通过连接名访问 SQL 连接并读取数据的示例

library("reticulate")
pa <- import("pyarrow")
library("arrow")

itcfs <- import("itc_utils.flight_service")
client <- itcfs$get_flight_client()

data_request = dict(
"connection_name"= "CONNECTION",
"num_partitions"= NUM, # Optional
"batch_size"= SIZE, # Optional
"interaction_properties"= dict(
    "select_statement"= "SELECT * FROM SCHEMA.TABLE WHERE FIELD = 'field'"
    )
)

flightInfo <- itcfs$get_flight_info(client, nb_data_request=data_request)
df <- as.data.frame(itcfs$read_tables(client, flightInfo, timeout=240))
head(df)

以下代码片段展示了在将数据写回数据源时如何使用自定义数据请求的示例:

向现有数据库表写入数据的示例,若表不存在则创建新表:

library("reticulate")
pa <- import("pyarrow")
library("arrow")

itcfs <- import("itc_utils.flight_service")
client <- itcfs$get_flight_client()

data_request = dict(
"connection_name"= "CONNECTION",
"interaction_properties"= dict(
    "schema_name"= "SCHEMA",
    "table_name"= "TABLE"
    )
)

itcfs$write_dataframe(DATA, nb_data_request = data_request)

替换现有表中所有数据的示例:

library("reticulate")
pa <- import("pyarrow")
library("arrow")

itcfs <- import("itc_utils.flight_service")
client <- itcfs$get_flight_client()

data_request = dict(
    "connection_name"= "CONNECTION",
    "interaction_properties"= dict(
        "schema_name"= "SCHEMA",
        "table_name"= "TABLE",
        "table_action"= "truncate",
        "write_mode"= "insert"
        )
)

itcfs$write_dataframe(DATA, nb_data_request = data_request)

了解更多