Running a Spark application

You can run Spark use cases for watsonx.data by using Python samples. All the samples are written by using Spark Python APIs.

Before you begin

Before you can run Spark use cases for watsonx.data, you must:

  • Install an IBM watsonx.data instance.
  • Establish a connection with Spark.
  • To enable your Spark application to work with the watsonx.data catalog and storage, you must have the Metastore admin role. Without the Metastore admin privilege, you cannot ingest data into storage by using the native Spark engine. For more information about the Spark configuration, see Working with the watsonx.data catalog and storage.

About this task

About the sample use case
The sample file demonstrates the following functions:
  • Accessing tables from watsonx.data

    The Create a database in Lakehouse catalog section from the sample use case creates a database named demodb under the catalog named lakehouse in the configured watsonx.data instance. demodb is configured to store all the data and metadata under the Cloud Object Storage bucket lakehouse-bucket. The section also creates an Iceberg table testTable and accesses it.

  • Ingesting data to watsonx.data

    The Ingest parquet data into a lakehouse table section from the sample use case allows you to ingest data in parquet and CSV format from the Spark volume (spark-vol) into a watsonx.data table. Sample data in parquet format is inserted from the volume into the watsonx.data table yellow_taxi_2022. The same section from the sample Python file also shows how to ingest data in CSV format from spark-vol into the table zipcodes in the database demodb. For more information, see Inserting sample data into the Cloud Object Storage.

  • Modifying the schema in watsonx.data

    The Schema evolution section from the sample use case allows you to modify the schema of a table in watsonx.data.

  • Table maintenance activities in watsonx.data

    Table maintenance helps keep watsonx.data tables performant. Iceberg provides table maintenance procedures out of the box that allow you to perform powerful table optimizations in a declarative fashion. The following sample demonstrates how to do some table maintenance operations by using Spark. For more information about the Iceberg Spark table maintenance operations, see Table Operations.

To run Spark use cases for watsonx.data by using sample Python files:

Procedure

  1. To insert data into Cloud Object Storage, complete the following steps.
    Inserting sample data into the Cloud Object Storage
    1. Create a Cloud Object Storage bucket (for example, source-bucket) to store the sample data to be ingested into the watsonx.data instance. For information about creating a bucket, see Getting started with IBM Cloud Object Storage.

      As a user of Object Storage, you not only need to know the API key or the HMAC keys to configure Object Storage, but also the Object Storage service endpoints to connect to Object Storage. See Selecting regions and endpoints for more information on the endpoints to use based on your Object Storage bucket type, such as regional versus cross-regional. You can also view the endpoints across regions for your Object Storage service by selecting the service on your IBM Cloud dashboard and clicking Endpoint in the navigation pane.

    2. Download a sample CSV file (for example, zipcodes.csv) and sample parquet data (for example, six months of taxi data for the year 2022) from the following links:

      - Sample parquet file

      - Sample CSV file

    3. Install the IBM Cloud Object Storage plug-in. For more information about how to install the plug-in, see IBM Cloud Object Storage CLI.

    4. Use the COS CLI to upload the sample data into the Cloud Object Storage bucket.
      ibmcloud cos upload --bucket <cos_bucket_name> --key <source_file_name> --file <path_to_source_file>

      Parameter values:

      - <cos_bucket_name>: The name of the Cloud Object Storage bucket.

      - <source_file_name>: The name of the sample data file that you downloaded. In the following example, the key zipcodes.csv is the file name.

      - <path_to_source_file>: The path to the location on your machine where the file resides. In the following example, path/zipcodes.csv is the file path.

      For example:

      ibmcloud cos upload --bucket source-bucket --key zipcodes.csv --file <path/zipcodes.csv>
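
      If you prefer to script the upload instead of using the CLI, the following is a minimal sketch that uses the ibm-cos-sdk Python client (ibm_boto3). The endpoint URL, API key, and service instance CRN shown are placeholders that you must replace with your own values.

      # Minimal sketch: upload the sample files with the ibm-cos-sdk Python client.
      # The endpoint URL, API key, and service instance CRN are placeholders.
      import ibm_boto3
      from ibm_botocore.client import Config

      cos = ibm_boto3.client(
          "s3",
          ibm_api_key_id="<api_key>",
          ibm_service_instance_id="<cos_service_instance_crn>",
          config=Config(signature_version="oauth"),
          endpoint_url="<cos_endpoint>",
      )

      # Upload the sample CSV and parquet files into the source bucket.
      cos.upload_file("path/zipcodes.csv", "source-bucket", "zipcodes.csv")
      cos.upload_file("path/yellow_tripdata_2022-01.parquet", "source-bucket", "yellow_tripdata_2022-01.parquet")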
  2. Save the following sample as a Python file:
    from pyspark.sql import SparkSession
    import os
    
    def init_spark():
        spark = SparkSession.builder \
            .appName("lh-hms-cloud") \
            .config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.endpoint" ,"<cos_bucket_endpoint>") \
            .config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.access.key" ,"<access_key>") \
            .config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.secret.key" ,"<secret_key>") \
            .enableHiveSupport() \
            .getOrCreate()
        return spark
    
    def create_database(spark):
        # Create a database in the lakehouse catalog
        spark.sql("create database if not exists lakehouse.demodb LOCATION 's3a://lakehouse-bucket/'")
    
    def list_databases(spark):
        # list the database under lakehouse catalog
        spark.sql("show databases from lakehouse").show()
    
    def basic_iceberg_table_operations(spark):
        # demonstration: Create a basic Iceberg table, insert some data and then query table
        spark.sql("create table if not exists lakehouse.demodb.testTable(id INTEGER, name VARCHAR(10), age INTEGER, salary DECIMAL(10, 2)) using iceberg").show()
        spark.sql("insert into lakehouse.demodb.testTable values(1,'Alan',23,3400.00),(2,'Ben',30,5500.00),(3,'Chen',35,6500.00)")
        spark.sql("select * from lakehouse.demodb.testTable").show()
    
    def create_table_from_parquet_data(spark):
        # load parquet data into dataframe
        df = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-01.parquet")
        # write the dataframe into an Iceberg table
        df.writeTo("lakehouse.demodb.yellow_taxi_2022").create()
        # describe the table created
        spark.sql('describe table lakehouse.demodb.yellow_taxi_2022').show(25)
        # query the table
        print(spark.sql('select * from lakehouse.demodb.yellow_taxi_2022').count())
    
    def ingest_from_csv_temp_table(spark):
        # load csv data into a dataframe
        csvDF = spark.read.option("header",True).csv("file:///spark-vol/zipcodes.csv")
        csvDF.createOrReplaceTempView("tempCSVTable")
        # load temporary table into an Iceberg table
        spark.sql('create or replace table lakehouse.demodb.zipcodes using iceberg as select * from tempCSVTable')
        # describe the table created
        spark.sql('describe table lakehouse.demodb.zipcodes').show(25)
        # query the table
        spark.sql('select * from lakehouse.demodb.zipcodes').show()
    
    def ingest_monthly_data(spark):
        df_feb = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-02.parquet")
        df_march = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-03.parquet")
        df_april = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-04.parquet")
        df_may = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-05.parquet")
        df_june = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-06.parquet")
        df_q1_q2 = df_feb.union(df_march).union(df_april).union(df_may).union(df_june)
        df_q1_q2.write.insertInto("lakehouse.demodb.yellow_taxi_2022")
    
    def perform_table_maintenance_operations(spark):
        # Query the metadata files table to list underlying data files
        spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show()
        # There are many smaller files compact them into files of 200MB each using the
        # `rewrite_data_files` Iceberg Spark procedure
        spark.sql(f"CALL lakehouse.system.rewrite_data_files(table => 'demodb.yellow_taxi_2022', options => map('target-file-size-bytes','209715200'))").show()
        # Again, query the metadata files table to list underlying data files; 6 files are compacted
        # to 3 files
        spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show()
        # List all the snapshots
        # Expire earlier snapshots. Only latest one with compacted data is required
        # Again, List all the snapshots to see only 1 left
        spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").show()
        #retain only the latest one
        latest_snapshot_committed_at = spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").tail(1)[0].committed_at
        print (latest_snapshot_committed_at)
        spark.sql(f"CALL lakehouse.system.expire_snapshots(table => 'demodb.yellow_taxi_2022',older_than => TIMESTAMP '{latest_snapshot_committed_at}',retain_last => 1)").show()
        spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").show()
        # Removing Orphan data files
        spark.sql(f"CALL lakehouse.system.remove_orphan_files(table => 'demodb.yellow_taxi_2022')").show(truncate=False)
        # Rewriting Manifest Files
        spark.sql(f"CALL lakehouse.system.rewrite_manifests('demodb.yellow_taxi_2022')").show()
    
    def evolve_schema(spark):
        # demonstration: Schema evolution
        # Add column fare_per_mile to the table
        spark.sql('ALTER TABLE lakehouse.demodb.yellow_taxi_2022 ADD COLUMN(fare_per_mile double)')
        # describe the table
        spark.sql('describe table lakehouse.demodb.yellow_taxi_2022').show(25)
    
    def clean_database(spark):
        # clean-up the demo database
        spark.sql('drop table if exists lakehouse.demodb.testTable purge')
        spark.sql('drop table if exists lakehouse.demodb.zipcodes purge')
        spark.sql('drop table if exists lakehouse.demodb.yellow_taxi_2022 purge')
        spark.sql('drop database if exists lakehouse.demodb cascade')
    
    def main():
        try:
            spark = init_spark()
            create_database(spark)
            list_databases(spark)
            basic_iceberg_table_operations(spark)
            # demonstration: Ingest parquet and csv data into a watsonx.data Iceberg table
            create_table_from_parquet_data(spark)
            ingest_from_csv_temp_table(spark)
            # load data for the month of February to June into the table yellow_taxi_2022 created above
            ingest_monthly_data(spark)
            # demonstration: Table maintenance
            perform_table_maintenance_operations(spark)
            # demonstration: Schema evolution
            evolve_schema(spark)
        finally:
            # clean-up the demo database
            clean_database(spark)
            spark.stop()
    
    if __name__ == '__main__':
        main()
    Parameter values:
    • <cos_bucket_endpoint>: Provide the endpoint of the Cloud Object Storage bucket (lakehouse-bucket). For more information, see storage details.
    • <access_key>: Provide the access_key_id. For more information, see storage details.
    • <secret_key>: Provide the secret_access_key. For more information, see storage details.
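    If you prefer not to hard-code credentials in the file, a minimal variation of init_spark (the sample already imports os) could read the same three values from environment variables. The variable names COS_ENDPOINT, COS_ACCESS_KEY, and COS_SECRET_KEY are illustrative.
    import os
    from pyspark.sql import SparkSession

    # Illustrative variation of init_spark: read the bucket endpoint and the
    # HMAC keys from environment variables instead of hard-coding them.
    # The variable names COS_ENDPOINT, COS_ACCESS_KEY, and COS_SECRET_KEY are assumptions.
    def init_spark():
        spark = SparkSession.builder \
            .appName("lh-hms-cloud") \
            .config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.endpoint", os.environ["COS_ENDPOINT"]) \
            .config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.access.key", os.environ["COS_ACCESS_KEY"]) \
            .config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.secret.key", os.environ["COS_SECRET_KEY"]) \
            .enableHiveSupport() \
            .getOrCreate()
        return spark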
  3. Upload the Python file to Cloud Object Storage (your application file must reside in Cloud Object Storage). For more information about uploading data, see Upload data.
  4. Generate the API authorization token for the IBM Analytics Engine. For more information about how to generate the token, see Generating an API authorization token.
  5. Run the following curl command to submit the Spark application.
    curl -k -X POST <V4_JOBS_API_ENDPOINT> -H "Authorization: ZenApiKey ${TOKEN}" -d '{
      "application_details": {
        "application": "cos://<BUCKET_NAME>.<COS_SERVICE_NAME>/<OBJECT_NAME>",
        "arguments": [
          "cos://<BUCKET_NAME>.<COS_SERVICE_NAME>/<OBJECT_NAME>"
        ],
        "class": "<main_class>",
        "conf": {
          "spark.app.name": "MyJob",
          "spark.hadoop.fs.cos.<COS_SERVICE_NAME>.endpoint": "<COS_ENDPOINT>",
          "spark.hadoop.fs.cos.<COS_SERVICE_NAME>.secret.key": "<COS_SECRET_KEY>",
          "spark.hadoop.fs.cos.<COS_SERVICE_NAME>.access.key": "<COS_ACCESS_KEY>"
        }
      }
    }'
    Parameter values:
    • <V4_JOBS_API_ENDPOINT>: The endpoint for the instance that you want to use to submit your Spark job. To get the Spark jobs endpoint for your provisioned instance, see Administering the service instance.
    • <TOKEN>: To get the access token for your service instance, see Generating an API authorization token.
    • <OBJECT_NAME>: The name of the application file (object) in Cloud Object Storage.
    • <BUCKET_NAME>: The storage bucket where the application file resides.
    • <COS_SERVICE_NAME>: The Cloud Object Storage service name.
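    Alternatively, a minimal sketch of the same submission by using the Python requests library follows. It assumes that the jobs endpoint and the token from the previous steps are available as the variables v4_jobs_api_endpoint and token, and it mirrors the payload of the curl example (the class field, which applies to Java or Scala applications, is omitted for a Python application).
    # Minimal sketch: submit the same payload with the Python requests library.
    # v4_jobs_api_endpoint and token correspond to <V4_JOBS_API_ENDPOINT> and <TOKEN>.
    import requests

    payload = {
        "application_details": {
            "application": "cos://<BUCKET_NAME>.<COS_SERVICE_NAME>/<OBJECT_NAME>",
            "arguments": ["cos://<BUCKET_NAME>.<COS_SERVICE_NAME>/<OBJECT_NAME>"],
            "conf": {
                "spark.app.name": "MyJob",
                "spark.hadoop.fs.cos.<COS_SERVICE_NAME>.endpoint": "<COS_ENDPOINT>",
                "spark.hadoop.fs.cos.<COS_SERVICE_NAME>.secret.key": "<COS_SECRET_KEY>",
                "spark.hadoop.fs.cos.<COS_SERVICE_NAME>.access.key": "<COS_ACCESS_KEY>",
            },
        }
    }

    response = requests.post(
        v4_jobs_api_endpoint,                       # <V4_JOBS_API_ENDPOINT>
        headers={"Authorization": f"ZenApiKey {token}"},
        json=payload,
        verify=False,                               # mirrors the -k option of curl
    )
    print(response.status_code, response.text)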
    Working with the watsonx.data catalog and storage
    To enable your Spark application to work with the watsonx.data catalog and storage, add the following configuration to your application payload:
    spark.hive.metastore.client.plain.username=ibmlhapikey
    spark.hive.metastore.client.plain.password=<api-key-of-the-user-which-has-metastore-admin-role>
    spark.hadoop.wxd.apiKey=Basic base64(ibmlhapikey_ibmcloudid:apikey)
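    For example, in the job payload these properties sit alongside the other entries in the conf section. The following is a minimal sketch of building that conf in Python; the IBM Cloud ID and API key values are placeholders, and spark.hadoop.wxd.apiKey is the Base64 encoding of ibmlhapikey_<ibm_cloud_id>:<api_key>, as shown above.
    # Minimal sketch: add the watsonx.data catalog and storage properties to the
    # "conf" section of the application payload. <ibm_cloud_id> and <api_key> are
    # placeholders; the API key must belong to a user with the Metastore admin role.
    import base64

    ibm_cloud_id = "<ibm_cloud_id>"
    api_key = "<api_key>"
    basic_credentials = base64.b64encode(f"ibmlhapikey_{ibm_cloud_id}:{api_key}".encode()).decode()

    conf = {
        "spark.app.name": "MyJob",
        "spark.hive.metastore.client.plain.username": "ibmlhapikey",
        "spark.hive.metastore.client.plain.password": api_key,
        "spark.hadoop.wxd.apiKey": f"Basic {basic_credentials}",
    }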