Submitting Spark jobs to access components in a remote Hadoop cluster

This topic describes how to run a Spark job from IBM® watsonx.data that accesses data in a remote Hadoop cluster.

You can access data in a Hadoop cluster that is either secure (kerberized) or insecure (non-kerberized). watsonx.data supports Cloudera Distribution for Hadoop (CDH) based Hadoop clusters.

Before you begin

  • Install an IBM watsonx.data instance.
  • Provision native Spark engine. For more information, see Provisioning native Spark engine.
  • The Project Administrator must assign the Developer role for the watsonx.data instance.
  • To access an insecure remote Hadoop cluster:
    • Ensure that both the Hadoop cluster and watsonx.data belong to the same subnet.
    • Set the following Hadoop configuration on the Hadoop cluster to allow communication:
      hosts=<WXD_PRIVATE_CLOUD_SUBNET>
      Note: This configuration ensures that Spark can access the Hadoop components through their RPC ports. Specifically, the HDFS NameNode, HDFS DataNode, and Hive Metastore (HMS) must be accessible. (A minimal reachability check is sketched after this list.)
  • To access a secure remote Hadoop cluster:
    • Ensure that both the Hadoop cluster and watsonx.data belong to the same subnet.
    • Set the following Hadoop configurations on the Hadoop cluster to allow communication:
      hadoop.rpc.protection=privacy 
      hadoop.proxyuser.hive.users=<USER_ACCESSING_FROM_SPARK> 
      hadoop.proxyuser.hive.hosts=<WXD_PRIVATE_CLOUD_SUBNET> 
      hadoop.proxyuser.hive.groups=<REQUIRED_GROUPS_USED_IN_JOBS> 
      Note: This configuration ensures that Spark can access the Hadoop components through their RPC ports. Specifically, the HDFS NameNode, HDFS DataNode, and Hive Metastore (HMS) must be accessible.
    • Ensure that the keytab file of the user who submits the Spark job is available on the edge node of the remote Hadoop cluster. The keytab file is required for secure HDFS access.
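
Before you submit any jobs, you can optionally verify from the watsonx.data subnet that the remote Hadoop endpoints are reachable. The following is a minimal sketch only: the host names are placeholders, and the ports (8020 for the HDFS NameNode RPC port, 9083 for the HMS Thrift port) are assumed CDH defaults that might differ on your cluster.

# Minimal sketch: check that the remote Hadoop RPC endpoints are reachable.
# Host names are placeholders; ports 8020 (NameNode RPC) and 9083 (HMS Thrift)
# are assumed defaults and might differ on your cluster.
import socket

endpoints = {
    "HDFS NameNode RPC": ("<namenode-server>", 8020),
    "Hive Metastore (HMS) Thrift": ("<hms-server>", 9083),
}

for name, (host, port) in endpoints.items():
    try:
        with socket.create_connection((host, port), timeout=5):
            print("{} at {}:{} is reachable".format(name, host, port))
    except OSError as err:
        print("{} at {}:{} is NOT reachable: {}".format(name, host, port, err))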

About this task

The steps to submit Spark jobs that access Hadoop components on a Hadoop cluster vary depending on whether the Hadoop cluster is secure or insecure. The main steps include:
  • Running a Spark job on an insecure Hadoop cluster
  • Running a Spark job on a secure Hadoop cluster
Running a Spark job on an insecure Hadoop cluster
Specify the parameter values and run the following curl command. The following example shows the command to submit remoteHadoopAccessSample.py. (A Python-based sketch of the same submission follows the parameter list.)
curl --request POST \
  --url https://<wxd-host-name>/lakehouse/api/v2/spark_engines/<spark_engine_id>/applications \
  --header 'Authorization: Bearer <token>' \
  --header 'Content-Type: application/json' \
  --header 'LhInstanceId: <instance-id>' \
  --data '{ 
  "application_details": 
        { "application": "cos://<BUCKET_NAME>.<COS_SERVICE_NAME>/remoteHadoopAccessSample.py", 
          "application_arguments": ["hdfs://<namenode-server>:<namenode-rpc-port>/subpath-to-access>"], 
          "conf": 
            { "spark.app.name": "RemoteHadoopAccessSample", 
              "ae.spark.remoteHadoop.isSecure": "false", 
              "ae.spark.remoteHadoop.services": "HDFS,HMS", 
              "spark.hadoop.hive.metastore.uris": "thrift://<hms-server>:<hms-port>", 
              "spark.hadoop.fs.cos.<COS_SERVICE_NAME>.endpoint": "<COS_ENDPOINT>", 
              "spark.hadoop.fs.cos.<COS_SERVICE_NAME>.secret.key": "<COS_SECRET_KEY>",
              "spark.hadoop.fs.cos.<COS_SERVICE_NAME>.access.key": "<COS_ACCESS_KEY>" 
            } 
        }
}'
Parameter values:
  • <wxd-host-name>: The hostname of your watsonx.data or Cloud Pak for Data cluster.
  • <instance-id>: The instance ID from the watsonx.data cluster instance URL. For example, 1609968977179454.
  • <spark_engine_id>: The Engine ID of the native Spark engine.
  • <token>: The bearer token. For more information about generating the token, see Generating a bearer token.
  • <BUCKET_NAME>: The name of the Object storage bucket that contains the data that the application uses.
    Note: You must register this bucket with watsonx.data and associate a catalog with it, and you must have access to both the bucket and the catalog.
  • <COS_SERVICE_NAME>: A name for the Object storage service. Use the same value in the cos:// application path and in the spark.hadoop.fs.cos.<COS_SERVICE_NAME>.* properties.
  • <hms-server>:<hms-port>: The Thrift URL of the Hive Metastore.
  • <COS_ENDPOINT>: The hostname of the endpoint for accessing the bucket that contains the application script.
  • <COS_ACCESS_KEY>: Your user name credential for the watsonx.data cluster. Note: You must have `Metastore Admin` access on the Hive Metastore.
  • <COS_SECRET_KEY>: Your password for the watsonx.data cluster.
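
If you prefer to submit the job from a script instead of curl, the following is a minimal sketch that posts the same payload by using the Python requests library. It assumes the same endpoint, headers, and placeholder values that are described in the parameter list above.

# Minimal sketch: submit the insecure-cluster job programmatically.
# Assumption: same REST endpoint and headers as the curl example; all
# placeholder values are replaced with the values from the parameter list.
import requests

WXD_HOST = "<wxd-host-name>"
SPARK_ENGINE_ID = "<spark_engine_id>"
BEARER_TOKEN = "<token>"
INSTANCE_ID = "<instance-id>"

payload = {
    "application_details": {
        "application": "cos://<BUCKET_NAME>.<COS_SERVICE_NAME>/remoteHadoopAccessSample.py",
        "application_arguments": ["hdfs://<namenode-server>:<namenode-rpc-port>/<subpath-to-access>"],
        "conf": {
            "spark.app.name": "RemoteHadoopAccessSample",
            "ae.spark.remoteHadoop.isSecure": "false",
            "ae.spark.remoteHadoop.services": "HDFS,HMS",
            "spark.hadoop.hive.metastore.uris": "thrift://<hms-server>:<hms-port>",
            "spark.hadoop.fs.cos.<COS_SERVICE_NAME>.endpoint": "<COS_ENDPOINT>",
            "spark.hadoop.fs.cos.<COS_SERVICE_NAME>.secret.key": "<COS_SECRET_KEY>",
            "spark.hadoop.fs.cos.<COS_SERVICE_NAME>.access.key": "<COS_ACCESS_KEY>",
        },
    }
}

response = requests.post(
    "https://{}/lakehouse/api/v2/spark_engines/{}/applications".format(WXD_HOST, SPARK_ENGINE_ID),
    headers={
        "Authorization": "Bearer {}".format(BEARER_TOKEN),
        "Content-Type": "application/json",
        "LhInstanceId": INSTANCE_ID,
    },
    json=payload,
    timeout=60,
)
response.raise_for_status()
print(response.json())
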
Running a Spark job on a secure Hadoop cluster
Generate a delegation token on an edge node of the Hadoop cluster, add the token to the job payload along with the other required Hadoop configurations, and then submit the Spark job. The main steps include:
  1. Generate a delegation token. Download the delegation token generation utility that matches the version of your Hadoop cluster. For a CDH cluster (Hive standalone 2.1 and later), use the Hadoop delegation token generator.
  2. Extract the ZIP file by using the following shell command.
    unzip HadoopDelegationTokenGenerator-0.0.1-SNAPSHOT.zip 
    Archive: HadoopDelegationTokenGenerator-0.0.1-SNAPSHOT.zip
    inflating: HadoopDelegationTokenGenerator-0.0.1-SNAPSHOT.jar 
    inflating: delegation-token-generator.sh 
  3. Add the JAR file to the classpath. For example, on a CDH 7.1.7 cluster:
    export classpath="/opt/cloudera/parcels/CDH-7.1.7-1.cdh7.1.7.p2000.37147774/lib/hadoop/*:/opt/cloudera/parcels/CDH-7.1.7-1.cdh7.1.7.p2000.37147774/lib/hive/lib/*:/opt/cloudera/parcels/CDH-7.1.7-1.cdh7.1.7.p2000.37147774/lib/hadoop-hdfs/*:/opt/cloudera/parcels/CDH-7.1.7-1.cdh7.1.7.p2000.37147774/lib/hadoop-hdfs/lib/*:/opt/cloudera/parcels/CDH-7.1.7-1.cdh7.1.7.p2000.37147774/lib/hadoop-mapreduce/*:/etc/hadoop/conf:/etc/hive/conf:/opt/cloudera/parcels/CDH-7.1.7-1.cdh7.1.7.p2000.37147774/lib/spark/*:/opt/cloudera/parcels/CDH-7.1.7-1.cdh7.1.7.p2000.37147774/lib/search/lib/*:/root/pratham/HadoopDelegationTokenGenerator-0/HadoopDelegationTokenGenerator-0.0.1-SNAPSHOT.jar"
  4. If you are generating a token for HMS in addition to HDFS, export the Hive configurations. Example:
    export hive_kerberos_principal="hive/abc.xyz.com@EXAMPLE.COM"; 
    export hive_metastore_uri="thrift://<thrift-server>:9083";
  5. Export your HADOOP_HOME. Example:
    export HADOOP_HOME="/opt/cloudera/parcels/CDH-7.1.7-1.cdh7.1.7.p2000.37147774/lib/hadoop/"
  6. Generate the Kerberos Ticket Granting Ticket (TGT). This example uses the `hdfs` user on a CDH cluster:
    kinit -kt /<path to hdfs keytab>/hdfs.keytab hdfs/abc.xyz.com@EXAMPLE.COM 
  7. Execute the shell script from the downloaded ZIP file:
    sh delegation-token-generator.sh hdfs/abc.xyz.com@EXAMPLE.COM /mytok.dt HDFS HMS

    The token is fetched. The last two parameters in the command are the components for which the token must be generated. If you need only HDFS, omit HMS, and vice versa. The script prints the delegation token as a base64-encoded string. Note down this string for later use.

  8. Prepare the job payload by adding the following Hadoop-specific configurations. If the remote Hadoop cluster is kerberized, set the following parameter:
    "ae.spark.remoteHadoop.isSecure" : "true"
    
    If your Spark job accesses services, list those services:
    
    "ae.spark.remoteHadoop.services": "HDFS,HMS"
    
    The above example shows accessing HDFS and HMS.
  9. Export all of the Hadoop configuration files from the remote cluster (in this case, CDH) and mount them to your Spark application. For more information, see Creating a storage volume. The following configuration files are typically needed:
    core-site.xml 
    hadoop-env.sh 
    hdfs-site.xml 
    log4j.properties 
    mapred-site.xml 
    ssl-client.xml 
    topology.map 
  10. Add the mount location to the extraClassPath of both the driver and the executor, and set HADOOP_CONF_DIR to this path. Example:
    "spark.driver.extraClassPath":"/mnts/remote-hadoop-test/"
    "spark.executor.extraClassPath":"/mnts/remote-hadoop-test/"
    
    "HADOOP_CONF_DIR": "/mnts/remote-hadoop-test/"
  11. To enable access to the kerberized Hadoop cluster from Spark, add the delegation token you noted down:
    "ae.spark.remoteHadoop.delegationToken": "<token>"
  12. If you are accessing HMS from Spark, add the Hive Metastore Kerberos principal and the URI to access the Hive Metastore:
    "spark.hadoop.hive.metastore.kerberos.principal" : "hive/abc.xyz.com@EXAMPLE.COM"
    "spark.hadoop.hive.metastore.uris":"thrift://<thrift-server>:<thrift-port>"
  13. Here is a sample payload for an application called `remoteHadoopAccessSample.py`:
    {
      "application_details": {
        "application": "cos://<BUCKET_NAME>.<COS_SERVICE_NAME>/remoteHadoopAccessSample.py",
        "application_arguments": ["hdfs://<namenode-server>:<namenode-rpc-port>/<subpath-to-access>"],
        "conf": {
          "spark.app.name": "RemoteHadoopAccessSample",
          "ae.spark.remoteHadoop.isSecure": "true",
          "ae.spark.remoteHadoop.services": "HDFS,HMS",
          "ae.spark.remoteHadoop.delegationToken": "<base64-encoded-delegation-token>",
          "spark.hadoop.hive.metastore.kerberos.principal": "<hms-kerberos-principal>",
          "spark.hadoop.hive.metastore.uris": "thrift://<hms-server>:<thrift-port>",
          "spark.hadoop.fs.cos.<COS_SERVICE_NAME>.endpoint": "<COS_ENDPOINT>",
          "spark.hadoop.fs.cos.<COS_SERVICE_NAME>.secret.key": "<COS_SECRET_KEY>",
          "spark.hadoop.fs.cos.<COS_SERVICE_NAME>.access.key": "<COS_ACCESS_KEY>",
          "spark.hadoop.hadoop.security.authentication": "kerberos",
          "spark.driver.extraClassPath": "/mnts/<vol-mount-path>/",
          "spark.executor.extraClassPath": "/mnts/<vol-mount-path>/"
        },
        "env": {
          "HADOOP_CONF_DIR": "/mnts/<vol-mount-path>/"
        }
      },
      "volumes": [{
        "name": "<volume-name>",
        "mount_path": "/mnts/<vol-mount-path>"
      }]
    }
    
  14. Here is an example of a Spark application (remoteHadoopAccessSample.py in the previous sample payload) that shows you how to access HDFS and HMS:
    import sys

    from pyspark.sql import SparkSession

    if __name__ == "__main__":
        spark = SparkSession.builder.appName("secureHadoop").enableHiveSupport().getOrCreate()

        # The HDFS path to read is passed as the first application argument
        # (see "application_arguments" in the payload).
        path = sys.argv[1]
        print("Path accessed in HDFS is : {}".format(path))

        # Read a CSV file directly from HDFS on the remote Hadoop cluster.
        df = spark.read.format("csv").option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ").load(path)
        df.show()

        # List the tables that are visible through the remote Hive Metastore (HMS).
        spark.sql("show tables").show()

        # Create an external table in HMS over the same HDFS location.
        tablename = "securehadoop"
        createsql = "create external table {} (name string, id string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE LOCATION '{}'".format(tablename, path)
        print("SQL executed for HMS : {}".format(createsql))
        spark.sql(createsql)

        # Insert a row and read it back through HMS.
        insertsql = "insert into {} values('newvalue','123456')".format(tablename)
        print("SQL executed for insert in HMS : {}".format(insertsql))
        spark.sql(insertsql)
        spark.sql("SELECT * FROM {}".format(tablename)).show()

        spark.stop()
  15. Submit your application as a Spark job. (A Python-based sketch of this submission, which injects the delegation token from step 7 into the payload from step 13, follows this procedure.)
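
For reference, the following is a minimal sketch of this final step. It assumes that you saved the base64-encoded token string from step 7 to a local file named token.b64 and the payload from step 13 to payload.json (both file names are hypothetical), and it posts the payload to the same endpoint that is used in the curl example earlier in this topic.

# Minimal sketch: inject the delegation token from step 7 into the payload
# from step 13 and submit the job. The file names token.b64 and payload.json
# are hypothetical; replace the placeholders with your own values.
import json

import requests

WXD_HOST = "<wxd-host-name>"
SPARK_ENGINE_ID = "<spark_engine_id>"
BEARER_TOKEN = "<token>"          # watsonx.data bearer token, not the Hadoop delegation token
INSTANCE_ID = "<instance-id>"

# Base64-encoded delegation token string printed by delegation-token-generator.sh.
with open("token.b64") as token_file:
    delegation_token = token_file.read().strip()

# Secure-cluster payload from step 13, saved as a JSON file.
with open("payload.json") as payload_file:
    payload = json.load(payload_file)

payload["application_details"]["conf"]["ae.spark.remoteHadoop.delegationToken"] = delegation_token

response = requests.post(
    "https://{}/lakehouse/api/v2/spark_engines/{}/applications".format(WXD_HOST, SPARK_ENGINE_ID),
    headers={
        "Authorization": "Bearer {}".format(BEARER_TOKEN),
        "Content-Type": "application/json",
        "LhInstanceId": INSTANCE_ID,
    },
    json=payload,
    timeout=60,
)
response.raise_for_status()
print(response.json())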