Como usar o Flight com o Apache Spark

Se você estiver usando o Spark no seu notebook, pode utilizar a fonte de dados com.ibm.connect.spark.flight do Spark para carregar dados em um dataframe do Spark:

options = {
    "flight.location": <flight_service_url>,
    "flight.command": <flight_cmd>,
    "flight.timeout.default": "30S",
    "flight.authToken": <authentication_token>
}

df = spark.read.format("com.ibm.connect.spark.flight").options(**options).load()

Você pode definir os valores das opções em seu ambiente utilizando as funções da itc_utils biblioteca descritas nas seções anteriores. Por exemplo:


import itc_utils.flight_service as itcfs

MyConnection_data_request = {
      'connection_name': """MyConnection""",
      'interaction_properties': {
          'row_limit': 5000,
          'schema_name': '<schema>',
          'table_name': '<table>'
      }
 }

options = {
    "flight.location": itcfs.get_flight_service_url(),
    "flight.command": itcfs.get_flight_cmd(nb_data_request=MyConnection_data_request),
    "flight.timeout.default": "30S",
    "flight.authToken": itcfs.get_bearer_token()
}

df = spark.read.format("com.ibm.connect.spark.flight").options(**options).load()

Para obter mais detalhes sobre como usar a itc_utils biblioteca, consulte “Usando o itc_utils com seu próprio código ”.

Para obter mais informações sobre solicitações de voos, consulte Solicitações de dados de voos.

Como alternativa, se o seu tipo de conexão for compatível, você pode usar o código gerado no painel “Trechos de código” para gerar código para a fonte de dados Flight. O exemplo a seguir mostra como carregar dados a partir de uma conexão Db2 Warehouse. Observe que as opções de leitura são especificadas uma a uma. Isso equivale a passar um dicionário, como, .options(**options) por exemplo.

import itc_utils.flight_service as itcfs

from pyspark.sql import SparkSession
sparkSession = SparkSession(spark).builder.getOrCreate()

#  Edit select_statement to change or disable the row limit.
#
Db2Warehouse_data_request = {
    'connection_name': """MyConnection""",
    'interaction_properties': {
        'select_statement': 'SELECT * FROM "MYSCHEMA"."MYTABLE" FETCH FIRST 5000 ROWS ONLY'
    }
}
flight_request = itcfs.get_data_request(nb_data_request=Db2Warehouse_data_request)

sp_df_1 = sparkSession.read.format("com.ibm.connect.spark.flight") \
    .option("flight.location", itcfs.get_flight_service_url()) \
    .option("flight.command", itcfs.get_flight_cmd(data_request=flight_request)) \
    .option("flight.authToken", itcfs.get_bearer_token()) \
    .load()
sp_df_1.show(10)

Temporizações ao usar o Spark

Além do tempo limite padrão para todas as chamadas do Flight, a fonte de dados do Spark Flight pode aceitar configurações para os seguintes tempos limite:

  • flight.timeout.auth: Tempo limite para concluir a autenticação do cliente
  • flight.timeout.get: Tempo limite para concluir uma chamada ` DoGet ` para ler um fluxo `Flight`
  • flight.timeout.put: Tempo limite para concluir uma chamada ` DoPut ` para gravar um fluxo `Flight`

Se algum desses tempos limite não estiver definido, o tempo limite será ajustado para o valor definido em flight.timeout.default. Se esse valor também não for definido, não haverá tempo limite.

Você pode inserir os valores desses tempos limite em segundos, por exemplo, "30S",; em minutos, por exemplo, "5M",; ou em horas, por exemplo, "1H".

Saiba mais