Apache Spark 에서 Flight 사용
노트북에서 Spark를 사용하고 있다면, Spark 데이터 소스를 com.ibm.connect.spark.flight 사용하여 데이터를 Spark 데이터프레임으로 불러올 수 있습니다:
options = {
"flight.location": <flight_service_url>,
"flight.command": <flight_cmd>,
"flight.timeout.default": "30S",
"flight.authToken": <authentication_token>
}
df = spark.read.format("com.ibm.connect.spark.flight").options(**options).load()
앞서 설명한 itc_utils 라이브러리의 함수를 사용하여 환경 설정의 값을 정의할 수 있습니다. 예를 들어, 다음과 같습니다.
import itc_utils.flight_service as itcfs
MyConnection_data_request = {
'connection_name': """MyConnection""",
'interaction_properties': {
'row_limit': 5000,
'schema_name': '<schema>',
'table_name': '<table>'
}
}
options = {
"flight.location": itcfs.get_flight_service_url(),
"flight.command": itcfs.get_flight_cmd(nb_data_request=MyConnection_data_request),
"flight.timeout.default": "30S",
"flight.authToken": itcfs.get_bearer_token()
}
df = spark.read.format("com.ibm.connect.spark.flight").options(**options).load()
이 itc_utils 라이브러리 사용에 대한 자세한 내용은 ‘자신의 코드에서 itc_utils 사용하기’를 참조하십시오.
항공편 요청에 대한 자세한 내용은 ‘항공편 데이터 요청’을 참조하십시오.
또는, 사용 중인 연결 유형이 지원되는 경우 ‘코드 스니펫’ 창에서 생성된 코드를 사용하여 Flight 데이터 소스용 코드를 생성할 수 있습니다. 다음 예제는 Db2 Warehouse 연결을 통해 데이터를 불러오는 방법을 보여줍니다. 읽기 옵션은 하나씩 지정해야 한다는 점에 유의하십시오. 이는 예를 들어 다음과 .options(**options) 같이 사전(dictionary)을 전달하는 것과 같습니다.
import itc_utils.flight_service as itcfs
from pyspark.sql import SparkSession
sparkSession = SparkSession(spark).builder.getOrCreate()
# Edit select_statement to change or disable the row limit.
#
Db2Warehouse_data_request = {
'connection_name': """MyConnection""",
'interaction_properties': {
'select_statement': 'SELECT * FROM "MYSCHEMA"."MYTABLE" FETCH FIRST 5000 ROWS ONLY'
}
}
flight_request = itcfs.get_data_request(nb_data_request=Db2Warehouse_data_request)
sp_df_1 = sparkSession.read.format("com.ibm.connect.spark.flight") \
.option("flight.location", itcfs.get_flight_service_url()) \
.option("flight.command", itcfs.get_flight_cmd(data_request=flight_request)) \
.option("flight.authToken", itcfs.get_bearer_token()) \
.load()
sp_df_1.show(10)
Spark 사용 시 발생하는 타임아웃
모든 Flight 호출에 적용되는 기본 타임아웃 외에도, Spark Flight 데이터 소스는 다음 타임아웃에 대한 구성을 지원합니다:
flight.timeout.auth: 클라이언트 인증 완료까지 남은 시간flight.timeout.get: Flight 스트림을 읽기 위한 DoGet 호출을 완료하는 데 걸리는 시간 제한flight.timeout.put: Flight 스트림을 작성하기 위한 DoPut 호출을 완료하는 데 걸리는 시간 초과
이러한 타임아웃 중 하나라도 설정되지 않은 경우, 타임아웃은 에서 설정된 값으로 flight.timeout.default자동 적용됩니다. 이 값도 설정되지 않은 경우, 타임아웃이 발생하지 않습니다.
이러한 타임아웃 값은 초 단위로 입력할 수 있습니다(예: "30S", ). 분 단위로 입력할 수도 있습니다(예: "5M", ). 시간 단위로 입력할 수도 있습니다(예: "1H" ).