Apache Spark での Flight の使用

ノートブックで Spark を使用している場合は、 Spark データ・ソース com.ibm.connect.spark.flight を使用して、データを Spark データ・フレームにロードできます。

options = {
    "flight.location": <flight_service_url>,
    "flight.command": <flight_cmd>,
    "flight.timeout.default": "30S",
    "flight.authToken": <authentication_token>
}

df = spark.read.format("com.ibm.connect.spark.flight").options(**options).load()

前のセクションで説明した itc_utils ライブラリーの関数を使用して、ご使用の環境でオプションの値を定義できます。 例:


import itc_utils.flight_service as itcfs

MyConnection_data_request = {
      'connection_name': """MyConnection""",
      'interaction_properties': {
          'row_limit': 5000,
          'schema_name': '<schema>',
          'table_name': '<table>'
      }
 }

options = {
    "flight.location": itcfs.get_flight_service_url(),
    "flight.command": itcfs.get_flight_cmd(nb_data_request=MyConnection_data_request),
    "flight.timeout.default": "30S",
    "flight.authToken": itcfs.get_bearer_token()
}

df = spark.read.format("com.ibm.connect.spark.flight").options(**options).load()

itc_utils ライブラリーの使用について詳しくは、 独自のコードでの itc_utils の使用 を参照してください。

フライト要求について詳しくは、 フライト・データ要求 を参照してください。

あるいは、接続タイプがサポートされている場合は、「コード・スニペット」ペインから生成されたコードを使用して、 Flight データ・ソースのコードを生成できます。 次の例は、 Db2 Warehouse 接続からのロードを示しています。 読み取りオプションは 1 つずつ指定されることに注意してください。 これは、例えば .options(**options) のように、ディクショナリーを渡すことと同等です。

import itc_utils.flight_service as itcfs

from pyspark.sql import SparkSession
sparkSession = SparkSession(spark).builder.getOrCreate()

#  Edit select_statement to change or disable the row limit.
#
Db2Warehouse_data_request = {
    'connection_name': """MyConnection""",
    'interaction_properties': {
        'select_statement': 'SELECT * FROM "MYSCHEMA"."MYTABLE" FETCH FIRST 5000 ROWS ONLY'
    }
}
flight_request = itcfs.get_data_request(nb_data_request=Db2Warehouse_data_request)

sp_df_1 = sparkSession.read.format("com.ibm.connect.spark.flight") \
    .option("flight.location", itcfs.get_flight_service_url()) \
    .option("flight.command", itcfs.get_flight_cmd(data_request=flight_request)) \
    .option("flight.authToken", itcfs.get_bearer_token()) \
    .load()
sp_df_1.show(10)

Spark 使用時のタイムアウト

すべての Flight 呼び出しのデフォルトのタイムアウトに加えて、 Spark Flight データ・ソースは以下のタイムアウトの構成を受け入れることができます。

  • flight.timeout.auth: クライアント認証を完了するためのタイムアウト
  • flight.timeout.get: Flight ストリームを読み取るための DoGet 呼び出しを完了するまでのタイムアウト
  • flight.timeout.put: Flight ストリームを書き込むための DoPut 呼び出しを完了するまでのタイムアウト

これらのタイムアウトのいずれかが設定されていない場合、タイムアウトは flight.timeout.default で設定された値にフォールバックします。 この値も設定されていない場合、タイムアウトはありません。

これらのタイムアウトのストリング値は、秒単位 ("30S", など)、分単位 ("5M", など)、または時間単位 ("1H" など) で入力できます。

詳細情報