Submitting Spark jobs to access components in a remote Hadoop cluster
This topic describes how to run a Spark job that accesses data available in a remote Hadoop cluster from IBM® watsonx.data.
You can access data in a Hadoop cluster that is either secure (Kerberized) or insecure (non-Kerberized). watsonx.data now supports Cloudera Distribution for Hadoop (CDH) based Hadoop clusters.
Before you begin
- Install an IBM watsonx.data instance.
- Provision the native Spark engine. For more information, see Provisioning native Spark engine.
- The Project Administrator must assign the Developer role to the watsonx.data instance.
- To access an insecure remote Hadoop cluster:
- Ensure that both the Hadoop cluster and watsonx.data belong to the same subnet.
- Set the following Hadoop configuration to allow communication on the Hadoop cluster:
hosts=<WXD_PRIVATE_CLOUD_SUBNET>
Note: The above configuration ensures that Spark can access the Hadoop components through the RPC port. Specifically, the HDFS NameNode, HDFS DataNode, and Hive Metastore (HMS) must be accessible. A minimal connectivity-check sketch follows this list.
- To access a secure remote Hadoop cluster:
- Ensure that both the Hadoop cluster and watsonx.data belong to the same subnet.
- Set the following Hadoop configuration to allow communication on the Hadoop cluster:
hadoop.rpc.protection=privacy
hadoop.proxyuser.hive.users=<USER_ACCESSING_FROM_SPARK>
hadoop.proxyuser.hive.hosts=<WXD_PRIVATE_CLOUD_SUBNET>
hadoop.proxyuser.hive.groups=<REQUIRED_GROUPS_USED_IN_JOBS>
Note: The above configuration ensures that Spark can access the Hadoop components through the RPC port. Specifically, the HDFS NameNode, HDFS DataNode, and Hive Metastore (HMS) must be accessible.
- Ensure that the keytab file of the user who submits the Spark job is readily available on the edge node of the remote Hadoop cluster. The keytab file is required for secure mode of HDFS access.
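The following optional sketch, written in Python, checks that the HDFS NameNode RPC port and the Hive Metastore Thrift port are reachable from the network where Spark runs. The host names and ports below are illustrative placeholders, not values from your cluster; substitute your own before running it.
# Optional connectivity check (illustrative sketch; host names and ports are placeholders).
# It only verifies that the RPC/Thrift ports are reachable from the Spark network,
# not that Kerberos or HDFS permissions are configured correctly.
import socket

ENDPOINTS = {
    "HDFS NameNode RPC": ("namenode.example.com", 8020),   # placeholder host, default NameNode RPC port
    "Hive Metastore Thrift": ("hms.example.com", 9083),    # placeholder host, default HMS Thrift port
}

for name, (host, port) in ENDPOINTS.items():
    try:
        with socket.create_connection((host, port), timeout=5):
            print(f"{name} at {host}:{port} is reachable")
    except OSError as err:
        print(f"{name} at {host}:{port} is NOT reachable: {err}")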
About this task
- Running a Spark job on an insecure Hadoop cluster
- Running a Spark job on a secure Hadoop cluster
- Running a Spark job on an insecure Hadoop cluster
- Specify the parameter values and run the following curl command. The following example shows the command to submit remoteHadoopAccessSample.py. A Python alternative to this curl command is sketched after the parameter list.
curl --request POST \
--url https://<wxd-host-name>/lakehouse/api/v2/spark_engines/<spark_engine_id>/applications \
--header 'Authorization: Bearer <token>' \
--header 'Content-Type: application/json' \
--header 'LhInstanceId: <instance-id>' \
--data '{
  "application_details": {
    "application": "cos://<BUCKET_NAME>.<COS_SERVICE_NAME>/remoteHadoopAccessSample.py",
    "application_arguments": ["hdfs://<namenode-server>:<namenode-rpc-port>/<subpath-to-access>"],
    "conf": {
      "spark.app.name": "RemoteHadoopAccessSample",
      "ae.spark.remoteHadoop.isSecure": "false",
      "ae.spark.remoteHadoop.services": "HDFS,HMS",
      "spark.hadoop.hive.metastore.uris": "thrift://<hms-server>:<hms-port>",
      "spark.hadoop.fs.cos.<COS_SERVICE_NAME>.endpoint": "<COS_ENDPOINT>",
      "spark.hadoop.fs.cos.<COS_SERVICE_NAME>.secret.key": "<COS_SECRET_KEY>",
      "spark.hadoop.fs.cos.<COS_SERVICE_NAME>.access.key": "<COS_ACCESS_KEY>"
    }
  }
}'
Parameter values:
- <wxd-host-name>: The hostname of your watsonx.data or Cloud Pak for Data cluster.
- <instance-id>: The instance ID from the watsonx.data cluster instance URL. For example, 1609968977179454.
- <spark_engine_id>: The Engine ID of the native Spark engine.
- <token> : The bearer token. For more information about generating the token, see Generating a bearer token.
- <BUCKET_NAME>: The name of the Object storage bucket that contains the data that the application uses. Note: You must register this bucket with watsonx.data and associate a catalog with it, and you must have access to both the bucket and the catalog.
- <COS_SERVICE_NAME>: The name of the Object storage service that is referenced in the cos:// application path and in the spark.hadoop.fs.cos.<COS_SERVICE_NAME>.* configuration properties.
- <hms-server>:<hms-port>: The Thrift URL of the Hive Metastore.
- <COS_ENDPOINT>: The hostname of the endpoint for accessing the bucket that contains the application script.
- <COS_ACCESS_KEY>: Your user name credential for the watsonx.data cluster. Note: You must have `Metastore Admin` access on the Hive Metastore.
- <COS_SECRET_KEY>: Your password for the watsonx.data cluster.
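The same REST call can also be issued from a script instead of curl. The following sketch, in Python, submits the insecure-cluster payload shown above by using the requests library (assumed to be installed); every <placeholder> value is an assumption that you must replace with the values for your environment.
# Sketch: submit the Spark application through the watsonx.data REST API
# (same endpoint, headers, and payload as the curl example above).
# All <placeholder> values are illustrative and must be replaced.
import requests

WXD_HOST = "<wxd-host-name>"
ENGINE_ID = "<spark_engine_id>"
TOKEN = "<token>"
INSTANCE_ID = "<instance-id>"

url = f"https://{WXD_HOST}/lakehouse/api/v2/spark_engines/{ENGINE_ID}/applications"
headers = {
    "Authorization": f"Bearer {TOKEN}",
    "Content-Type": "application/json",
    "LhInstanceId": INSTANCE_ID,
}
payload = {
    "application_details": {
        "application": "cos://<BUCKET_NAME>.<COS_SERVICE_NAME>/remoteHadoopAccessSample.py",
        "application_arguments": ["hdfs://<namenode-server>:<namenode-rpc-port>/<subpath-to-access>"],
        "conf": {
            "spark.app.name": "RemoteHadoopAccessSample",
            "ae.spark.remoteHadoop.isSecure": "false",
            "ae.spark.remoteHadoop.services": "HDFS,HMS",
            "spark.hadoop.hive.metastore.uris": "thrift://<hms-server>:<hms-port>",
            "spark.hadoop.fs.cos.<COS_SERVICE_NAME>.endpoint": "<COS_ENDPOINT>",
            "spark.hadoop.fs.cos.<COS_SERVICE_NAME>.secret.key": "<COS_SECRET_KEY>",
            "spark.hadoop.fs.cos.<COS_SERVICE_NAME>.access.key": "<COS_ACCESS_KEY>",
        },
    }
}

# Submit the job and print the API response.
response = requests.post(url, headers=headers, json=payload)
print(response.status_code)
print(response.text)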
- Running a Spark job on a secure Hadoop cluster
- Generate a delegation token on an edge node of the Hadoop cluster, add this token to the job's payload along with the other required Hadoop configurations, and then submit the Spark job. The main steps include:
- Generate a delegation token. Download the delegation token generation utility based on the version of your Hadoop cluster. For a CDH cluster (Hive standalone 2.1 and later), use the Hadoop delegation token generator.
- Extract the ZIP file by using the following shell command:
unzip HadoopDelegationTokenGenerator-0.0.1-SNAPSHOT.zip
Archive:  HadoopDelegationTokenGenerator-0.0.1-SNAPSHOT.zip
  inflating: HadoopDelegationTokenGenerator-0.0.1-SNAPSHOT.jar
  inflating: delegation-token-generator.sh
- Add the JAR file to the classpath. For example, on a CDH 7.1.7 cluster:
export classpath="/opt/cloudera/parcels/CDH-7.1.7-1.cdh7.1.7.p2000.37147774/lib/hadoop/*:/opt/cloudera/parcels/CDH-7.1.7-1.cdh7.1.7.p2000.37147774/lib/hive/lib/*:/opt/cloudera/parcels/CDH-7.1.7-1.cdh7.1.7.p2000.37147774/lib/hadoop-hdfs/*:/opt/cloudera/parcels/CDH-7.1.7-1.cdh7.1.7.p2000.37147774/lib/hadoop-hdfs/lib/*:/opt/cloudera/parcels/CDH-7.1.7-1.cdh7.1.7.p2000.37147774/lib/hadoop-mapreduce/*:/etc/hadoop/conf:/etc/hive/conf:/opt/cloudera/parcels/CDH-7.1.7-1.cdh7.1.7.p2000.37147774/lib/spark/*:/opt/cloudera/parcels/CDH-7.1.7-1.cdh7.1.7.p2000.37147774/lib/search/lib/*:/root/pratham/HadoopDelegationTokenGenerator-0/HadoopDelegationTokenGenerator-0.0.1-SNAPSHOT.jar"
- If you need the HMS token in addition to the HDFS token, export the configurations for Hive. Example:
export hive_kerberos_principal="hive/abc.xyz.com@EXAMPLE.COM"
export hive_metastore_uri="thrift://<thrift-server>:9083"
- Export your HADOOP_HOME.
Example:
export HADOOP_HOME="/opt/cloudera/parcels/CDH-7.1.7-1.cdh7.1.7.p2000.37147774/lib/hadoop/"
- Generate the Kerberos Ticket Granting Ticket (TGT). For example, for the user `hdfs` on a CDH cluster:
kinit -kt /<path to hdfs keytab>/hdfs.keytab hdfs/abc.xyz.com@EXAMPLE.COM
- Execute the shell script from the downloaded ZIP file:
sh delegation-token-generator.sh hdfs/abc.xyz.com@EXAMPLE.COM /mytok.dt HDFS HMS
The token is fetched. The last two parameters in the command are the components for which the token needs to be generated. If you only need HDFS, omit HMS or vice versa. The script prints the delegation token in base64 encoded format as a string. Note down this string for later use.
- Prepare the job payload by adding the following Hadoop-specific configurations. If the remote Hadoop cluster is Kerberized, set the following parameter:
"ae.spark.remoteHadoop.isSecure" : "true"
If your Spark job accesses services, list those services:
"ae.spark.remoteHadoop.services": "HDFS,HMS"
The above example shows accessing HDFS and HMS.
- Export all the Hadoop configuration files from the remote cluster (in this case, CDH) and mount those configuration files to your Spark application. For more information, see Creating a storage volume. Example of the configuration files that are needed:
core-site.xml
hadoop-env.sh
hdfs-site.xml
log4j.properties
mapred-site.xml
ssl-client.xml
topology.map
- Add the mount location to the extraClassPath of both the driver and the executor, and set HADOOP_CONF_DIR to this path. Example:
"spark.driver.extraClassPath": "/mnts/remote-hadoop-test/"
"spark.executor.extraClassPath": "/mnts/remote-hadoop-test/"
"HADOOP_CONF_DIR": "/mnts/remote-hadoop-test/"
- To enable access to the kerberized Hadoop cluster from Spark, add the delegation token you noted
down:
"ae.spark.remoteHadoop.delegationToken": "<token>"
- If you are accessing HMS from Spark, add the Hive Metastore Kerberos principal and the URI to
access the Hive Metastore:
"spark.hadoop.hive.metastore.kerberos.principal" : "hive/abc.xyz.com@EXAMPLE.COM" "spark.hadoop.hive.metastore.uris":"thrift://<thrift-server>:<thrift-port>"
- Here is a sample payload for an application called
`remoteHadoopAccessSample.py`:
{ "application_details": { "application": "cos://<BUCKET_NAME>.<COS_SERVICE_NAME>/remoteHadoopAccessSample.py", "application_arguments": ["hdfs://<namenode-server>:<namenode-rpc-port>/<subpath-to-access>"], "conf": { "spark.app.name": "RemoteHadoopAccessSample", "ae.spark.remoteHadoop.isSecure": "true", "ae.spark.remoteHadoop.services": "HDFS,HMS", "ae.spark.remoteHadoop.delegationToken": "<base64-encoded-delegation-token>", "spark.hadoop.hive.metastore.kerberos.principal": "<hms-kerberos-principal>", "spark.hadoop.hive.metastore.uris": "thrift://<hms-server>:<thrift-port>", "spark.hadoop.fs.cos.<COS_SERVICE_NAME>.endpoint": "<COS_ENDPOINT>", "spark.hadoop.fs.cos.<COS_SERVICE_NAME>.secret.key": "<COS_SECRET_KEY>", "spark.hadoop.fs.cos.<COS_SERVICE_NAME>.access.key": "<COS_ACCESS_KEY>", "spark.hadoop.hadoop.security.authentication": "kerberos", "spark.driver.extraClassPath":"/mnts/<vol-mount-path>/", "spark.executor.extraClassPath":"/mnts/<vol-mount-path>/" }, "env": { "HADOOP_CONF_DIR": "/mnts/<vol-mount-path>/ " } }, "volumes": [{ "name": "<volume-name>", "mount_path": "/mnts/<vol-mount-path>" }] }
- Here is an example of a Spark application (remoteHadoopAccessSample.py in the previous sample
payload) that shows you how to access HDFS and HMS:
import sys

from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder.appName("secureHadoop").enableHiveSupport().getOrCreate()

    # The application expects two arguments: the HDFS base URI
    # (hdfs://<namenode-server>:<namenode-rpc-port>) and the subpath to access.
    path = "{}/{}".format(sys.argv[1], sys.argv[2])
    print("Path accessed in HDFS is : {}".format(path))

    # Read a CSV file from the remote HDFS path.
    df = spark.read.format("csv").option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ").load(path)
    df.show()

    # List the tables that are visible through the remote HMS.
    sqlDF1 = spark.sql("show tables")
    sqlDF1.show()

    # Create an external table in the remote HMS over the same HDFS location.
    tablename = "securehadoop"
    createsql = "create external table {} (name string,id string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE LOCATION '{}/{}'".format(tablename, sys.argv[1], sys.argv[2])
    print("SQL executed for HMS : {}".format(createsql))
    sqlDFCr = spark.sql(createsql)

    # Insert a row and read the table back.
    insertsql = "insert into {} values('newvalue','123456')".format(tablename)
    print("SQL executed for insert in HMS :{}".format(insertsql))
    sqlDF2 = spark.sql(insertsql)

    sqlDF = spark.sql("SELECT * FROM {}".format(tablename))
    sqlDF.show()

    spark.stop()
- Submit your application as a Spark job. A Python sketch that assembles this payload with the delegation token and submits it follows this procedure.
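For reference, the following sketch, in Python, shows one way to wire the pieces together: it reads the base64-encoded delegation token that the generator script printed (saved here to a hypothetical local file named delegation_token.b64), inserts the token into the secure-cluster payload shown above, and submits the job through the same applications endpoint used earlier. The file name and all <placeholder> values are assumptions for illustration.
# Sketch: build the secure-cluster payload and submit it through the
# watsonx.data Spark applications REST endpoint shown earlier.
# The file name delegation_token.b64 and all <placeholder> values are illustrative.
import requests

WXD_HOST = "<wxd-host-name>"
ENGINE_ID = "<spark_engine_id>"
TOKEN = "<token>"
INSTANCE_ID = "<instance-id>"

# Read the base64-encoded delegation token printed by delegation-token-generator.sh
# (saved to a local file for this example).
with open("delegation_token.b64") as f:
    delegation_token = f.read().strip()

payload = {
    "application_details": {
        "application": "cos://<BUCKET_NAME>.<COS_SERVICE_NAME>/remoteHadoopAccessSample.py",
        "application_arguments": ["hdfs://<namenode-server>:<namenode-rpc-port>/<subpath-to-access>"],
        "conf": {
            "spark.app.name": "RemoteHadoopAccessSample",
            "ae.spark.remoteHadoop.isSecure": "true",
            "ae.spark.remoteHadoop.services": "HDFS,HMS",
            "ae.spark.remoteHadoop.delegationToken": delegation_token,
            "spark.hadoop.hive.metastore.kerberos.principal": "<hms-kerberos-principal>",
            "spark.hadoop.hive.metastore.uris": "thrift://<hms-server>:<thrift-port>",
            "spark.hadoop.fs.cos.<COS_SERVICE_NAME>.endpoint": "<COS_ENDPOINT>",
            "spark.hadoop.fs.cos.<COS_SERVICE_NAME>.secret.key": "<COS_SECRET_KEY>",
            "spark.hadoop.fs.cos.<COS_SERVICE_NAME>.access.key": "<COS_ACCESS_KEY>",
            "spark.hadoop.hadoop.security.authentication": "kerberos",
            "spark.driver.extraClassPath": "/mnts/<vol-mount-path>/",
            "spark.executor.extraClassPath": "/mnts/<vol-mount-path>/",
        },
        "env": {"HADOOP_CONF_DIR": "/mnts/<vol-mount-path>/"},
    },
    "volumes": [{"name": "<volume-name>", "mount_path": "/mnts/<vol-mount-path>"}],
}

url = f"https://{WXD_HOST}/lakehouse/api/v2/spark_engines/{ENGINE_ID}/applications"
headers = {
    "Authorization": f"Bearer {TOKEN}",
    "Content-Type": "application/json",
    "LhInstanceId": INSTANCE_ID,
}

# Submit the job and print the API response.
response = requests.post(url, headers=headers, json=payload)
print(response.status_code, response.text)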