Running a workflow by using Apache Airflow
Extract, Transform, and Load (ETL) operations involve running multiple Spark runtimes, either sequentially or concurrently. By using Directed Acyclic Graphs (DAGs), you can design a workflow that automates the process. This topic describes a use case that runs an Apache Airflow workflow. The workflow runs tasks that ingest data to Presto in watsonx.data and then query that data from watsonx.data.
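To illustrate how a DAG decides what can run sequentially and what can run concurrently, the following minimal sketch uses Python's standard graphlib module, not Airflow's scheduler. It groups tasks into batches. Tasks in the same batch do not depend on each other, so they could run concurrently. The batches themselves must run one after another. The graph names are hypothetical, although the chain mirrors the task_id values of the example DAG later in this topic.

```python
from graphlib import TopologicalSorter

# Hypothetical task graph: each key is a task, each value is the set of tasks it waits for.
# The names mirror the task_id values of the example DAG in this topic.
PIPELINE = {
    "start_task": set(),
    "ingest_via_spark_engine": {"start_task"},
    "wait_until_ingestion_is_complete": {"ingest_via_spark_engine"},
    "query_via_presto": {"wait_until_ingestion_is_complete"},
}


def parallel_batches(graph):
    """Group tasks into batches: tasks in one batch have no mutual dependencies
    and could run concurrently; batches must run one after another."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError if the graph contains a cycle
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        batches.append(ready)
        sorter.done(*ready)
    return batches


if __name__ == "__main__":
    # A linear chain runs one task at a time.
    print(parallel_batches(PIPELINE))
    # Two independent Spark jobs can run side by side before a final query.
    print(parallel_batches({"ingest_a": set(), "ingest_b": set(),
                            "query": {"ingest_a", "ingest_b"}}))
```

Airflow itself does not run tasks in lockstep batches. With the default trigger rule, it starts each task as soon as all of its upstream tasks succeed. The batches above only show which tasks are free to overlap.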
watsonx.data on IBM Software Hub
Before you begin
- Install an Apache Airflow stand-alone instance on your computer.
- Install watsonx.data on your computer.
- Install the Pandas and Presto Python client (prestodb) libraries by using the command:
pip install pandas presto-python-client
- watsonx.data console host information (wxd_host). For more information, see Getting connection information.
- API key for watsonx.data (username and api_key). For more information about generating an API key, see Generating an API authorization token.
- Service instance ID for watsonx.data (wxd_instance_id). Get the instance ID from the watsonx.data information page.
- Spark engine ID of the Spark engine (spark_engine_id). For more information, see Getting connection information.
- Presto external URL of an active Presto engine (presto_ext_url), which is the host URL of the Presto engine. For more information, see Getting connection information.
- Location of an SSL certificate that is trusted by the system, if applicable.
- Catalog associated with the Spark and Presto engines (catalog_name).
- Name of the storage bucket associated with the selected catalog (bucket_name).
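Several of the values listed above are pasted into the example DAG as placeholders, such as <catalog_name>. Before you run anything, it can help to check them. The following sketch uses two hypothetical helpers, not part of watsonx.data or Airflow. The first flags required keys that are missing, empty, or still contain a <...> placeholder. The second reduces a Presto external URL to the bare host name, because the example DAG expects presto_ext_url without https:// and sets the port separately. The list of required keys is an assumption based on the keys that the example DAG reads. cert_location is left out because it is optional.

```python
from urllib.parse import urlparse

# Keys that the example DAG reads from default_args (assumed list; adjust to your DAG).
REQUIRED_KEYS = (
    "wxd_endpoint", "wxd_instance_id", "wxd_username", "wxd_api_key",
    "spark_engine_id", "data_volume_display_name", "catalog_name",
    "bucket_name", "presto_ext_url",
)


def missing_or_placeholder(settings):
    """Return required keys that are absent, empty, or still hold a <...> placeholder."""
    problems = []
    for key in REQUIRED_KEYS:
        value = str(settings.get(key, "")).strip()
        if not value or ("<" in value and ">" in value):
            problems.append(key)
    return problems


def presto_host(url):
    """Strip the scheme, port, and path from a Presto external URL, leaving the host name."""
    # urlparse() only fills in .hostname when a scheme is present, so add one if needed.
    parsed = urlparse(url if "://" in url else f"https://{url}")
    return parsed.hostname or ""
```

For example, missing_or_placeholder(default_args) run against the unedited sample DAG reports wxd_endpoint, catalog_name, bucket_name, and presto_ext_url, because those values still contain placeholders.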
Procedure
- The use case includes a task that ingests data to Presto. To do that, create a Spark application that ingests Iceberg data into the watsonx.data catalog. This example uses the sample Python file ingestion-job.py, which takes the bucket name and the catalog name as its two command-line arguments:
```python
from pyspark.sql import SparkSession
import sys


def init_spark():
    spark = SparkSession.builder.appName("ingestion-demo").enableHiveSupport().getOrCreate()
    return spark


def create_database(spark, bucket_name, catalog):
    spark.sql("create database if not exists {}.demodb LOCATION 's3a://{}/demodb'".format(catalog, bucket_name))


def list_databases(spark, catalog):
    # List the databases under the lakehouse catalog
    spark.sql("show databases from {}".format(catalog)).show()


def basic_iceberg_table_operations(spark, catalog):
    # Demonstration: create a basic Iceberg table, insert some data, and then query the table
    print("creating table")
    spark.sql("create table if not exists {}.demodb.testTable(id INTEGER, name VARCHAR(10), age INTEGER, salary DECIMAL(10, 2)) using iceberg".format(catalog)).show()
    print("table created")
    spark.sql("insert into {}.demodb.testTable values(1,'Alan',23,3400.00),(2,'Ben',30,5500.00),(3,'Chen',35,6500.00)".format(catalog))
    print("data inserted")
    spark.sql("select * from {}.demodb.testTable".format(catalog)).show()


def clean_database(spark, catalog):
    # Clean up the demo database
    spark.sql("drop table if exists {}.demodb.testTable purge".format(catalog))
    spark.sql("drop database if exists {}.demodb cascade".format(catalog))


def main(wxdDataBucket, wxdDataCatalog):
    # Create the session before the try block so that `spark` always exists in `finally`
    spark = init_spark()
    try:
        create_database(spark, wxdDataBucket, wxdDataCatalog)
        list_databases(spark, wxdDataCatalog)
        basic_iceberg_table_operations(spark, wxdDataCatalog)
    finally:
        # The demo table is kept so that the query_via_presto task in the DAG can read it.
        # Call clean_database(spark, wxdDataCatalog) separately to remove the demo data.
        spark.stop()


if __name__ == '__main__':
    main(sys.argv[1], sys.argv[2])
```
- Upload the Python file ingestion-job.py into a storage volume. For more information about uploading data to a storage volume, see Managing storage volumes. Save the display name of the storage volume (data_volume_display_name) for later use.
- Design a DAG workflow by using Python, and save the Python file to the Apache Airflow DAGs directory, $AIRFLOW_HOME/dags/ (the default value of AIRFLOW_HOME is ~/airflow). The following example workflow runs tasks that ingest data to Presto in watsonx.data and then query that data from watsonx.data.
Save the file with the following content as wxd_pipeline.py:
```python
from datetime import timedelta, datetime
from time import sleep
import base64
import os

import pandas as pd
import prestodb
import requests

# The DAG object
from airflow import DAG
# Operators (airflow.operators.python is the Airflow 2 module path)
from airflow.operators.python import PythonOperator  # type: ignore

# Initialize the default arguments
default_args = {
    'owner': 'IBM watsonx.data',
    'start_date': datetime(2024, 3, 4),
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'wxd_endpoint': 'https://<wxd_host>.com',  # WXD host URL
    'wxd_instance_id': '234234....33',  # Instance ID of the wxd service instance
    'wxd_username': 'myuser1',  # Your username
    'wxd_api_key': '23sdf....sdff',  # API key from your profile
    'spark_engine_id': 'spark23',  # Spark engine ID
    'data_volume_display_name': 'my_data_vol::zen',  # Data volume display name
    'catalog_name': '<catalog_name>',  # Catalog where the data is ingested
    'bucket_name': '<bucket_name>',  # Bucket associated with the catalog
    'presto_ext_url': '<presto_external_url>',  # Presto external URL of the engine, without https://
    'cert_location': '<presto-certificate>.cert'  # Presto SSL certificate path; set to "" to skip
}

# Instantiate a DAG object
wxd_pipeline_dag = DAG('wxd_ingestion_pipeline_software',
                       default_args=default_args,
                       description='watsonx.data ingestion pipeline',
                       schedule_interval=None,
                       is_paused_upon_creation=True,
                       catchup=False,
                       max_active_runs=1,
                       tags=['wxd', 'watsonx.data'])

# Workaround: bypass any configured HTTP(S) proxy for all hosts
os.environ['NO_PROXY'] = '*'


def _ingest_via_spark_engine():
    print('ingest_via_spark_engine')
    url = f"{default_args['wxd_endpoint']}/lakehouse/api/v2/spark_engines/{default_args['spark_engine_id']}/applications"
    auth_str = base64.b64encode(f'{default_args["wxd_username"]}:{default_args["wxd_api_key"]}'.encode('ascii')).decode("ascii")
    headers = {'Content-type': 'application/json',
               'Authorization': f'ZenApiKey {auth_str}',
               'LhInstanceId': default_args['wxd_instance_id']}
    payload = {
        "application_details": {
            "conf": {
                "spark.executor.cores": "1",
                "spark.executor.memory": "1G",
                "spark.driver.cores": "1",
                "spark.driver.memory": "1G",
                "spark.hadoop.wxd.apikey": f"ZenApiKey {auth_str}"
            },
            "application": "/app/ingestion-job.py",
            "arguments": [default_args['bucket_name'], default_args['catalog_name']],
        },
        "volumes": [{"name": default_args['data_volume_display_name'], "mount_path": "/app"}]
    }
    try:
        # verify=False skips TLS certificate verification of the watsonx.data endpoint
        response = requests.post(url, json=payload, headers=headers, verify=False)
        print("Response", response.content)
        response.raise_for_status()
        # The returned value is pushed to XCom for the next task
        return response.json()['application_id']
    except Exception as inst:
        print('Error submitting the Spark application:', inst)
        # Re-raise so that Airflow marks the task as failed and retries it
        raise


def _wait_until_job_is_complete(**context):
    print('wait_until_job_is_complete')
    application_id = context['task_instance'].xcom_pull(task_ids='ingest_via_spark_engine')
    print(application_id)
    url = f"{default_args['wxd_endpoint']}/lakehouse/api/v2/spark_engines/{default_args['spark_engine_id']}/applications/{application_id}"
    auth_str = base64.b64encode(f'{default_args["wxd_username"]}:{default_args["wxd_api_key"]}'.encode('ascii')).decode("ascii")
    headers = {'Content-type': 'application/json',
               'Authorization': f'ZenApiKey {auth_str}',
               'LhInstanceId': default_args['wxd_instance_id']}
    while True:
        response = requests.get(url, headers=headers, verify=False)
        print(response.content)
        data = response.json()
        if data['state'] == 'FINISHED':
            break
        elif data['state'] in ['STOPPED', 'FAILED', 'KILLED']:
            # Raising (instead of printing) marks the task as failed in Airflow
            raise ValueError(f"Job failed: {data}")
        print('Job is not completed, sleeping for 10 secs')
        sleep(10)


def _query_presto():
    presto_pwd = base64.b64encode(f'{default_args["wxd_username"]}:{default_args["wxd_api_key"]}'.encode('ascii'))
    with prestodb.dbapi.connect(
            host=default_args['presto_ext_url'],
            port=443,
            user=default_args['wxd_username'],
            catalog='tpch',
            schema='tiny',
            http_scheme='https',
            # Replace <username> with your watsonx.data username
            auth=prestodb.auth.BasicAuthentication('ibmlhapikey_<username>', presto_pwd)
    ) as conn:
        if default_args['cert_location'] != "":
            conn._http_session.verify = default_args['cert_location']
        df = pd.read_sql_query(f"select * from {default_args['catalog_name']}.demodb.testTable limit 5", conn)
        with pd.option_context('display.max_rows', None, 'display.max_columns', None):
            print("\n", df.head())


def start_job():
    print('Validating default arguments')
    for key in ('wxd_endpoint', 'wxd_username', 'wxd_instance_id', 'wxd_api_key', 'spark_engine_id'):
        if key not in default_args:
            raise ValueError(f'{key} is mandatory')


start = PythonOperator(task_id='start_task', python_callable=start_job, dag=wxd_pipeline_dag)
ingest_via_spark_engine = PythonOperator(task_id='ingest_via_spark_engine', python_callable=_ingest_via_spark_engine, dag=wxd_pipeline_dag)
wait_until_ingestion_is_complete = PythonOperator(task_id='wait_until_ingestion_is_complete', python_callable=_wait_until_job_is_complete, dag=wxd_pipeline_dag)
query_via_presto = PythonOperator(task_id='query_via_presto', python_callable=_query_presto, dag=wxd_pipeline_dag)

start >> ingest_via_spark_engine >> wait_until_ingestion_is_complete >> query_via_presto
```
- Log in to Apache Airflow.
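- Search for the wxd_ingestion_pipeline_software DAG, which is defined in wxd_pipeline.py, and enable it from the Apache Airflow console page. Because the DAG has no schedule (schedule_interval=None), trigger it manually to run the workflow.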
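To inspect the Spark submission request before you send it, you can build it as plain data. The following sketch moves the URL, headers, and payload from _ingest_via_spark_engine into a hypothetical build_spark_submission function that sends nothing. The field names are copied from the example DAG. The sketch does not cover any other fields that the watsonx.data API might accept.

```python
import base64


def zen_api_key(username, api_key):
    """Base64-encode 'username:api_key', as the example DAG does for its ZenApiKey header."""
    return base64.b64encode(f"{username}:{api_key}".encode("ascii")).decode("ascii")


def build_spark_submission(endpoint, engine_id, instance_id, username, api_key,
                           bucket_name, catalog_name, volume_name):
    """Return (url, headers, payload) for submitting ingestion-job.py; nothing is sent."""
    token = zen_api_key(username, api_key)
    url = f"{endpoint}/lakehouse/api/v2/spark_engines/{engine_id}/applications"
    headers = {
        "Content-type": "application/json",
        "Authorization": f"ZenApiKey {token}",
        "LhInstanceId": instance_id,
    }
    payload = {
        "application_details": {
            "conf": {
                "spark.executor.cores": "1",
                "spark.executor.memory": "1G",
                "spark.driver.cores": "1",
                "spark.driver.memory": "1G",
                "spark.hadoop.wxd.apikey": f"ZenApiKey {token}",
            },
            # The storage volume is mounted at /app, so the script path starts there.
            "application": "/app/ingestion-job.py",
            # ingestion-job.py reads sys.argv[1] as the bucket and sys.argv[2] as the catalog.
            "arguments": [bucket_name, catalog_name],
        },
        "volumes": [{"name": volume_name, "mount_path": "/app"}],
    }
    return url, headers, payload
```

After you review the values, send the request the same way the DAG does, for example requests.post(url, json=payload, headers=headers, verify=False).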
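The polling loop in _wait_until_job_is_complete has no upper bound. If the application never reaches a terminal state, the task keeps polling indefinitely. The following sketch adds a timeout. wait_for_application is a hypothetical helper. It takes a get_state callable, which makes the loop testable without a live engine. The terminal state names come from the example DAG. Any intermediate state names that you pass in are illustrative only.

```python
import time

# Failure states checked by the example DAG; FINISHED means success.
FAILURE_STATES = {"STOPPED", "FAILED", "KILLED"}


def wait_for_application(get_state, timeout_s=3600.0, interval_s=10.0,
                         sleep=time.sleep, clock=time.monotonic):
    """Poll get_state() until FINISHED; raise on a failure state or after timeout_s seconds."""
    deadline = clock() + timeout_s
    while True:
        state = get_state()
        if state == "FINISHED":
            return state
        if state in FAILURE_STATES:
            raise RuntimeError(f"Spark application ended in state {state}")
        if clock() >= deadline:
            raise TimeoutError(f"Spark application still in state {state} after {timeout_s} s")
        sleep(interval_s)
```

Inside the DAG, get_state could be lambda: requests.get(url, headers=headers, verify=False).json()['state']. As an alternative, you can pass Airflow's execution_timeout argument (a timedelta) to the PythonOperator to cap how long the task can run.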
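The query task passes a prestodb connection to pandas.read_sql_query. Recent pandas versions warn about DB-API connections other than SQLAlchemy or sqlite3. If you only need to print a few rows, you can use a plain DB-API 2.0 cursor instead. The following sketch is a hypothetical helper that works with any DB-API connection. It reads cursor.description after fetching because some drivers might only know the column names once results arrive.

```python
def fetch_preview(conn, table, limit=5):
    """Return up to `limit` rows of `table` as dicts, using only DB-API 2.0 cursor calls."""
    cursor = conn.cursor()
    # `table` is interpolated into the SQL text, so pass only trusted, fully qualified names.
    cursor.execute(f"select * from {table} limit {int(limit)}")
    rows = cursor.fetchall()
    # cursor.description holds one sequence per column; item 0 is the column name.
    columns = [column[0] for column in cursor.description]
    return [dict(zip(columns, row)) for row in rows]
```

Inside the with block of _query_presto, you could call print(fetch_preview(conn, f"{default_args['catalog_name']}.demodb.testTable")).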