Como usar o Flight com o Apache Spark
Se você estiver usando o Spark no seu notebook, pode utilizar a fonte de dados com.ibm.connect.spark.flight do Spark para carregar dados em um dataframe do Spark:
options = {
"flight.location": <flight_service_url>,
"flight.command": <flight_cmd>,
"flight.timeout.default": "30S",
"flight.authToken": <authentication_token>
}
df = spark.read.format("com.ibm.connect.spark.flight").options(**options).load()
Você pode definir os valores das opções em seu ambiente utilizando as funções da itc_utils biblioteca descritas nas seções anteriores. Por exemplo:
import itc_utils.flight_service as itcfs
MyConnection_data_request = {
'connection_name': """MyConnection""",
'interaction_properties': {
'row_limit': 5000,
'schema_name': '<schema>',
'table_name': '<table>'
}
}
options = {
"flight.location": itcfs.get_flight_service_url(),
"flight.command": itcfs.get_flight_cmd(nb_data_request=MyConnection_data_request),
"flight.timeout.default": "30S",
"flight.authToken": itcfs.get_bearer_token()
}
df = spark.read.format("com.ibm.connect.spark.flight").options(**options).load()
Para obter mais detalhes sobre como usar a itc_utils biblioteca, consulte “Usando o itc_utils com seu próprio código ”.
Para obter mais informações sobre solicitações de voos, consulte Solicitações de dados de voos.
Como alternativa, se o seu tipo de conexão for compatível, você pode usar o código gerado no painel “Trechos de código” para gerar código para a fonte de dados Flight. O exemplo a seguir mostra como carregar dados a partir de uma conexão Db2 Warehouse. Observe que as opções de leitura são especificadas uma a uma. Isso equivale a passar um dicionário, como, .options(**options) por exemplo.
import itc_utils.flight_service as itcfs
from pyspark.sql import SparkSession
sparkSession = SparkSession(spark).builder.getOrCreate()
# Edit select_statement to change or disable the row limit.
#
Db2Warehouse_data_request = {
'connection_name': """MyConnection""",
'interaction_properties': {
'select_statement': 'SELECT * FROM "MYSCHEMA"."MYTABLE" FETCH FIRST 5000 ROWS ONLY'
}
}
flight_request = itcfs.get_data_request(nb_data_request=Db2Warehouse_data_request)
sp_df_1 = sparkSession.read.format("com.ibm.connect.spark.flight") \
.option("flight.location", itcfs.get_flight_service_url()) \
.option("flight.command", itcfs.get_flight_cmd(data_request=flight_request)) \
.option("flight.authToken", itcfs.get_bearer_token()) \
.load()
sp_df_1.show(10)
Temporizações ao usar o Spark
Além do tempo limite padrão para todas as chamadas do Flight, a fonte de dados do Spark Flight pode aceitar configurações para os seguintes tempos limite:
flight.timeout.auth: Tempo limite para concluir a autenticação do clienteflight.timeout.get: Tempo limite para concluir uma chamada ` DoGet ` para ler um fluxo `Flight`flight.timeout.put: Tempo limite para concluir uma chamada ` DoPut ` para gravar um fluxo `Flight`
Se algum desses tempos limite não estiver definido, o tempo limite será ajustado para o valor definido em flight.timeout.default. Se esse valor também não for definido, não haverá tempo limite.
Você pode inserir os valores desses tempos limite em segundos, por exemplo, "30S",; em minutos, por exemplo, "5M",; ou em horas, por exemplo, "1H".