Accessing a remote Hadoop cluster

This topic describes how to access a remote Hadoop cluster, either secure (kerberized) or insecure (non-kerberized), from an Analytics Engine powered by Apache Spark instance to run Spark jobs. Currently, access to HDP 2.6.5 and HDP 3.1 based Hadoop clusters is supported.

Before you begin

To complete this task, a project administrator must have provisioned an instance of Analytics Engine powered by Apache Spark and assigned you the Developer role on the instance.

Submitting Spark jobs which access components on a remote Hadoop cluster

The steps to submit Spark jobs that access components on a remote Hadoop cluster vary depending on whether the Hadoop cluster is secure or insecure. The main steps for each case are described in the following sections.

Running a Spark job on a secure Hadoop cluster

To run a Spark job on a secure Hadoop cluster:

  1. Generate a delegation token:

    1. Download the delegation token generation utility (HadoopDelegationTokenGenerator-0.0.1-SNAPSHOT.zip) that matches the version of your Hadoop cluster.

    2. Extract the ZIP file:
       unzip HadoopDelegationTokenGenerator-0.0.1-SNAPSHOT.zip 
       Archive:  HadoopDelegationTokenGenerator-0.0.1-SNAPSHOT.zip
       inflating: HadoopDelegationTokenGenerator-0.0.1-SNAPSHOT.jar  
       inflating: delegation-token-generator.sh  
      

      The ZIP file contains two files: HadoopDelegationTokenGenerator-0.0.1-SNAPSHOT.jar and delegation-token-generator.sh. Make sure you run the utility on an edge node of the Hadoop cluster that has access to the basic Hadoop and Hive JAR files, and that basic configuration files such as core-site.xml, hdfs-site.xml, and hive-site.xml are available on that edge node.

    3. Add the JAR file that you extracted to the classpath.

      • For an HDP 2.6.5 cluster:
         export classpath="/usr/hdp/2.6.5.0-292/hive/lib/*:/usr/hdp/2.6.5.0-292/hadoop/*:/usr/hdp/2.6.5.0-292/hadoop-hdfs/*:/usr/hdp/2.6.5.0-292/hadoop-hdfs/lib/*:/usr/hdp/2.6.5.0-292/hadoop-mapreduce/*:/etc/hadoop/conf/:/extractionlocation/HadoopDelegationTokenGenerator-0.0.1-SNAPSHOT.jar"
        
      • For an HDP 3.1 cluster:
         export classpath="/usr/hdp/3.1.4.0-315/hive/lib/*:/usr/hdp/3.1.4.0-315/hadoop/*:/usr/hdp/3.1.4.0-315/hadoop-hdfs/*:/usr/hdp/3.1.4.0-315/hadoop-hdfs/lib/*:/usr/hdp/3.1.4.0-315/hadoop-mapreduce/*:/etc/hadoop/conf:/extractionlocation/HadoopDelegationTokenGenerator-0.0.1-SNAPSHOT.jar" 
        
    4. If you are using Hive and generating an HMS token, export the configurations for Hive. Example:
       export hive_kerberos_principal="hive/abc.xyz.com@EXAMPLE.COM"
       export hive_metastore_uri="thrift://<thrift-server>:9083"
      
    5. Generate the Kerberos Ticket Granting Ticket (TGT). The following example uses the user ambari-qa on an HDP 2.6.5 cluster:
       kinit -kt /etc/security/keytabs/smokeuser.headless.keytab ambari-qa-hadoop265@EXAMPLE.COM
      
    6. Execute the shell script from the downloaded ZIP file:
       sh delegation-token-generator.sh ambari-qa-hadoop265@EXAMPLE.COM /ambariqatok.dt HDFS HMS
      

    The token is fetched. The last two parameters in the command are the components for which the token needs to be generated; if you only need HDFS, omit HMS, and vice versa. The script prints the delegation token as a base64-encoded string. Note down this string for later use.
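
    If you need to produce the base64 string yourself, for example from a raw token file that you copied to your local file system, a minimal sketch in Python follows. The local file name ambariqatok.dt is only an assumption based on the path used in the example above.

       # Minimal sketch (assumption): base64-encode a raw delegation token file so that
       # the resulting string can be used in the job payload.
       import base64

       with open("ambariqatok.dt", "rb") as token_file:
           raw_token = token_file.read()

       encoded_token = base64.b64encode(raw_token).decode("ascii")
       print(encoded_token)  # value for "ae.spark.remoteHadoop.delegationToken"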

  2. Prepare the job payload by adding the following Hadoop-specific configurations:

    1. If the remote Hadoop cluster is kerberized, set the following parameter:
       "ae.spark.remoteHadoop.isSecure" : "true"
      
    2. If your Spark job accesses services, list those services:
       "ae.spark.remoteHadoop.services": "HDFS,HMS"
      

      The example shows accessing HDFS and HMS.

    3. To enable access to the kerberized Hadoop cluster from Spark, add the delegation token you noted down:
       "ae.spark.remoteHadoop.delegationToken": "SERUUwACEDkuMXYZ"
      
    4. If you are accessing HMS from Spark, add the Hive Metastore Kerberos principal and the URI to access the Hive Metastore:
       "spark.hadoop.hive.metastore.kerberos.principal" : "hive/HOSTABC.XYZ.COM@EXAMPLE.COM"
       "spark.hadoop.hive.metastore.uris":"thrift://<thrift-server>:9083"
      

      Here is a sample payload for an application called remoteHadoopAccessSample.py:

      {
        "application": "cos://<BUCKET_NAME>.<COS_SERVICE_NAME>/remoteHadoopAccessSample.py",
        "application_arguments": ["hdfs://<namenode-server>:<namenode-rpc-port>/", "<subpath-to-access>"],
        "engine": {
          "type": "spark",
          "conf": {
            "spark.app.name": "RemoteHadoopAccessSample",
            "ae.spark.remoteHadoop.isSecure": "true",
            "ae.spark.remoteHadoop.services": "HDFS,HMS",
            "ae.spark.remoteHadoop.delegationToken": "<base64-encoded-delegation-token>",
            "spark.hadoop.hive.metastore.kerberos.principal": "<hms-kerberos-principal>",
            "spark.hadoop.hive.metastore.uris": "thrift://<hms-server>:<hms-port>",
            "spark.hadoop.fs.cos.<COS_SERVICE_NAME>.endpoint": "<COS_ENDPOINT>",
            "spark.hadoop.fs.cos.<COS_SERVICE_NAME>.secret.key": "<COS_SECRET_KEY>",
            "spark.hadoop.fs.cos.<COS_SERVICE_NAME>.access.key": "<COS_ACCESS_KEY>"
          }
        }
      }
      
  3. Submit your application as a Spark job. See Submitting Spark jobs via API.
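
    As a minimal sketch of the submission call, assuming <JOBS_API_ENDPOINT> and <ACCESS_TOKEN> are placeholders for the Spark jobs endpoint of your instance and a valid bearer token, and that the payload from the previous step is saved as remote_hadoop_payload.json (a hypothetical file name), you could post the payload as follows. See Submitting Spark jobs via API for the exact endpoint and authentication.

      # Minimal sketch (assumptions): <JOBS_API_ENDPOINT> and <ACCESS_TOKEN> are placeholders,
      # and the payload from step 2 is stored in remote_hadoop_payload.json.
      import json
      import requests

      with open("remote_hadoop_payload.json") as payload_file:
          payload = json.load(payload_file)

      response = requests.post(
          "<JOBS_API_ENDPOINT>",
          headers={
              "Authorization": "Bearer <ACCESS_TOKEN>",
              "Content-Type": "application/json",
          },
          json=payload,
      )
      print(response.status_code, response.text)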

    Here is an example of a Spark application (remoteHadoopAccessSample.py in the previous sample payload) that shows you how to access HDFS and HMS:

     import sys
     from pyspark.sql import SparkSession

     if __name__ == "__main__":
         # Spark session with Hive support, so that SQL statements are sent to the remote HMS
         spark = SparkSession.builder.appName("secureHadoop").enableHiveSupport().getOrCreate()

         # sys.argv[1] and sys.argv[2] are the application_arguments from the job payload:
         # the HDFS namenode URI and the subpath to access
         path = "{}/{}".format(sys.argv[1], sys.argv[2])
         print("Path accessed in HDFS is: {}".format(path))

         # Read a CSV file from the remote HDFS
         df = spark.read.format("csv").option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ").load(path)
         df.show()

         # List the tables known to the remote Hive Metastore
         sqlDF1 = spark.sql("show tables")
         sqlDF1.show()

         # Create an external Hive table over the HDFS location
         tablename = "securehadoop"
         createsql = "create external table {} (name string,id string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE LOCATION '{}/{}'".format(tablename, sys.argv[1], sys.argv[2])
         print("SQL executed for HMS: {}".format(createsql))
         sqlDFCr = spark.sql(createsql)

         # Insert a row and read the table back
         insertsql = "insert into {} values('newvalue','123456')".format(tablename)
         print("SQL executed for insert in HMS: {}".format(insertsql))
         sqlDF2 = spark.sql(insertsql)
         sqlDF = spark.sql("SELECT * FROM {}".format(tablename))
         sqlDF.show()

         spark.stop()