Getting started with watsonx.data Spark use cases

You can run Spark use cases for watsonx.data by using Python samples. All the samples are written by using Spark Python APIs.

Before you begin

Before you can run Spark use cases for watsonx.data, you must:

About the sample use case

The sample file demonstrates the following functions:

Accessing tables from watsonx.data
: The Create a database in Lakehouse catalog section of the sample Python file creates a database named demodb in the configured watsonx.data instance, under a catalog named lakehouse. demodb is configured to store all of its data and metadata in the Cloud Object Storage bucket lakehouse-bucket. The section also creates an Iceberg table named testTable and accesses it.
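
Condensed from the sample file below, the corresponding calls are:

    spark.sql("create database if not exists lakehouse.demodb LOCATION 's3a://lakehouse-bucket/'")
    spark.sql("create table if not exists lakehouse.demodb.testTable(id INTEGER, name VARCHAR(10), age INTEGER, salary DECIMAL(10, 2)) using iceberg")
    spark.sql("select * from lakehouse.demodb.testTable").show()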


Ingesting data to watsonx.data
: The Ingest parquet data into a lakehouse table section of the sample Python file shows how to ingest data in Parquet and CSV format from the spark-vol volume into a watsonx.data table. Sample data in Parquet format, which you download in step 2 of Inserting sample data into the Cloud Pak for Data volume, is inserted from the volume into the watsonx.data table yellow_taxi_2022. The same section also shows how to ingest data in CSV format from the spark-vol volume into the table zipcodes in the database demodb. For instructions on how to access the sample data and upload it to the volume, see Inserting sample data into the Cloud Pak for Data volume.
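
Condensed from the sample file below, the ingestion comes down to the following calls (the file paths assume that the sample data is uploaded under the spark-vol mount path):

    df = spark.read.option("header", True).parquet("file:///spark-vol/yellow_tripdata_2022-01.parquet")
    df.writeTo("lakehouse.demodb.yellow_taxi_2022").create()
    csvDF = spark.read.option("header", True).csv("file:///spark-vol/zipcodes.csv")
    csvDF.createOrReplaceTempView("tempCSVTable")
    spark.sql("create or replace table lakehouse.demodb.zipcodes using iceberg as select * from tempCSVTable")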


Modifying the schema in watsonx.data
: The Schema evolution section of the sample Python file shows how to modify the schema of a table in watsonx.data.
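
For example, the sample file adds a fare_per_mile column to the yellow_taxi_2022 table:

    spark.sql("ALTER TABLE lakehouse.demodb.yellow_taxi_2022 ADD COLUMN(fare_per_mile double)")
    spark.sql("describe table lakehouse.demodb.yellow_taxi_2022").show(25)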


Table maintenance activities in watsonx.data
: Table maintenance helps to keep watsonx.data tables performant. Iceberg provides out-of-the-box table maintenance procedures that allow you to perform powerful table optimizations in a declarative fashion. The sample file demonstrates how to perform some of these table maintenance operations by using Spark.
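
The maintenance operations in the sample file are standard Iceberg Spark procedures that are called through spark.sql. Condensed from the sample file below:

    # Compact many small data files into larger files of about 200 MB each
    spark.sql("CALL lakehouse.system.rewrite_data_files(table => 'demodb.yellow_taxi_2022', options => map('target-file-size-bytes','209715200'))").show()
    # Remove data files that are no longer referenced by the table metadata
    spark.sql("CALL lakehouse.system.remove_orphan_files(table => 'demodb.yellow_taxi_2022')").show(truncate=False)
    # Rewrite manifest files to optimize the table metadata
    spark.sql("CALL lakehouse.system.rewrite_manifests('demodb.yellow_taxi_2022')").show()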

For more information about the Iceberg Spark table maintenance operations, see Table Operations.

Running the sample use case

To run Spark use cases for watsonx.data by using sample Python files:

  1. Save the following sample as a Python file:

    from pyspark.sql import SparkSession
    import os
    
    def init_spark():
        spark = SparkSession.builder \
            .appName("lh-hms-cloud") \
            .config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.endpoint" ,"<cos_bucket_endpoint>") \
            .config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.access.key" ,"<access_key>") \
            .config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.secret.key" ,"<secret_key>") \
            .enableHiveSupport() \
            .getOrCreate()
        return spark
    
    def create_database(spark):
        # Create a database in the lakehouse catalog
        spark.sql("create database if not exists lakehouse.demodb LOCATION 's3a://lakehouse-bucket/'")
    
    def list_databases(spark):
        # list the database under lakehouse catalog
        spark.sql("show databases from lakehouse").show()
    
    def basic_iceberg_table_operations(spark):
        # demonstration: Create a basic Iceberg table, insert some data and then query table
        spark.sql("create table if not exists lakehouse.demodb.testTable(id INTEGER, name VARCHAR(10), age INTEGER, salary DECIMAL(10, 2)) using iceberg").show()
        spark.sql("insert into lakehouse.demodb.testTable values(1,'Alan',23,3400.00),(2,'Ben',30,5500.00),(3,'Chen',35,6500.00)")
        spark.sql("select * from lakehouse.demodb.testTable").show()
    
    def create_table_from_parquet_data(spark):
        # load parquet data into a dataframe
        df = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-01.parquet")
        # write the dataframe into an Iceberg table
        df.writeTo("lakehouse.demodb.yellow_taxi_2022").create()
        # describe the table created
        spark.sql('describe table lakehouse.demodb.yellow_taxi_2022').show(25)
        # query the table
        spark.sql('select * from lakehouse.demodb.yellow_taxi_2022').count()
    
    def ingest_from_csv_temp_table(spark):
        # load csv data into a dataframe
        csvDF = spark.read.option("header",True).csv("file:///spark-vol/zipcodes.csv")
        csvDF.createOrReplaceTempView("tempCSVTable")
        # load temporary table into an Iceberg table
        spark.sql('create or replace table lakehouse.demodb.zipcodes using iceberg as select * from tempCSVTable')
        # describe the table created
        spark.sql('describe table lakehouse.demodb.zipcodes').show(25)
        # query the table
        spark.sql('select * from lakehouse.demodb.zipcodes').show()
    
    def ingest_monthly_data(spark):
        df_feb = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-02.parquet")
        df_march = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-03.parquet")
        df_april = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-04.parquet")
        df_may = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-05.parquet")
        df_june = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-06.parquet")
        df_q1_q2 = df_feb.union(df_march).union(df_april).union(df_may).union(df_june)
        df_q1_q2.write.insertInto("lakehouse.demodb.yellow_taxi_2022")
    
    def perform_table_maintenance_operations(spark):
        # Query the metadata files table to list underlying data files
        spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show()
        # There are many small files; compact them into files of 200 MB each by using the
        # `rewrite_data_files` Iceberg Spark procedure
        spark.sql("CALL lakehouse.system.rewrite_data_files(table => 'demodb.yellow_taxi_2022', options => map('target-file-size-bytes','209715200'))").show()
        # Query the metadata files table again to list the underlying data files; the 6 files
        # are compacted into 3 files
        spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show()
        # List all the snapshots
        spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").show()
        # Expire the earlier snapshots; only the latest snapshot, with the compacted data, is required
        latest_snapshot_committed_at = spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").tail(1)[0].committed_at
        print(latest_snapshot_committed_at)
        spark.sql(f"CALL lakehouse.system.expire_snapshots(table => 'demodb.yellow_taxi_2022', older_than => TIMESTAMP '{latest_snapshot_committed_at}', retain_last => 1)").show()
        # List the snapshots again to verify that only the latest one is left
        spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").show()
        # Remove orphan data files
        spark.sql("CALL lakehouse.system.remove_orphan_files(table => 'demodb.yellow_taxi_2022')").show(truncate=False)
        # Rewrite manifest files
        spark.sql("CALL lakehouse.system.rewrite_manifests('demodb.yellow_taxi_2022')").show()
    
    def evolve_schema(spark):
        # demonstration: Schema evolution
        # Add column fare_per_mile to the table
        spark.sql('ALTER TABLE lakehouse.demodb.yellow_taxi_2022 ADD COLUMN(fare_per_mile double)')
        # describe the table
        spark.sql('describe table lakehouse.demodb.yellow_taxi_2022').show(25)
    
    def clean_database(spark):
        # clean-up the demo database
        spark.sql('drop table if exists lakehouse.demodb.testTable purge')
        spark.sql('drop table if exists lakehouse.demodb.zipcodes purge')
        spark.sql('drop table if exists lakehouse.demodb.yellow_taxi_2022 purge')
        spark.sql('drop database if exists lakehouse.demodb cascade')
    
    def main():
        try:
            spark = init_spark()
            create_database(spark)
            list_databases(spark)
            basic_iceberg_table_operations(spark)
            # demonstration: Ingest parquet and csv data into a watsonx.data Iceberg table
            create_table_from_parquet_data(spark)
            ingest_from_csv_temp_table(spark)
            # load data for the months of February to June into the table yellow_taxi_2022 created above
            ingest_monthly_data(spark)
            # demonstration: Table maintenance
            perform_table_maintenance_operations(spark)
            # demonstration: Schema evolution
            evolve_schema(spark)
        finally:
            # clean-up the demo database
            clean_database(spark)
            spark.stop()
    
    if __name__ == '__main__':
        main()
    
  2. Upload the Python file to the Cloud Pak for Data storage volume that the Spark application mounts as /spark-vol/ (for one way to upload it through the storage volumes API, see the sketch after this step).
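
    The following minimal sketch uploads the Python file through the Cloud Pak for Data storage volumes API from Python. The endpoint path (/zen-volumes/.../v1/volumes/files/...) and the upFile form field are assumptions; verify them against the storage volumes API documentation for your Cloud Pak for Data version.

    import requests

    CPD_URL = "https://<cpd-url>"                 # your Cloud Pak for Data URL
    TOKEN = "<token>"                             # the authorization token that you generate in step 3
    VOLUME = "<volume-name>"                      # for example, spark-artifacts-vol
    # Assumption: place the file under the spark sub-path that the job mounts at /spark-vol/
    TARGET_PATH = "spark%2F<pythonfile-name>.py"  # %2F is a URL-encoded "/"

    with open("<pythonfile-name>.py", "rb") as f:
        response = requests.put(
            f"{CPD_URL}/zen-volumes/{VOLUME}/v1/volumes/files/{TARGET_PATH}",  # assumed endpoint
            headers={"Authorization": f"ZenApiKey {TOKEN}"},
            files={"upFile": f},    # assumed form field name
            verify=False,           # matches the -k flag in the curl examples below
        )
    print(response.status_code, response.text)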

  3. Generate an access token. See Generating an API authorization token.

  4. Run the following curl command to submit the Spark application. For more information about submitting a Spark application, see Spark jobs API syntax, parameters, and return codes.

    curl -k -X POST https://<cpd-url>/v4/analytics_engines/<instance-id>/spark_applications -H "Authorization: ZenApiKey ${TOKEN}" -d '{
        "application_details": {
            "application": "/spark-vol/<pythonfile-name>.py",
            "conf": {
                "spark.driver.extraClassPath": "/opt/ibm/connectors/iceberg-lakehouse/iceberg-3.3.2-1.2.1-hms-4.0.0-shaded.jar"
            }
        },
        "volumes": [{
            "name": "cpd-instance::<volume-name>",
            "mount_path": "/spark-vol/",
            "source_sub_path": "spark"
        }]
    }'
    

    Example:

    curl -k -X POST https://cpd-cpd-instance.apps.spark-cpd-test.cp.fyre.ibm.com/v4/analytics_engines/14c3c133-ba87-406c-a06d-16157d969d5b/spark_applications -H "Authorization: ZenApiKey eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCIsImtpZCI6IllZR1FJWkpoX3dYSWRrTXNqMV95dU5qR3VUeW1LT0xYcU1UN1UxSmxFUk0ifQ.eyJ1c2VybmFtZSI6ImFkbWluIiwicm9sZSI6IkFkbWluIiwicGVybWlzc2lvbnMiOlsiYWRtaW5pc3RyYXRvciIsImNhbl9wcm92aXNpb24iLCJtb25pdG9yX3BsYXRmb3JtIiwiY29uZmlndXJlX3BsYXRmb3JtIiwidmlld19wbGF0Zm9ybV9oZWFsdGgiLCJjb25maWd1cmVfYXV0aCIsIm1hbmFnZV91c2VycyIsIm1hbmFnZV9ncm91cHMiLCJtYW5hZ2Vfc2VydmljZV9pbnN0YW5jZXMiXSwiZ3JvdXBzIjpbMTAwMDBdLCJzdWIiOiJhZG1pbiIsImlzcyI6IktOT1hTU08iLCJhdWQiOiJEU1giLCJ1aWQiOiIxMDAwMzMwOTk5IiwiYXV0aGVudGljYXRvciI6ImRlZmF1bHQiLCJkaXNwbGF5X25hbWUiOiJhZG1pbiIsImFwaV9yZXF1ZXN0IjpmYWxzZSwiaWF0IjoxNjkwOTcyNDgwLCJleHAiOjE2OTEwMTU2ODB9.ALFvnY7cYNENfMpPRncxcKXCkt4F_xy8sI50DoQgtu_oSFB9fbstrUMe8XbNwermM-DCpC64XGjJdZaYeHXumCaelPETeQdqQnYRWONvyr18-y5Xn4GZlSvzzmiRvhzGZ-bt5cLsC_iEftouS_Opm_wnfaMUjYIPjVEzjFBTQh6PDMo7Q79kdZIyeZbHCKdKYVUpaZgHWKvZqMtNGGnm9PydN5TIRMws81817l96a7LWkI4UZYgybX53iNU0Mun_4iYqEizQm_dmII0_I7YkA1bBQuVtKcRFqfG5MucAR0REz3k9sBIXzn_qTFFrN_lQr2oSnWHviLUyAy1UCaHQNw" -X POST -d '{
        "application_details": {
            "application": "/spark-vol/test-cpdspark.py",
            "conf": {
                "spark.driver.extraClassPath": "/opt/ibm/connectors/iceberg-lakehouse/iceberg-3.3.2-1.2.1-hms-4.0.0-shaded.jar",
    
            }
        },
        "volumes": [{
            "name": "cpd-instance::spark-artifacts-vol",
            "mount_path": "/spark-vol/",
            "source_sub_path": "spark"
        }]
    }'
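
    The submit request returns a JSON response that includes an application ID. A minimal sketch for checking the application state from Python is shown below; it assumes that a GET request on the same spark_applications route returns the application details, so verify the route against the Spark jobs API documentation for your instance.

    import requests

    CPD_URL = "https://<cpd-url>"
    INSTANCE_ID = "<instance-id>"
    APPLICATION_ID = "<application-id>"   # "id" field from the submit response
    TOKEN = "<token>"

    # Assumption: GET on the spark_applications route returns the application details,
    # including its current state
    response = requests.get(
        f"{CPD_URL}/v4/analytics_engines/{INSTANCE_ID}/spark_applications/{APPLICATION_ID}",
        headers={"Authorization": f"ZenApiKey {TOKEN}"},
        verify=False,   # matches the -k flag in the curl examples
    )
    print(response.json().get("state"))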
    

Inserting sample data into the Cloud Pak for Data volume

To insert data into the Cloud Pak for Data volume:

  1. Create a volume named spark-vol to store the sample data that is to be ingested into the watsonx.data instance. For information about creating a volume, see Managing storage volumes.

  2. Download the following parquet sample data and sample CSV file:

  3. Upload the sample data into the spark-vol volume. For instructions, see Browse the storage volume and upload content.

Parent topic: Configuring an Analytics Engine powered by Apache Spark instance for watsonx.data