Working with Apache Hudi catalog
This topic describes the procedure to run a Spark application that ingests data into an Apache Hudi catalog.
watsonx.data on IBM Software Hub
Procedure
- Create a storage with an Apache Hudi catalog to store the data that the Spark application uses. To create a storage with an Apache Hudi catalog, see Adding a storage-catalog pair.
- Associate the storage with the external Spark engine. For more information, see Associating a catalog with an engine.
- Create Cloud Object Storage (COS) to store the Spark application. To create Cloud Object Storage and a bucket, see Creating a storage bucket. You can also use other storage types such as IBM Storage Ceph, Amazon S3, or MinIO.
- Register the Cloud Object Storage in watsonx.data. For more information, see Adding a storage-catalog pair.
- Save the following Spark application (Python file) to your local machine. Here, `hudi_demo.py`. The Python Spark application demonstrates the following functionality:
    - It creates a database inside the Apache Hudi catalog (that you created to store data). Here, `hudi_db`.
    - It creates a table inside the `hudi_db` database, namely `hudi_table`.
    - It inserts data into the `hudi_table` and does a `SELECT` query operation.
    - It drops the table and schema after use.

```python
from pyspark.sql import SparkSession


def init_spark():
    # The Hudi-specific settings (serializer, SQL extensions, catalog
    # implementation) are passed at submission time; see the curl command
    # later in this topic.
    spark = SparkSession.builder \
        .appName("CreateHudiTableInCOS") \
        .enableHiveSupport() \
        .getOrCreate()
    return spark


def main():
    spark = init_spark()
    try:
        spark.sql("show databases").show()
        spark.sql("create database if not exists spark_catalog.hudi_db LOCATION 's3a://hudi-connector-test/'").show()
        spark.sql("create table if not exists spark_catalog.hudi_db.hudi_table (id bigint, name string, location string) USING HUDI OPTIONS ('primaryKey' 'id', hoodie.write.markers.type = 'direct', hoodie.embed.timeline.server = 'false')").show()
        spark.sql("insert into spark_catalog.hudi_db.hudi_table VALUES (1, 'Sam', 'Kochi'), (2, 'Tom', 'Bangalore'), (3, 'Bob', 'Chennai'), (4, 'Alex', 'Bangalore')").show()
        spark.sql("select * from spark_catalog.hudi_db.hudi_table").show()
        spark.sql("drop table spark_catalog.hudi_db.hudi_table").show()
        spark.sql("drop schema spark_catalog.hudi_db CASCADE").show()
    finally:
        spark.stop()


if __name__ == '__main__':
    main()
```
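If you want to verify the ingested records before the application drops the table, you can also read the table back with Hudi's DataFrame reader. The following is a minimal sketch, not part of the documented procedure; the table path (`s3a://hudi-connector-test/hudi_table`) is an assumption derived from the database `LOCATION` in the application above and may differ in your bucket.

```python
from pyspark.sql import SparkSession

# Reuses a Hudi-enabled session (the submission configuration supplies
# the Hudi extensions and catalog settings).
spark = SparkSession.builder.appName("VerifyHudiTable").enableHiveSupport().getOrCreate()

# Assumed path: database LOCATION ('s3a://hudi-connector-test/') + table name.
df = spark.read.format("hudi").load("s3a://hudi-connector-test/hudi_table")

# Every Hudi table carries metadata columns such as _hoodie_commit_time,
# which records the commit that wrote each row.
df.select("_hoodie_commit_time", "id", "name", "location").show()
```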
- Upload the Spark application to the Cloud Object Storage bucket. For more information, see Uploading data. A scripted alternative is sketched after this step.
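If you prefer to script the upload rather than use the console, any S3-compatible client works against the bucket. The following is a minimal sketch using `boto3` (an assumption; the Uploading data topic describes the supported methods), with the endpoint, bucket name, and HMAC credentials as placeholders:

```python
import boto3

# All values below are placeholders; use your bucket's endpoint and HMAC keys.
s3 = boto3.client(
    "s3",
    endpoint_url="https://<bucket_endpoint>",
    aws_access_key_id="<access_key>",
    aws_secret_access_key="<secret_key>",
)

# Upload the application file into the bucket that the Spark job reads from.
s3.upload_file("hudi_demo.py", "<data_storage_name>", "hudi_demo.py")
```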
- To submit the Spark application with data residing in Cloud Object Storage, specify the parameter values and run the following curl command.

```bash
curl -k -X POST <V4_JOBS_API_ENDPOINT> \
-H "Authorization: ZenApiKey <TOKEN>" \
-d '{
  "application_details": {
    "conf": {
      "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
      "spark.hadoop.fs.s3a.bucket.<data_storage_name>.endpoint": "<bucket_endpoint>",
      "spark.hadoop.fs.s3a.bucket.<data_storage_name>.access.key": "<access_key>",
      "spark.hadoop.fs.s3a.bucket.<data_storage_name>.secret.key": "<secret_key>",
      "spark.hadoop.fs.s3a.path.style.access": "true",
      "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
      "spark.hive.metastore.uris": "<thrift_url_catalog>",
      "spark.hive.metastore.use.SSL": "true",
      "spark.hive.metastore.truststore.path": "file:///opt/ibm/jdk/lib/security/cacerts",
      "spark.hive.metastore.truststore.password": "changeit",
      "spark.hive.metastore.truststore.type": "JKS",
      "spark.hive.metastore.client.auth.mode": "PLAIN",
      "spark.hive.metastore.client.plain.username": "<wxd_log_in_username>",
      "spark.hive.metastore.client.plain.password": "<wxd_log_in_password>",
      "spark.driver.extraJavaOptions": "-Dcom.sun.jndi.ldap.object.disableEndpointIdentification=true -Djdk.tls.trustNameService=true",
      "spark.executor.extraJavaOptions": "-Dcom.sun.jndi.ldap.object.disableEndpointIdentification=true -Djdk.tls.trustNameService=true",
      "spark.hadoop.hive.metastore.schema.verification": "false",
      "spark.hadoop.hive.metastore.schema.verification.record.version": "false",
      "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
      "spark.kryo.registrator": "org.apache.spark.HoodieSparkKryoRegistrar",
      "spark.sql.catalog.spark_catalog.type": "hudi",
      "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog"
    },
    "application": "s3a://<data_storage_name>/hudi_demo.py"
  }
}'
```
Parameter values:
- `<V4_JOBS_API_ENDPOINT>`: The endpoint for the instance that you want to use to submit your Spark job. To get the Spark jobs endpoint for your provisioned instance, see Administering the service instance.
- `<TOKEN>`: The access token for your service instance. To get the access token, see Generating an API authorization token.
- `<data_storage_name>`: The name of the storage bucket that stores data.
- `<bucket_endpoint>`: The Metastore host value of the storage.
- `<access_key>`: The `access_key_id` of the storage.
- `<secret_key>`: The `secret_access_key` of the storage.
- `<thrift_url_catalog>`: The Thrift URL of the metastore that is associated with the catalog.
- `<wxd_log_in_username>`: Your user name credential for the watsonx.data cluster. Note: You must have `Metastore Admin` access on the Metadata Service. For more information, see Managing access to the Metadata Service (MDS).
- `<wxd_log_in_password>`: Your password for the watsonx.data cluster.
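If you prefer to submit the job from a script rather than curl, the same request can be made with Python. The following is a minimal sketch, not part of the documented procedure: the placeholders match the curl command above, `requests` is assumed as the HTTP client, and the ZenApiKey construction (base64 of `<username>:<api_key>`) follows the convention described in Generating an API authorization token.

```python
import base64
import requests

# Assumption: a ZenApiKey token is the base64 encoding of "<username>:<api_key>";
# see Generating an API authorization token for the documented steps.
username = "<wxd_log_in_username>"
api_key = "<api_key>"
token = base64.b64encode(f"{username}:{api_key}".encode()).decode()

payload = {
    "application_details": {
        "conf": {
            # Same Spark configuration as in the curl command above;
            # only the Hudi-specific entries are repeated here.
            "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
            "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
            "spark.kryo.registrator": "org.apache.spark.HoodieSparkKryoRegistrar",
            "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
            "spark.sql.catalog.spark_catalog.type": "hudi",
            # ...add the remaining spark.* entries from the curl command.
        },
        "application": "s3a://<data_storage_name>/hudi_demo.py",
    }
}

response = requests.post(
    "<V4_JOBS_API_ENDPOINT>",
    headers={"Authorization": f"ZenApiKey {token}"},
    json=payload,
    verify=False,  # equivalent to curl -k; point to a CA bundle in production
)
print(response.status_code, response.text)
```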