Working with Apache Hudi catalog

This topic describes how to run a Spark application that ingests data into an Apache Hudi catalog.

watsonx.data on IBM Software Hub

Procedure

  1. Create a storage with an Apache Hudi catalog to store the data that is used in the Spark application. To create a storage with an Apache Hudi catalog, see Adding a storage-catalog pair.
  2. Associate the storage with the external Spark engine. For more information, see Associating a catalog with an engine.
  3. Create a Cloud Object Storage (COS) bucket to store the Spark application. To create Cloud Object Storage and a bucket, see Creating a storage bucket. You can also use other storage types such as IBM Storage Ceph, Amazon S3, or MinIO.
  4. Register the Cloud Object Storage in watsonx.data. For more information, see Adding a storage-catalog pair.
  5. Save the following Spark application (Python file) to your local machine, for example as hudi_demo.py.
    The Python Spark application demonstrates the following functionality:
    • It creates a database named hudi_db inside the Apache Hudi catalog that you created to store data.
    • It creates a table named hudi_table inside the hudi_db database.
    • It inserts data into hudi_table and runs a SELECT query on it.
    • It drops the table and the schema after use.
    from pyspark.sql import SparkSession
    
    def init_spark():
        # Create a Spark session with Hive support so that the
        # watsonx.data Metadata Service can serve as the metastore.
        spark = SparkSession.builder \
            .appName("CreateHudiTableInCOS") \
            .enableHiveSupport() \
            .getOrCreate()
        return spark
    
    def main():
        spark = None
        try:
            spark = init_spark()
            spark.sql("show databases").show()
            # Create a database in the Hudi catalog, backed by the storage bucket.
            spark.sql("create database if not exists spark_catalog.hudi_db LOCATION 's3a://hudi-connector-test/'").show()
            # Create a Hudi table with 'id' as the primary key.
            spark.sql("create table if not exists spark_catalog.hudi_db.hudi_table (id bigint, name string, location string) USING HUDI OPTIONS (primaryKey = 'id', hoodie.write.markers.type = 'direct', hoodie.embed.timeline.server = 'false')").show()
            # Insert sample rows, then read them back.
            spark.sql("insert into spark_catalog.hudi_db.hudi_table VALUES (1, 'Sam', 'Kochi'), (2, 'Tom', 'Bangalore'), (3, 'Bob', 'Chennai'), (4, 'Alex', 'Bangalore')").show()
            spark.sql("select * from spark_catalog.hudi_db.hudi_table").show()
            # Clean up the table and the schema after use.
            spark.sql("drop table spark_catalog.hudi_db.hudi_table").show()
            spark.sql("drop schema spark_catalog.hudi_db CASCADE").show()
    
        finally:
            # Stop the session only if it was created successfully.
            if spark is not None:
                spark.stop()
    
    if __name__ == '__main__':
        main()
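    If you keep the table (that is, skip the drop statements), you can also query it with the Hudi DataFrame reader instead of Spark SQL. The following minimal sketch assumes that the table files live under the database LOCATION used above (s3a://hudi-connector-test/hudi_table); adjust the path to your bucket.
    # Minimal sketch: read the Hudi table back through the DataFrame API.
    # The base path is an assumption derived from the database LOCATION above.
    df = spark.read.format("hudi").load("s3a://hudi-connector-test/hudi_table")
    df.filter(df.location == "Bangalore").show()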
  6. Upload the Spark application to the Cloud Object Storage bucket. For more information, see Uploading data.
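    If you prefer to script the upload, the following minimal sketch uses the boto3 S3 client against an S3-compatible endpoint; the bucket name, endpoint, and credentials are placeholders for your own storage details.
    import boto3

    # Minimal sketch, assuming an S3-compatible endpoint and placeholder credentials.
    s3 = boto3.client(
        "s3",
        endpoint_url="https://<bucket_endpoint>",
        aws_access_key_id="<access_key>",
        aws_secret_access_key="<secret_key>",
    )
    # Upload the application file to the bucket that stores the Spark application.
    s3.upload_file("hudi_demo.py", "<data_storage_name>", "hudi_demo.py")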
  7. To submit the Spark application with data residing in Cloud Object Storage, specify the parameter values and run the following curl command.
    curl -k -X POST <V4_JOBS_API_ENDPOINT> -H "Authorization: ZenApiKey <TOKEN>" -H "Content-Type: application/json" -d '{
        "application_details": {
            "conf": {
                "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
                "spark.hadoop.fs.s3a.bucket.<data_storage_name>.endpoint": "<bucket_endpoint>",
                "spark.hadoop.fs.s3a.bucket.<data_storage_name>.access.key": "<access_key>",
                "spark.hadoop.fs.s3a.bucket.<data_storage_name>.secret.key": "<secret_key>",
                "spark.hadoop.fs.s3a.path.style.access": "true",
                "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
                "spark.hive.metastore.uris": "<thrift_url_catalog>",
                "spark.hive.metastore.use.SSL": "true",
                "spark.hive.metastore.truststore.path": "file:///opt/ibm/jdk/lib/security/cacerts",
                "spark.hive.metastore.truststore.password": "changeit",
                "spark.hive.metastore.truststore.type": "JKS",
                "spark.hive.metastore.client.auth.mode": "PLAIN",
                "spark.hive.metastore.client.plain.username": "<wxd_log_in_username>",
                "spark.hive.metastore.client.plain.password": "<wxd_log_in_password>",
                "spark.driver.extraJavaOptions": "-Dcom.sun.jndi.ldap.object.disableEndpointIdentification=true -Djdk.tls.trustNameService=true",
                "spark.executor.extraJavaOptions": "-Dcom.sun.jndi.ldap.object.disableEndpointIdentification=true -Djdk.tls.trustNameService=true",
                "spark.hadoop.hive.metastore.schema.verification": "false",
                "spark.hadoop.hive.metastore.schema.verification.record.version": "false",
                "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
                "spark.kryo.registrator": "org.apache.spark.HoodieSparkKryoRegistrar",
                "spark.sql.catalog.spark_catalog.type": "hudi",
                "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog"
            },
            "application": "s3a://<data_storage_name>/hudi_demo.py"
        }
    }'
    Parameter values:
    • <V4_JOBS_API_ENDPOINT>: The endpoint for the instance that you want to use to submit your Spark job. To get the Spark jobs endpoint for your provisioned instance, see Administering the service instance.
    • <TOKEN>: To get the access token for your service instance, see Generating an API authorization token. A minimal token and submission sketch in Python is shown after this list.
    • <data_storage_name>: The name of the storage bucket that stores data.
    • <bucket_endpoint>: Provide the Metastore host value of the storage.
    • <access_key>: Provide the access_key_id of the storage.
    • <secret_key>: Provide the secret_access_key of the storage.
    • <thrift_url_catalog>: Provide the Thrift endpoint of the Metadata Service for the Apache Hudi catalog.
    • <wxd_log_in_username>: Your user name credential for the watsonx.data cluster. Note: You must have `Metastore Admin` access on the Metadata Service. For more information, see Managing access to the Metadata Service (MDS).
    • <wxd_log_in_password>: Your password for the watsonx.data cluster.
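    If you want to submit the job from Python instead of curl, the following minimal sketch builds the authorization header and posts the same payload. It assumes that a ZenApiKey token is the base64 encoding of <username>:<api_key>; verify the token format in Generating an API authorization token.
    import base64

    import requests  # third-party HTTP client

    # Assumption: a ZenApiKey token is base64("<username>:<api_key>").
    username = "<wxd_log_in_username>"  # placeholder
    api_key = "<api_key>"               # placeholder
    token = base64.b64encode(f"{username}:{api_key}".encode()).decode()

    payload = {
        "application_details": {
            "conf": {
                # Use the same Spark configuration as in the curl command above.
            },
            "application": "s3a://<data_storage_name>/hudi_demo.py",
        }
    }

    response = requests.post(
        "<V4_JOBS_API_ENDPOINT>",
        headers={"Authorization": f"ZenApiKey {token}"},
        json=payload,
        verify=False,  # equivalent to curl -k; use a proper CA bundle in production
    )
    print(response.status_code, response.text)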