Using IBM Cloud Data Engine as a metastore

You can use IBM Cloud Data Engine to store and manage the metadata for tables and views in a catalog that is compatible with a Hive metastore.

Each instance of IBM Cloud Data Engine includes a database catalog that you can use to register and manage table definitions for your data on the IBM Cloud Pak for Data cluster. Catalog syntax is compatible with Hive metastore syntax. You can use IBM Cloud Data Engine to externalize metadata outside the IBM Cloud Pak for Data cluster.

Prerequisites

Before using Data Engine with Analytics Engine, you must transfer associated libraries and packages into the Analytics Engine environment.

Prerequisites for accessing Data Engine from Spark environments

If you are accessing Data Engine from Spark environments through a notebook, you must run the following commands in a cell of your notebook.

spark.stop()
!mkdir /tmp/dataengine
!wget https://us.sql-query.cloud.ibm.com/download/catalog/dataengine_spark-1.3.0-py3-none-any.whl -O /tmp/dataengine/dataengine_spark-1.3.0-py3-none-any.whl
# user-libs/spark2 is in the classpath of Spark
!wget https://us.sql-query.cloud.ibm.com/download/catalog/dataengine-spark-integration-1.3.0.jar -O user-libs/spark2/dataengine-spark-integration-1.3.0.jar
!wget https://us.sql-query.cloud.ibm.com/download/catalog/hive-metastore-standalone-client-3.1.2-sqlquery.jar -O /tmp/dataengine/hive-metastore-standalone-client-3.1.2-sqlquery.jar
!pip install --force-reinstall /tmp/dataengine/dataengine_spark-1.3.0-py3-none-any.whl --user
Note: You must restart the kernel before you make any metastore-related calls.
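
After the kernel restart, a quick check can confirm that the downloads landed where the commands above put them. The following is a minimal sketch, not part of the product: the helper name missing_prerequisites is hypothetical, and the file paths and the dataengine module name are taken from the commands and examples in this topic.

```python
import importlib.util
import os

def missing_prerequisites(jar_paths, module_name="dataengine"):
    """Return a list of problems found; an empty list means nothing is missing."""
    problems = []
    for path in jar_paths:
        if not os.path.isfile(path):
            problems.append(f"missing file: {path}")
    # find_spec returns None when a top-level module cannot be located
    if importlib.util.find_spec(module_name) is None:
        problems.append(f"Python module not importable: {module_name}")
    return problems

if __name__ == "__main__":
    jars = [
        "user-libs/spark2/dataengine-spark-integration-1.3.0.jar",
        "/tmp/dataengine/hive-metastore-standalone-client-3.1.2-sqlquery.jar",
    ]
    for problem in missing_prerequisites(jars):
        print(problem)
```

If the loop prints nothing, the JAR files and the Python package are in place for the current kernel.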

Prerequisites for accessing Data Engine from Analytics Engine instances

If you are accessing Data Engine from an application that runs on an Analytics Engine instance, you must transfer the Data Engine libraries and packages so that they are available in the environment:

  1. Download the following three files to your local system by running the following commands:

    wget https://us.sql-query.cloud.ibm.com/download/catalog/dataengine_spark-1.3.0-py3-none-any.whl
    wget https://us.sql-query.cloud.ibm.com/download/catalog/dataengine-spark-integration-1.3.0.jar
    wget https://us.sql-query.cloud.ibm.com/download/catalog/hive-metastore-standalone-client-3.1.2-sqlquery.jar
    
  2. The first file in the list is a Python wheel ("whl") package. Install this package by following the steps in Customizing Spark applications and notebooks using the cc-home volume.

  3. The second and third files are JAR files and must be made available in the Spark class path when the Spark application runs. To make the files available:

    1. Upload the JAR files to a service volume instance.

    2. Specify the file path in the parameters of your Spark application:

      spark.driver.extraClassPath=</path/to/file1/in/volume:/path/to/file2/in/volume>
      spark.executor.extraClassPath=</path/to/file1/in/volume:/path/to/file2/in/volume>
    

Steps to create a service volume and upload the files can be found in Managing persistent volume instances with the Volumes API.
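
The two class path parameters take the same colon-separated list of JAR paths, so they can be generated from one list. The following is a minimal sketch; the helper name classpath_conf and the /myvolume mount path are hypothetical and only illustrate the shape of the values.

```python
def classpath_conf(jar_paths):
    """Build the driver and executor class path settings from a list of JAR paths."""
    if not jar_paths:
        raise ValueError("at least one JAR path is required")
    joined = ":".join(jar_paths)  # ':' separates class path entries on Linux
    return {
        "spark.driver.extraClassPath": joined,
        "spark.executor.extraClassPath": joined,
    }

conf = classpath_conf([
    "/myvolume/dataengine-spark-integration-1.3.0.jar",  # hypothetical mount path
    "/myvolume/hive-metastore-standalone-client-3.1.2-sqlquery.jar",
])
print(conf["spark.driver.extraClassPath"])
```

The resulting dictionary can be merged into the conf section of the Spark application parameters.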

Using IBM Cloud Data Engine as a metastore

To use IBM Cloud Data Engine as a metastore:

  1. Create an IBM Cloud Data Engine instance. See IBM Cloud Data Engine.

    After you have provisioned the IBM Cloud Data Engine instance:

    1. Make a note of the CRN of the instance.
    2. Create an account-level API key or a service ID-level API key with access to the instance.
    3. If you use a service ID, grant it access to both the IBM Cloud Data Engine instance and the bucket.

    You can then configure your IBM Analytics Engine powered by Apache Spark instance to use the default metastore configuration either at instance level or at application level as needed.

  2. Specify the IBM Cloud Data Engine metastore connection parameters. Pass the following additional parameters as part of the Spark application payload, or specify them as instance defaults if you want to access metastore data across all applications:

    "spark.hive.metastore.truststore.password" : "changeit",
    "spark.hive.execution.engine":"spark",
    "spark.hive.metastore.client.plain.password":"<APIKEY-WITH-ACCESS-TO-DATA-ENGINE-INSTANCE>",
    "spark.hive.metastore.uris":"CHANGEME-thrift://catalog.us.dataengine.cloud.ibm.com:9083-CHANGEME",
    "spark.hive.metastore.client.auth.mode":"PLAIN",
    "spark.hive.metastore.use.SSL":"true",
    "spark.hive.stats.autogather":"false",
    "spark.hive.metastore.client.plain.username":"CHANGEME-crn:v1:bluemix:public:sql-query:us-south:a/abcdefgh::CHANGEME",
    # for spark 3.4 with java 11
    "spark.hive.metastore.truststore.path":"/opt/ibm/jdk/lib/security/cacerts",
    # for spark 3.4 with java 8
    "spark.hive.metastore.truststore.path":"file:///opt/ibm/jdk/jre/lib/security/cacerts",
    "spark.sql.catalogImplementation":"hive",
    "spark.sql.hive.metastore.jars":"CHANGEME --> /tmp/dataengine/*  OR path in volume",
    "spark.sql.hive.metastore.version":"3.0",
    "spark.sql.warehouse.dir":"file:///tmp",
    "spark.hadoop.metastore.catalog.default":"spark"
    

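    For the variables marked CHANGEME, substitute the values for your environment: the metastore URI, the CRN of your IBM Cloud Data Engine instance, and the path to the metastore JAR files (/tmp/dataengine/* or the path in your volume).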

  3. Generate and store data. Run the following regular PySpark application, called generate-and-store-data.py in this example, which stores Parquet data in a location on IBM Cloud Object Storage.

    from pyspark.sql import SparkSession
    
    def init_spark():
        spark = SparkSession.builder.appName("dataengine-generate-store-parquet-data").getOrCreate()
        sc = spark.sparkContext
        return spark,sc
    
    def generate_and_store_data(spark):
        data =[("India","New Delhi"),("France","Paris"),("Lithuania","Vilnius"),("Sweden","Stockholm"),("Switzerland","Bern")]
        columns=["Country","Capital"]
        df=spark.createDataFrame(data,columns)
        df.write.mode("overwrite").parquet("cos://mybucket.mycosservice/countriescapitals.parquet")
    
    def main():
        spark,sc = init_spark()
        generate_and_store_data(spark)
    if __name__ == '__main__':
        main()
    
  4. Create the table schema definition. Note that you can't use standard Spark SQL syntax to create tables when using IBM Cloud Data Engine as a metastore. You can create a table in one of two ways:

    • From the IBM Cloud Data Engine user interface or by using the standard IBM Cloud Data Engine API (see Data Engine service REST V3 API) or Python SDK (see ibmcloudsql).

      CREATE TABLE COUNTRIESCAPITALS (Country string,Capital string) 
      USING PARQUET 
      LOCATION cos://us-geo/mybucket/countriescapitals.parquet
      
    • Programmatically from within your PySpark application by using the following code snippet for PySpark called create_table_data_engine.py:

      import sys
      import time

      import requests

      def create_data_engine_table(api_key, crn):
          # Exchange the API key for an IAM access token
          headers = {
              'Authorization': 'Basic Yng6Yng=',
          }
          data = {
              'apikey': api_key,
              'grant_type': 'urn:ibm:params:oauth:grant-type:apikey',
          }
          response = requests.post('https://iam.cloud.ibm.com/identity/token', headers=headers, data=data)
          token = response.json()['access_token']

          headers_token = {
              'Accept': 'application/json',
              'Authorization': f'Bearer {token}',
          }
          params = {
              'instance_crn': crn,
          }
          json_data = {
              'statement': 'CREATE TABLE COUNTRIESCAPITALS (Country string,Capital string) USING PARQUET LOCATION cos://us-geo/mybucket/countriescapitals.parquet',
          }
          # Submit the CREATE TABLE statement as a Data Engine SQL job
          response = requests.post('https://api.dataengine.cloud.ibm.com/v3/sql_jobs', params=params, headers=headers_token, json=json_data)
          job_id = response.json()['job_id']
          # Give the job time to finish, then fetch its status once
          time.sleep(10)
          response = requests.get(f'https://api.dataengine.cloud.ibm.com/v3/sql_jobs/{job_id}', params=params, headers=headers_token)
          if response.json()['status'] == 'completed':
              print(response.json())

      if __name__ == '__main__':
          # The payload passes the CRN first and the API key second
          create_data_engine_table(api_key=sys.argv[2], crn=sys.argv[1])
      

      Note that for the location URI (cos://us-geo/mybucket/countriescapitals.parquet) you need to pass one of the standard aliases. See Cloud Object Storage endpoints.

      The payload for the create_table_data_engine.py application, create_table_data_engine_payload.json, must also provide the credentials under the exact standard alias, in this case "us-geo":

      {
          "application_details": {
              "conf": {
                  "spark.hadoop.fs.cos.us-geo.endpoint": "CHANGEME",
                  "spark.hadoop.fs.cos.us-geo.access.key": "CHANGEME",
                  "spark.hadoop.fs.cos.us-geo.secret.key": "CHANGEME"
              },
              "application": "cos://mybucket.us-geo/create_table_data_engine.py",
              "arguments": ["crn:v1:bluemix:public:sql-query:us-south:a/<CRN-DATA-ENGINE-INSTANCE>::","<APIKEY-WITH-ACCESS-TO-DATA-ENGINE-INSTANCE>"]
          }
      }
      
  5. Read the data from the table by using Spark SQL in the following application, called select_query_data_engine.py:

    from pyspark.sql import SparkSession
    import time
    
    def init_spark():
      spark = SparkSession.builder.appName("dataengine-table-select-test").getOrCreate()
      sc = spark.sparkContext
      return spark,sc
    
    def select_query_data_engine(spark,sc):
      tablesDF=spark.sql("SHOW TABLES")
      tablesDF.show()
      statesDF=spark.sql("SELECT * from COUNTRIESCAPITALS")
      statesDF.show()
    
    def main():
      spark,sc = init_spark()
      select_query_data_engine(spark,sc)
    
    if __name__ == '__main__':
      main()
    

    Note that for the SELECT command to work, you must pass the Cloud Object Storage identifiers under one of the standard aliases. This example uses us-geo. If you do not pass the expected identifiers, you might see the following error: Configuration parse exception: Access KEY is empty. Please provide valid access key.

    select_query_data_engine_payload.json:

    {
        "application_details": {
            "conf": {
                "spark.hadoop.fs.cos.us-geo.endpoint": "CHANGEME",
                "spark.hadoop.fs.cos.us-geo.access.key": "CHANGEME",
                "spark.hadoop.fs.cos.us-geo.secret.key": "CHANGEME",
                "spark.hive.metastore.truststore.password" : "changeit",
                "spark.hive.execution.engine":"spark",
                "spark.hive.metastore.client.plain.password":"APIKEY-WITH-ACCESS-TO-DATA-ENGINE-INSTANCE",
                "spark.hive.metastore.uris":"thrift://catalog.us.dataengine.cloud.ibm.com:9083",
                "spark.hive.metastore.client.auth.mode":"PLAIN",
                "spark.hive.metastore.use.SSL":"true",
                "spark.hive.stats.autogather":"false",
                "spark.hive.metastore.client.plain.username":"crn:v1:bluemix:public:sql-query:us-south:a/<CRN-DATA-ENGINE-INSTANCE>::",
                # for spark 3.4 with java 11
                "spark.hive.metastore.truststore.path":"/opt/ibm/jdk/lib/security/cacerts",
                # for spark 3.2 with java 8
                "spark.hive.metastore.truststore.path":"file:///opt/ibm/jdk/jre/lib/security/cacerts",
                "spark.sql.catalogImplementation":"hive",
                "spark.sql.hive.metastore.jars":"/opt/ibm/connectors/data-engine/hms-client/*",
                "spark.sql.hive.metastore.version":"3.0",
                "spark.sql.warehouse.dir":"file:///tmp",
                "spark.hadoop.metastore.catalog.default":"spark"
            },
            "application": "cos://mybucket.us-geo/select_query_data_engine.py"
        }
    }
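
The create_table_data_engine.py script in step 4 waits a fixed 10 seconds and then checks the job status once, so a job that takes longer goes unreported. A polling loop is one alternative. The following is a minimal sketch under assumptions: the helper name wait_for_job is hypothetical, and the terminal status "failed" is assumed alongside the "completed" status that the script checks.

```python
import time

def wait_for_job(get_status, timeout_s=300, interval_s=5, sleep=time.sleep):
    """Poll get_status() until it reports a terminal status or time runs out.

    get_status is any zero-argument callable that returns the job status
    string, for example a wrapper around the GET request on the SQL job.
    """
    waited = 0
    while True:
        status = get_status()
        if status in ("completed", "failed"):  # "failed" is an assumed status name
            return status
        if waited >= timeout_s:
            raise TimeoutError(f"job still '{status}' after {waited} seconds")
        sleep(interval_s)
        waited += interval_s

# Demonstration with a fake status source instead of a real HTTP call
fake_statuses = iter(["queued", "running", "completed"])
result = wait_for_job(lambda: next(fake_statuses), sleep=lambda seconds: None)
print(result)
```

In the script, get_status could wrap the existing requests.get call and return response.json()['status'], which replaces the single time.sleep(10).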
    

Convenience API

If you want to quickly test the Hive metastore connection from within your application, you can use the convenience API shown in the following PySpark example.

In this example, there is no need to pass any Hive metastore parameters to your application. The call to SparkSessionWithDataengine.enableDataengine initializes the connection to IBM Cloud Data Engine without the additional Hive metastore parameters.

dataengine-job-convenience_api.py:

import sys

from dataengine import SparkSessionWithDataengine

def dataengine_table_test(spark, sc):
  tablesDF = spark.sql("SHOW TABLES")
  tablesDF.show()
  statesDF = spark.sql("SELECT * from COUNTRIESCAPITALS")
  statesDF.show()

def main():
  # Expect two arguments: the instance CRN and the API key
  if len(sys.argv) < 3:
    sys.exit(1)
  crn = sys.argv[1]
  apikey = sys.argv[2]
  session_builder = SparkSessionWithDataengine.enableDataengine(crn, apikey, "public", "/opt/ibm/connectors/data-engine/hms-client")
  spark = session_builder.appName("Spark DataEngine integration test").getOrCreate()
  sc = spark.sparkContext
  dataengine_table_test(spark, sc)

if __name__ == '__main__':
  main()
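
The convenience script reads the CRN and the API key from its arguments, so validating them before the Spark session is built gives a clearer failure message. The following is a minimal sketch; the helper name parse_args is hypothetical, and the check on the sql-query service name is an assumption based only on the example CRNs in this topic.

```python
def parse_args(argv):
    """Return (crn, apikey) from an argv-style list, or raise ValueError."""
    if len(argv) < 3:
        raise ValueError("usage: dataengine-job-convenience_api.py <CRN> <APIKEY>")
    crn, apikey = argv[1], argv[2]
    parts = crn.split(":")
    # Assumption: the fifth CRN segment names the service, as in the examples
    if len(parts) < 5 or parts[0] != "crn" or parts[4] != "sql-query":
        raise ValueError(f"not a Data Engine CRN: {crn}")
    if not apikey:
        raise ValueError("API key is empty")
    return crn, apikey

# The account and instance identifiers below are made-up placeholders
crn, apikey = parse_args([
    "dataengine-job-convenience_api.py",
    "crn:v1:bluemix:public:sql-query:us-south:a/abcdefgh:1234::",
    "my-api-key",
])
print(crn)
```

In the real script, the list passed to parse_args would be sys.argv.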

The following is the payload for the convenience API application, dataengine-job-convenience_api.py:

{
    "application_details": {
        "conf": {
            "spark.hadoop.fs.cos.us-geo.endpoint": "CHANGEME",
            "spark.hadoop.fs.cos.us-geo.access.key": "CHANGEME",
            "spark.hadoop.fs.cos.us-geo.secret.key": "CHANGEME"
        },
        "application": "cos://mybucket.us-geo/dataengine-job-convenience_api.py",
        "arguments": ["crn:v1:bluemix:public:sql-query:us-south:a/<CRN-DATA-ENGINE-INSTANCE>::","<APIKEY-WITH-ACCESS-TO-DATA-ENGINE-INSTANCE>"]
    }
}
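
The payload files in this topic share one shape: Cloud Object Storage credentials keyed by the standard alias, the application path, and optional arguments. Generating the payload in code keeps the alias consistent across the keys and yields valid JSON, which cannot contain comments or duplicate keys. The following is a minimal sketch; the helper name build_payload and its parameters are hypothetical.

```python
import json

def build_payload(application, alias, endpoint, access_key, secret_key,
                  arguments=None, extra_conf=None):
    """Assemble an application payload as a dictionary that json.dumps can serialize."""
    conf = {
        f"spark.hadoop.fs.cos.{alias}.endpoint": endpoint,
        f"spark.hadoop.fs.cos.{alias}.access.key": access_key,
        f"spark.hadoop.fs.cos.{alias}.secret.key": secret_key,
    }
    # extra_conf can carry the Hive metastore parameters when they are needed
    conf.update(extra_conf or {})
    details = {"conf": conf, "application": application}
    if arguments:
        details["arguments"] = list(arguments)
    return {"application_details": details}

payload = build_payload(
    application="cos://mybucket.us-geo/dataengine-job-convenience_api.py",
    alias="us-geo",
    endpoint="CHANGEME",
    access_key="CHANGEME",
    secret_key="CHANGEME",
    arguments=["<CRN-DATA-ENGINE-INSTANCE>", "<APIKEY-WITH-ACCESS-TO-DATA-ENGINE-INSTANCE>"],
)
print(json.dumps(payload, indent=4))
```

For an application that passes the metastore parameters explicitly, such as select_query_data_engine.py, extra_conf would hold those settings with a single truststore path chosen for the runtime in use.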