Getting started with watsonx.data Spark use cases
You can run Spark use cases for watsonx.data by using the Python samples. All the samples are written with the Spark Python APIs.
Before you begin
Before you can run Spark use cases for watsonx.data, you must:
- Provision an IBM watsonx.data instance for the environment that you choose (watsonx.data on CPD or watsonx.data on Cloud).
- Provision an IBM Analytics Engine instance in a Cloud Pak for Data cluster. See Provisioning an instance.
- Create a volume in Cloud Pak for Data. See Create a Cloud Pak for Data storage volume.
About the sample use case
The sample file demonstrates the following functions (condensed sketches of the key steps follow this list):
Accessing tables from watsonx.data
: The Create a database in Lakehouse catalog section of the sample Python file creates a database demodb in the configured watsonx.data instance, under the catalog named lakehouse.
demodb is configured to store all data and metadata in the Cloud Object Storage bucket lakehouse-bucket. The section also creates an Iceberg table testTable and accesses it.
Ingesting data to watsonx.data
: The Ingest parquet data into a lakehouse table section of the sample Python file ingests data in Parquet and CSV format from the spark-vol volume into watsonx.data tables.
Sample data in Parquet format (downloaded in step 2 of Inserting sample data into the Cloud Pak for Data volume) is inserted from the volume into the watsonx.data table yellow_taxi_2022. The same section also ingests data in CSV format from spark-vol into the table zipcodes in the database demodb. For instructions on how to access the sample
data and upload it to the volume, see Inserting sample data into the Cloud Pak for Data volume.
Modifying the schema in watsonx.data
: The Schema evolution section of the sample Python file shows how to modify the schema of a watsonx.data table, for example by adding a new column to the yellow_taxi_2022 table.
Table maintenance activities in watsonx.data
: Table maintenance helps keep the watsonx.data tables performant. Iceberg provides out-of-the-box table maintenance procedures that allow you to perform powerful table optimizations in a declarative fashion. The following sample
demonstrates how to perform some table maintenance operations by using Spark.
For more information about the Iceberg Spark table maintenance operations, see Table Operations.
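The following condensed sketch pulls together the table access and ingestion steps that the full sample in Running the sample use case implements. The catalog lakehouse, database demodb, bucket lakehouse-bucket, and the /spark-vol/ file paths match the sample; the endpoint and credentials are placeholders that you must replace.

from pyspark.sql import SparkSession

# Condensed sketch of the access and ingestion steps; replace <cos_bucket_endpoint>,
# <access_key>, and <secret_key> with the values for your Cloud Object Storage bucket.
spark = SparkSession.builder \
    .appName("lh-hms-cloud") \
    .config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.endpoint", "<cos_bucket_endpoint>") \
    .config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.access.key", "<access_key>") \
    .config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.secret.key", "<secret_key>") \
    .enableHiveSupport() \
    .getOrCreate()

# Create the demodb database in the lakehouse catalog, backed by the bucket
spark.sql("create database if not exists lakehouse.demodb LOCATION 's3a://lakehouse-bucket/'")

# Create an Iceberg table and query it
spark.sql("create table if not exists lakehouse.demodb.testTable"
          "(id INTEGER, name VARCHAR(10), age INTEGER, salary DECIMAL(10, 2)) using iceberg")
spark.sql("select * from lakehouse.demodb.testTable").show()

# Ingest Parquet data from the mounted spark-vol volume into a new Iceberg table
df = spark.read.option("header", True).parquet("file:///spark-vol/yellow_tripdata_2022-01.parquet")
df.writeTo("lakehouse.demodb.yellow_taxi_2022").create()

# Ingest CSV data through a temporary view
csv_df = spark.read.option("header", True).csv("file:///spark-vol/zipcodes.csv")
csv_df.createOrReplaceTempView("tempCSVTable")
spark.sql("create or replace table lakehouse.demodb.zipcodes using iceberg as select * from tempCSVTable")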
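Similarly, the schema evolution and table maintenance steps reduce to the following Spark SQL statements and Iceberg procedure calls. This is a sketch that continues from the previous one and assumes the yellow_taxi_2022 table created there.

# Schema evolution: add a column to an existing Iceberg table
spark.sql("ALTER TABLE lakehouse.demodb.yellow_taxi_2022 ADD COLUMN(fare_per_mile double)")

# Compact small data files into files of roughly 200 MB each
spark.sql("CALL lakehouse.system.rewrite_data_files(table => 'demodb.yellow_taxi_2022', "
          "options => map('target-file-size-bytes','209715200'))").show()

# Expire older snapshots, keeping only the most recent one
latest = spark.sql("SELECT committed_at FROM lakehouse.demodb.yellow_taxi_2022.snapshots").tail(1)[0].committed_at
spark.sql(f"CALL lakehouse.system.expire_snapshots(table => 'demodb.yellow_taxi_2022', "
          f"older_than => TIMESTAMP '{latest}', retain_last => 1)").show()

# Remove orphan data files and rewrite manifest files
spark.sql("CALL lakehouse.system.remove_orphan_files(table => 'demodb.yellow_taxi_2022')").show(truncate=False)
spark.sql("CALL lakehouse.system.rewrite_manifests('demodb.yellow_taxi_2022')").show()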
Running the sample use case
To run Spark use cases for watsonx.data by using sample Python files:
1. Save the following sample as a Python file:
from pyspark.sql import SparkSession
import os

def init_spark():
    spark = SparkSession.builder \
        .appName("lh-hms-cloud") \
        .config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.endpoint", "<cos_bucket_endpoint>") \
        .config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.access.key", "<access_key>") \
        .config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.secret.key", "<secret_key>") \
        .enableHiveSupport() \
        .getOrCreate()
    return spark

def create_database(spark):
    # Create a database in the lakehouse catalog
    spark.sql("create database if not exists lakehouse.demodb LOCATION 's3a://lakehouse-bucket/'")

def list_databases(spark):
    # List the databases under the lakehouse catalog
    spark.sql("show databases from lakehouse").show()

def basic_iceberg_table_operations(spark):
    # Demonstration: create a basic Iceberg table, insert some data, and then query the table
    spark.sql("create table if not exists lakehouse.demodb.testTable(id INTEGER, name VARCHAR(10), age INTEGER, salary DECIMAL(10, 2)) using iceberg").show()
    spark.sql("insert into lakehouse.demodb.testTable values(1,'Alan',23,3400.00),(2,'Ben',30,5500.00),(3,'Chen',35,6500.00)")
    spark.sql("select * from lakehouse.demodb.testTable").show()

def create_table_from_parquet_data(spark):
    # Load parquet data into a dataframe
    df = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-01.parquet")
    # Write the dataframe into an Iceberg table
    df.writeTo("lakehouse.demodb.yellow_taxi_2022").create()
    # Describe the table created
    spark.sql('describe table lakehouse.demodb.yellow_taxi_2022').show(25)
    # Query the table
    spark.sql('select * from lakehouse.demodb.yellow_taxi_2022').count()

def ingest_from_csv_temp_table(spark):
    # Load csv data into a dataframe
    csvDF = spark.read.option("header",True).csv("file:///spark-vol/zipcodes.csv")
    csvDF.createOrReplaceTempView("tempCSVTable")
    # Load the temporary table into an Iceberg table
    spark.sql('create or replace table lakehouse.demodb.zipcodes using iceberg as select * from tempCSVTable')
    # Describe the table created
    spark.sql('describe table lakehouse.demodb.zipcodes').show(25)
    # Query the table
    spark.sql('select * from lakehouse.demodb.zipcodes').show()

def ingest_monthly_data(spark):
    df_feb = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-02.parquet")
    df_march = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-03.parquet")
    df_april = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-04.parquet")
    df_may = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-05.parquet")
    df_june = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-06.parquet")
    df_q1_q2 = df_feb.union(df_march).union(df_april).union(df_may).union(df_june)
    df_q1_q2.write.insertInto("lakehouse.demodb.yellow_taxi_2022")

def perform_table_maintenance_operations(spark):
    # Query the metadata files table to list underlying data files
    spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show()
    # There are many smaller files; compact them into files of 200MB each using the
    # `rewrite_data_files` Iceberg Spark procedure
    spark.sql(f"CALL lakehouse.system.rewrite_data_files(table => 'demodb.yellow_taxi_2022', options => map('target-file-size-bytes','209715200'))").show()
    # Again, query the metadata files table to list underlying data files; 6 files are compacted
    # to 3 files
    spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show()
    # List all the snapshots
    spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").show()
    # Expire earlier snapshots. Only the latest one with the compacted data is required,
    # so retain only the latest one
    latest_snapshot_committed_at = spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").tail(1)[0].committed_at
    print(latest_snapshot_committed_at)
    spark.sql(f"CALL lakehouse.system.expire_snapshots(table => 'demodb.yellow_taxi_2022',older_than => TIMESTAMP '{latest_snapshot_committed_at}',retain_last => 1)").show()
    # Again, list all the snapshots to see that only 1 is left
    spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").show()
    # Remove orphan data files
    spark.sql(f"CALL lakehouse.system.remove_orphan_files(table => 'demodb.yellow_taxi_2022')").show(truncate=False)
    # Rewrite manifest files
    spark.sql(f"CALL lakehouse.system.rewrite_manifests('demodb.yellow_taxi_2022')").show()

def evolve_schema(spark):
    # Demonstration: schema evolution
    # Add column fare_per_mile to the table
    spark.sql('ALTER TABLE lakehouse.demodb.yellow_taxi_2022 ADD COLUMN(fare_per_mile double)')
    # Describe the table
    spark.sql('describe table lakehouse.demodb.yellow_taxi_2022').show(25)

def clean_database(spark):
    # Clean up the demo database
    spark.sql('drop table if exists lakehouse.demodb.testTable purge')
    spark.sql('drop table if exists lakehouse.demodb.zipcodes purge')
    spark.sql('drop table if exists lakehouse.demodb.yellow_taxi_2022 purge')
    spark.sql('drop database if exists lakehouse.demodb cascade')

def main():
    try:
        spark = init_spark()
        create_database(spark)
        list_databases(spark)
        basic_iceberg_table_operations(spark)
        # Demonstration: ingest parquet and csv data into a watsonx.data Iceberg table
        create_table_from_parquet_data(spark)
        ingest_from_csv_temp_table(spark)
        # Load data for the months of February to June into the table yellow_taxi_2022 created above
        ingest_monthly_data(spark)
        # Demonstration: table maintenance
        perform_table_maintenance_operations(spark)
        # Demonstration: schema evolution
        evolve_schema(spark)
    finally:
        # Clean up the demo database
        clean_database(spark)
        spark.stop()

if __name__ == '__main__':
    main()
2. Upload the Python file to the Cloud Pak for Data storage volume.
3. Generate an access token. See Generating an API authorization token.
4. Run the following curl command to submit the Spark application. For more information about submitting a Spark application, see Spark jobs API syntax, parameters, and return codes.
curl -k -X POST https://<cpd-url>/v4/analytics_engines/<instance_id>/spark_applications \
-H "Authorization: ZenApiKey ${TOKEN}" \
-d '{
  "application_details": {
    "application": "/spark-vol/<pythonfile-name>.py",
    "conf": {
      "spark.driver.extraClassPath": "/opt/ibm/connectors/iceberg-lakehouse/iceberg-3.3.2-1.2.1-hms-4.0.0-shaded.jar"
    }
  },
  "volumes": [{
    "name": "cpd-instance::<volume-name>",
    "mount_path": "/spark-vol/",
    "source_sub_path": "spark"
  }]
}'
Example:
curl -k -X POST https://cpd-cpd-instance.apps.spark-cpd-test.cp.fyre.ibm.com/v4/analytics_engines/14c3c133-ba87-406c-a06d-16157d969d5b/spark_applications \
-H "Authorization: ZenApiKey eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCIsImtpZCI6IllZR1FJWkpoX3dYSWRrTXNqMV95dU5qR3VUeW1LT0xYcU1UN1UxSmxFUk0ifQ.eyJ1c2VybmFtZSI6ImFkbWluIiwicm9sZSI6IkFkbWluIiwicGVybWlzc2lvbnMiOlsiYWRtaW5pc3RyYXRvciIsImNhbl9wcm92aXNpb24iLCJtb25pdG9yX3BsYXRmb3JtIiwiY29uZmlndXJlX3BsYXRmb3JtIiwidmlld19wbGF0Zm9ybV9oZWFsdGgiLCJjb25maWd1cmVfYXV0aCIsIm1hbmFnZV91c2VycyIsIm1hbmFnZV9ncm91cHMiLCJtYW5hZ2Vfc2VydmljZV9pbnN0YW5jZXMiXSwiZ3JvdXBzIjpbMTAwMDBdLCJzdWIiOiJhZG1pbiIsImlzcyI6IktOT1hTU08iLCJhdWQiOiJEU1giLCJ1aWQiOiIxMDAwMzMwOTk5IiwiYXV0aGVudGljYXRvciI6ImRlZmF1bHQiLCJkaXNwbGF5X25hbWUiOiJhZG1pbiIsImFwaV9yZXF1ZXN0IjpmYWxzZSwiaWF0IjoxNjkwOTcyNDgwLCJleHAiOjE2OTEwMTU2ODB9.ALFvnY7cYNENfMpPRncxcKXCkt4F_xy8sI50DoQgtu_oSFB9fbstrUMe8XbNwermM-DCpC64XGjJdZaYeHXumCaelPETeQdqQnYRWONvyr18-y5Xn4GZlSvzzmiRvhzGZ-bt5cLsC_iEftouS_Opm_wnfaMUjYIPjVEzjFBTQh6PDMo7Q79kdZIyeZbHCKdKYVUpaZgHWKvZqMtNGGnm9PydN5TIRMws81817l96a7LWkI4UZYgybX53iNU0Mun_4iYqEizQm_dmII0_I7YkA1bBQuVtKcRFqfG5MucAR0REz3k9sBIXzn_qTFFrN_lQr2oSnWHviLUyAy1UCaHQNw" \
-d '{
  "application_details": {
    "application": "/spark-vol/test-cpdspark.py",
    "conf": {
      "spark.driver.extraClassPath": "/opt/ibm/connectors/iceberg-lakehouse/iceberg-3.3.2-1.2.1-hms-4.0.0-shaded.jar"
    }
  },
  "volumes": [{
    "name": "cpd-instance::spark-artifacts-vol",
    "mount_path": "/spark-vol/",
    "source_sub_path": "spark"
  }]
}'
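If you prefer to submit the application from Python rather than curl, the following minimal sketch posts the same payload with the requests library. The endpoint, header, and JSON body mirror the curl command above; the instance ID, volume name, file name, and token are placeholders.

import requests

# Minimal sketch: submit the Spark application with the same payload as the curl command above.
# Replace <cpd-url>, <instance_id>, <volume-name>, and <pythonfile-name>, and set token to the
# access token that you generated in step 3.
token = "<access_token>"
url = "https://<cpd-url>/v4/analytics_engines/<instance_id>/spark_applications"

payload = {
    "application_details": {
        "application": "/spark-vol/<pythonfile-name>.py",
        "conf": {
            "spark.driver.extraClassPath": "/opt/ibm/connectors/iceberg-lakehouse/iceberg-3.3.2-1.2.1-hms-4.0.0-shaded.jar"
        }
    },
    "volumes": [{
        "name": "cpd-instance::<volume-name>",
        "mount_path": "/spark-vol/",
        "source_sub_path": "spark"
    }]
}

response = requests.post(
    url,
    headers={"Authorization": f"ZenApiKey {token}"},
    json=payload,
    verify=False,  # equivalent of curl -k; use a trusted CA bundle in production
)
print(response.status_code, response.text)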
Inserting sample data into the Cloud Pak for Data volume
To insert data into the Cloud Pak for Data volume:
1. Create a volume, named spark-vol, to store the sample data to be ingested into the watsonx.data instance. For information on creating a volume, see Managing storage volumes.
2. Download the following sample Parquet data and sample CSV file:
- Sample Parquet files (taxi data for six months of the year 2022)
- Sample CSV file (zipcodes.csv)
3. Upload the sample data into the spark-vol volume. For instructions, see Browse the storage volume and upload content.
Parent topic: Configuring an Analytics Engine powered by Apache Spark instance for watsonx.data