Submitting Spark application by using native Spark engine

You can submit a Spark application by running a curl command. Complete the following steps to submit a Python application.

Before you begin

To enable your Spark application to work with the watsonx.data catalog and storage, you must have the Metastore admin role. Without the Metastore admin privilege, you cannot ingest data to storage by using the native Spark engine. Add the following configuration to your application payload:
spark.hive.metastore.client.plain.username=ibmlhapikey
spark.hive.metastore.client.plain.password=<api-key-of-the-user-which-has-metastore-admin-role>
spark.hadoop.wxd.apiKey=Basic base64(ibmlhapikey_ibmcloudid:apikey)
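
For example, these properties can be passed in the conf section of the application_details payload that you submit in the steps that follow (a sample fragment with placeholder values):

"application_details": {
  "application": "/myapp/<python file name>",
  "conf": {
    "spark.hive.metastore.client.plain.username": "ibmlhapikey",
    "spark.hive.metastore.client.plain.password": "<api-key-of-the-user-which-has-metastore-admin-role>",
    "spark.hadoop.wxd.apiKey": "Basic <base64 of ibmlhapikey_ibmcloudid:apikey>"
  }
}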

Procedure

  1. Create a storage volume to store the Spark application and related output.
    • Option 1: Create a storage volume in the Cloud Pak for Data cluster. To create a storage volume, see Creating a storage volume.
    • Option 2: Create Cloud Object Storage. To create Cloud Object Storage and a bucket, see Creating a storage bucket.
  2. If you use Cloud Object Storage, register the Cloud Object Storage bucket in watsonx.data. To register the bucket, see Adding bucket catalog pair.
  3. Upload the Spark application to the storage volume.
  4. If your Spark application resides in a Cloud Pak for Data storage volume, specify the parameter values and run the following curl command to submit the application.
    Example 1:
    curl --request POST \
      --url https://<cpd_host_name>/lakehouse/api/v2/spark_engines/<spark_engine_id>/applications \
      --header 'Authorization: Bearer <token>' \
      --header 'Content-Type: application/json' \
      --header 'LhInstanceId: <instance_id>' \
      --data '{
      "application_details": {
        "application": "/myapp/<python file name>"
      },
      "volumes": [
        {
          "name": "cpd-instance::my-vol-1",
          "mount_path": "/myapp"
        }
      ]
    }'
    Parameter values:
    • <cpd_host_name>: The hostname of your Cloud Pak for Data cluster.
    • <spark_engine_id>: The Engine ID of the native Spark engine.
    • <token>: The bearer token. For more information about generating the token, see Generating a bearer token. A sample request follows this list.
    • <instance_id>: The instance ID from the watsonx.data cluster instance URL. For example, 1609968577169454.
    • <python file name>: The Spark application file name. The file must be available in the storage volume.
    • <my-vol-1>: The display name of the storage volume.
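
    For reference, on Cloud Pak for Data you can typically generate the bearer token by calling the platform authorization API with your username and API key (a sketch with placeholder values; see Generating a bearer token for the exact procedure in your environment). The token is returned in the token field of the response.
    curl --request POST \
      --url https://<cpd_host_name>/icp4d-api/v1/authorize \
      --header 'Content-Type: application/json' \
      --data '{
      "username": "<username>",
      "api_key": "<api_key>"
    }'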

    Example 2:

    Run the following curl command to submit the word count example application. A minimal sketch of a similar application is shown after the parameter list.
    curl --request POST \
      --url https://<cpd_host_name>/lakehouse/api/v2/spark_engines/<spark_engine_id>/applications \
      --header 'Authorization: Bearer <token>' \
      --header 'Content-Type: application/json' \
      --header 'LhInstanceId: <instance_id>' \
      --data '{
      "application_details": {
        "application": "/opt/ibm/spark/examples/src/main/python/wordcount.py",
        "arguments": [
    "/opt/ibm/spark/examples/src/main/resources/people.txt"
        ]
      }
    }'

    Parameter values:

    • <instance_id>: The instance ID from the watsonx.data cluster instance URL. For example, 1609968577169454.
    • <cpd_host_name>: The hostname of your Cloud Pak for Data cluster.
    • <spark_engine_id>: The Engine ID of the native Spark engine.
    • <token>: The bearer token. For more information about generating the token, see Generating a bearer token.
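
    For reference, a word count application equivalent to the bundled wordcount.py can look roughly like the following (a minimal sketch; the example file that ships under /opt/ibm/spark/examples might differ in detail):
    import sys
    from pyspark.sql import SparkSession

    # Create a Spark session for the word count job
    spark = SparkSession.builder.appName("PythonWordCount").getOrCreate()

    # Read the input file passed as the first argument and split it into words
    lines = spark.read.text(sys.argv[1]).rdd.map(lambda row: row[0])
    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)

    # Print each word with its count
    for word, count in counts.collect():
        print(word, count)

    spark.stop()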

    Example 3:

    Run the following curl command to customize the cluster hardware sizes:

    curl -k -X POST \
      --url https://<cpd_host_name>/lakehouse/api/v2/spark_engines/<spark_engine_id>/applications \
      --header "Authorization: ZenApiKey ${TOKEN}" \
      --header 'Content-Type: application/json' \
      --data '{
      "application_details": {
        "application": "/opt/ibm/spark/examples/jars/spark-examples*.jar",
        "arguments": ["1"],
        "class": "org.apache.spark.examples.SparkPi",
        "conf": {
          "spark.driver.memory": "4G",
          "spark.driver.cores": "1",
          "spark.executor.memory": "4G",
          "spark.executor.cores": "1",
          "ae.spark.executor.count": "1"
        }
      }
    }'
  5. If your Spark application resides in Cloud Object Storage, specify the parameter values and run the following curl command. The following example shows the command to submit the cos-read.py application. A minimal sketch of such an application is shown after the parameter list.

    Example 1:

    curl --request POST \
      --url https://<cpd_host_name>/lakehouse/api/v2/spark_engines/<spark_engine_id>/applications \
      --header 'Authorization: Bearer <token>' \
      --header 'Content-Type: application/json' \
      --header 'LhInstanceId: <instance_id>' \
      --data '{
      "application_details": {
        "application": "s3a://<s3_bucket_name>/cos-read.py",
        "conf": {
            "spark.hadoop.fs.s3a.bucket.<s3_bucket_name>.endpoint": "<cos_endpoint>",
            "spark.hadoop.fs.s3a.bucket.<s3_bucket_name>.access.key": "<s3 bucket HMAC access key>",
            "spark.hadoop.fs.s3a.bucket.<s3_bucket_name>.secret.key": "<s3 bucket  HMAC secret key>",
            "spark.app.name": "reader-app"
        }
      }
    }'
    Parameter values:
    • <cpd_host_name>: The hostname of your Cloud Pak for Data cluster.
    • <spark_engine_id> : The Engine ID of the native Spark engine.
    • <token> : The bearer token. For more information about generating the token, see Generating a bearer token.
    • <instance_id> : The instance ID from the watsonx.data cluster instance URL. For example, 1609968977179454.
    • <s3_bucket_name>: The name of the Cloud Object Storage bucket.
    • <cos_endpoint>: The public endpoint of the Cloud Object Storage bucket. For example, s3.direct.us-south.cloud-object-storage.appdomain.cloud.
    • <s3 bucket HMAC access key>: The HMAC access key for the Cloud Object Storage bucket. For more information, see Create HMAC credentials using the CLI.
    • <s3 bucket HMAC secret key>: The HMAC secret key for the Cloud Object Storage bucket. For more information, see Create HMAC credentials using the CLI.
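
    A minimal sketch of what the cos-read.py application might contain (the file name data.csv is a hypothetical example; the bucket endpoint and HMAC keys are supplied through the conf section of the submission payload above):
    from pyspark.sql import SparkSession

    # Create a Spark session; the s3a endpoint and HMAC credentials for the
    # bucket are supplied through the submission payload
    spark = SparkSession.builder.appName("reader-app").getOrCreate()

    # Read a CSV file from the Cloud Object Storage bucket over the s3a connector
    df = spark.read.option("header", "true").csv("s3a://<s3_bucket_name>/data.csv")
    df.show()

    spark.stop()
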
  6. If your Spark application resides in ADLS, specify the parameter values and run the following curl command. The following example shows the command to submit the adls-read.py application. A minimal sketch of such an application is shown after the parameter list.
    Example 1:
    curl --request POST \
      --url https://<cpd_host_name>/lakehouse/api/v2/spark_engines/<spark_engine_id>/applications \
      --header 'Authorization: Bearer <token>' \
      --header 'Content-Type: application/json' \
      --header 'LhInstanceId: <instance_id>' \
      --data '{
      "application_details": {
        "application": "abfss://<storage_account>@<storage_container>.dfs.core.windows.net/adls-read.py",
        "conf": {
            "spark.hadoop.fs.azure.account.auth.type.<storage_account>.dfs.core.windows.net", "OAuth",
            "spark.hadoop.fs.azure.account.oauth.provider.type.<storage_account>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
            "spark.hadoop.fs.azure.account.oauth2.client.id.<storage_account>.dfs.core.windows.net", "<application_id>",
            "spark.hadoop.fs.azure.account.oauth2.client.secret.<storage_account>.dfs.core.windows.net","<secret>",
            "spark.hadoop.fs.azure.account.oauth2.client.endpoint.<storage_account>.dfs.core.windows.net", "https://login.microsoftonline.com/<directory_id>/oauth2/token",
            "spark.hadoop.fs.azure.createRemoteFileSystemDuringInitialization", "false",
            "spark.app.name": "reader-app",
        }
      }
    }'
    Parameter values:
    • <cpd_host_name>: The hostname of your Cloud Pak for Data cluster.
    • <spark_engine_id> : The Engine ID of the native Spark engine.
    • <token> : The bearer token. For more information about generating the token, see Generating a bearer token.
    • <instance_id> : The instance ID from the watsonx.data cluster instance URL. For example, 1609968977179454.
    • <storage_account>: The name of the Azure storage account.
    • <storage_container>: The name of the Azure storage container.
    • <application_id>: The application ID of the service principal.
    • <secret>: The client secret of the service principal. For more information, see Create a service principal.
    • <directory_id>: The directory ID of the service principal. For more information, see Create a service principal.
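
    A minimal sketch of what the adls-read.py application might contain (the file name data.csv is a hypothetical example; the OAuth settings for the storage account are supplied through the conf section of the submission payload above):
    from pyspark.sql import SparkSession

    # Create a Spark session; the ADLS OAuth settings are supplied through
    # the submission payload
    spark = SparkSession.builder.appName("reader-app").getOrCreate()

    # Read a CSV file from the Azure storage container over the abfss connector
    df = spark.read.option("header", "true").csv("abfss://<storage_container>@<storage_account>.dfs.core.windows.net/data.csv")
    df.show()

    spark.stop()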

    Example 2:

    Application code for Gen1 ADLS:

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder \
        .appName("ADLS_ICEBERG") \
        .config("spark.hadoop.wxd.apikey", "ZenApiKey Y3BhZG1pbjowT2VFV0o1WmtVYVhqR0dtOVFxSkJrZzRjeTlvREQzZ0RaU2JDZGFO") \
        .config("fs.wasb.impl", "org.apache.hadoop.fs.azure.IbmlhcasAzureFileSystem") \
        .config("fs.wasbs.impl", "org.apache.hadoop.fs.azure.IbmlhcasAzureFileSystem$Secure") \
        .config("fs.azure.secure.mode", True) \
        .config("fs.azure.local.sas.key.mode", True) \
        .config("fs.azure.saskey.usecontainersaskeyforallaccess", False) \
        .config("spark.sql.catalogImplementation", "hive") \
        .config("spark.sql.catalog.new_catalog_gen1", "org.apache.iceberg.spark.SparkCatalog") \
        .config("hive.metastore.uris", "thrift://ibm-lh-lakehouse-hive-metastore-svc.cpd-instance.svc.cluster.local:9083") \
        .config("spark.sql.catalog.new_catalog_gen1.type" ,"hive") \
        .config("spark.sql.iceberg.vectorization.enabled" ,"false") \
        .config("spark.hadoop.hive.metastore.schema.verification", "false") \
        .config("spark.hadoop.hive.metastore.schema.verification.record.version", "false") \
        .config("spark.hadoop.datanucleus.schema.autoCreateTables", "false") \
        .config("spark.hive.metastore.use.SSL", "true") \
        .config("spark.hive.metastore.truststore.path", "file:///opt/ibm/jdk/lib/security/cacerts") \
        .config("spark.driver.extraJavaOptions", "-Dcom.sun.jndi.ldap.object.disableEndpointIdentification=true -Djdk.tls.trustNameService=true") \
        .config("spark.executor.extraJavaOptions", "-Dcom.sun.jndi.ldap.object.disableEndpointIdentification=true -Djdk.tls.trustNameService=true") \
        .config("spark.hive.metastore.truststore.password", "changeit") \
        .config("spark.hive.metastore.truststore.type", "JKS") \
        .config("hive.metastore.client.auth.mode", "PLAIN") \
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
        .config("spark.sql.iceberg.vectorization.enabled" ,"false") \
        .config("spark.hive.metastore.client.plain.username", "cpadmin") \
        .config("spark.hive.metastore.client.plain.password", "ueTAKG1GX2eo8HafqiAsA4CprTotR9YO") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .config("fs.azure.createRemoteFileSystemDuringInitialization", "true") \
        .enableHiveSupport() \
        .getOrCreate()
    
    # Example Spark SQL queries

    # Create the database at the ADLS Gen1 storage location if it does not exist
    spark.sql("CREATE database if not exists new_catalog_gen1.new_database_gen1 LOCATION 'wasbs://lhcasblob2@lhcastest2.blob.core.windows.net/new_database_gen1'")

    # Create an Iceberg table in the new database
    spark.sql("CREATE TABLE IF NOT EXISTS new_catalog_gen1.new_database_gen1.adls_aa_sam4(col1 INT, col2 STRING)").show()
    
    spark.sql("show tables from new_catalog_gen1.new_database_gen1").show()
    
    spark.sql('describe schema new_catalog_gen1.new_database_gen1').show(25)
    
    spark.sql("insert into new_catalog_gen1.new_database_gen1.adls_aa_sam4 values (1,'Alan'),(2,'Ben'),(3,'Chen')").show()
    
    spark.sql("select * from new_catalog_gen1.new_database_gen1.adls_aa_sam4").show()
    
    # Stop SparkSession
    spark.stop()

    Example 3:

    Application code for Gen2 ADLS:

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder \
        .appName("ADLS_ICEBERG") \
        .config("spark.hadoop.wxd.apikey", "ZenApiKey Y3BhZG1pbjowT2VFV0o1WmtVYVhqR0dtOVFxSkJrZzRjeTlvREQzZ0RaU2JDZGFO") \
        .config("fs.azure.account.auth.type.annfeng.dfs.core.windows.net", "SAS") \
        .config("fs.azure.sas.token.provider.type.annfeng.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.IbmlhcasSASTokenProvider") \
        .config("spark.sql.catalogImplementation", "hive") \
        .config("spark.sql.catalog.new_catalog", "org.apache.iceberg.spark.SparkCatalog") \
        .config("hive.metastore.uris", "thrift://ibm-lh-lakehouse-hive-metastore-svc.cpd-instance.svc.cluster.local:9083") \
        .config("spark.sql.catalog.new_catalog.type" ,"hive") \
        .config("spark.sql.iceberg.vectorization.enabled" ,"false") \
        .config("spark.hadoop.hive.metastore.schema.verification", "false") \
        .config("spark.hadoop.hive.metastore.schema.verification.record.version", "false") \
        .config("spark.hadoop.datanucleus.schema.autoCreateTables", "false") \
        .config("spark.hive.metastore.use.SSL", "true") \
        .config("spark.hive.metastore.truststore.path", "file:///opt/ibm/jdk/lib/security/cacerts") \
        .config("spark.driver.extraJavaOptions", "-Dcom.sun.jndi.ldap.object.disableEndpointIdentification=true -Djdk.tls.trustNameService=true") \
        .config("spark.executor.extraJavaOptions", "-Dcom.sun.jndi.ldap.object.disableEndpointIdentification=true -Djdk.tls.trustNameService=true") \
        .config("spark.hive.metastore.truststore.password", "changeit") \
        .config("spark.hive.metastore.truststore.type", "JKS") \
        .config("hive.metastore.client.auth.mode", "PLAIN") \
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
        .config("spark.sql.iceberg.vectorization.enabled" ,"false") \
        .config("spark.hive.metastore.client.plain.username", "cpadmin") \
        .config("spark.hive.metastore.client.plain.password", "ueTAKG1GX2eo8HafqiAsA4CprTotR9YO") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .config("fs.azure.createRemoteFileSystemDuringInitialization", "true") \
        .enableHiveSupport() \
        .getOrCreate()
    
    # Example Spark SQL queries

    # Create the database at the ADLS Gen2 storage location if it does not exist
    spark.sql("CREATE database if not exists new_catalog.new_database LOCATION 'abfss://castest@annfeng.dfs.core.windows.net/aarush_cas_test/new_database'")

    # Create an Iceberg table in the new database
    spark.sql("CREATE TABLE IF NOT EXISTS new_catalog.new_database.adls_aa_sam4(col1 INT, col2 STRING)").show()
    
    spark.sql("show tables from new_catalog.new_database").show()
    
    spark.sql('describe schema new_catalog.new_database').show(25)
    
    spark.sql("insert into new_catalog.new_database.adls_aa_sam4 values (1,'Alan'),(2,'Ben'),(3,'Chen')").show()
    
    spark.sql("select * from new_catalog.new_database.adls_aa_sam4").show()
    
    # Stop SparkSession
    spark.stop()
    
  7. If your Spark application resides in Google Cloud Storage, specify the parameter values and run the following curl command. The following example shows the command to submit the gcs-read.py application. A minimal sketch of such an application is shown after the parameter list.
    Example:
    curl --request POST \
      --url https://<region>.lakehouse.cloud.ibm.com/lakehouse/api/v2/spark_engines/<spark_engine_id>/applications \
      --header 'Authorization: Bearer <token>' \
      --header 'Content-Type: application/json' \
      --header 'AuthInstanceID: <crn_instance>' \
      --data '{
      "application_details": {
        "application": "gs://<bucket_name>/gcs-read.py",
        "conf": {
            "spark.hadoop.google.cloud.auth.service.account.json.keyfile": "<json_keyfile>",
            "spark.app.name": "GCSFilesRead"
        }
      }
    }'
    
    Parameter values:
    • <region>: The region where the Spark instance is provisioned.
    • <spark_engine_id> : The Engine ID of the native Spark engine.
    • <token> : The bearer token. For more information about generating the token, see Generating a bearer token.
    • <crn_instance> : The instance ID from the watsonx.data cluster instance URL. For example, 1609968977179454.
    • <json_keyfile>: The path to the generated JSON key file that is used to authenticate to Google Cloud Storage.
    • <bucket_name>: The name of the Google Cloud Storage bucket that contains the application file.
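
    A minimal sketch of what the gcs-read.py application might contain (the file name data.csv is a hypothetical example; the service account key file is supplied through the conf section of the submission payload above):
    from pyspark.sql import SparkSession

    # Create a Spark session; the Google Cloud service account key file is
    # supplied through the submission payload
    spark = SparkSession.builder.appName("GCSFilesRead").getOrCreate()

    # Read a CSV file from the Google Cloud Storage bucket
    df = spark.read.option("header", "true").csv("gs://<bucket_name>/data.csv")
    df.show()

    spark.stop()
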
  8. After you submit the Spark application, you receive a confirmation message with the application ID and Spark version. Save the application ID for reference; you can use it to check the state of the application.
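    For example, assuming that the applications endpoint used for submission also serves GET requests for individual applications (a sketch; check the watsonx.data REST API reference for the exact path in your release):
    curl --request GET \
      --url https://<cpd_host_name>/lakehouse/api/v2/spark_engines/<spark_engine_id>/applications/<application_id> \
      --header 'Authorization: Bearer <token>' \
      --header 'LhInstanceId: <instance_id>'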
  9. Log in to the watsonx.data cluster and open the Engine details page. In the Applications tab, use the application ID to locate the application and track its stages. For more information, see View and manage applications.