Apache Spark での Flight の使用
ノートブックで Spark を使用している場合は、 Spark データ・ソース com.ibm.connect.spark.flight を使用して、データを Spark データ・フレームにロードできます。
options = {
"flight.location": <flight_service_url>,
"flight.command": <flight_cmd>,
"flight.timeout.default": "30S",
"flight.authToken": <authentication_token>
}
df = spark.read.format("com.ibm.connect.spark.flight").options(**options).load()
前のセクションで説明した itc_utils ライブラリーの関数を使用して、ご使用の環境でオプションの値を定義できます。 例:
import itc_utils.flight_service as itcfs
MyConnection_data_request = {
'connection_name': """MyConnection""",
'interaction_properties': {
'row_limit': 5000,
'schema_name': '<schema>',
'table_name': '<table>'
}
}
options = {
"flight.location": itcfs.get_flight_service_url(),
"flight.command": itcfs.get_flight_cmd(nb_data_request=MyConnection_data_request),
"flight.timeout.default": "30S",
"flight.authToken": itcfs.get_bearer_token()
}
df = spark.read.format("com.ibm.connect.spark.flight").options(**options).load()
itc_utils ライブラリーの使用について詳しくは、 独自のコードでの itc_utils の使用 を参照してください。
フライト要求について詳しくは、 フライト・データ要求 を参照してください。
あるいは、接続タイプがサポートされている場合は、「コード・スニペット」ペインから生成されたコードを使用して、 Flight データ・ソースのコードを生成できます。 次の例は、 Db2 Warehouse 接続からのロードを示しています。 読み取りオプションは 1 つずつ指定されることに注意してください。 これは、例えば .options(**options) のように、ディクショナリーを渡すことと同等です。
import itc_utils.flight_service as itcfs
from pyspark.sql import SparkSession
sparkSession = SparkSession(spark).builder.getOrCreate()
# Edit select_statement to change or disable the row limit.
#
Db2Warehouse_data_request = {
'connection_name': """MyConnection""",
'interaction_properties': {
'select_statement': 'SELECT * FROM "MYSCHEMA"."MYTABLE" FETCH FIRST 5000 ROWS ONLY'
}
}
flight_request = itcfs.get_data_request(nb_data_request=Db2Warehouse_data_request)
sp_df_1 = sparkSession.read.format("com.ibm.connect.spark.flight") \
.option("flight.location", itcfs.get_flight_service_url()) \
.option("flight.command", itcfs.get_flight_cmd(data_request=flight_request)) \
.option("flight.authToken", itcfs.get_bearer_token()) \
.load()
sp_df_1.show(10)
Spark 使用時のタイムアウト
すべての Flight 呼び出しのデフォルトのタイムアウトに加えて、 Spark Flight データ・ソースは以下のタイムアウトの構成を受け入れることができます。
flight.timeout.auth: クライアント認証を完了するためのタイムアウトflight.timeout.get: Flight ストリームを読み取るための DoGet 呼び出しを完了するまでのタイムアウトflight.timeout.put: Flight ストリームを書き込むための DoPut 呼び出しを完了するまでのタイムアウト
これらのタイムアウトのいずれかが設定されていない場合、タイムアウトは flight.timeout.default で設定された値にフォールバックします。 この値も設定されていない場合、タイムアウトはありません。
これらのタイムアウトのストリング値は、秒単位 ("30S", など)、分単位 ("5M", など)、または時間単位 ("1H" など) で入力できます。