Working with Delta Lake catalog
This topic describes the procedure to run a Spark application that ingests data into a Delta Lake catalog.
watsonx.data on IBM Software Hub
Procedure
- Create a storage with Delta Lake catalog to ingest and manage data in Delta table format. To create a storage with Delta Lake catalog, see Adding a storage-catalog pair.
- Associate the storage with the external Spark engine. For more information, see Associating a catalog with an engine.
- Create Cloud Object Storage (COS) to store the Spark application. To create Cloud Object Storage and a bucket, see Creating a storage bucket. You can also use other storage types such as IBM Storage Ceph, Amazon S3, or MinIO.
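  If you prefer to script this step, the following is a minimal sketch that creates a bucket on any S3-compatible storage with boto3. The endpoint, credentials, and bucket name (`delta-connector-test`, the bucket used in the later examples) are placeholders that you replace with your own values; this is not part of the documented console procedure.

  ```python
  # Minimal sketch (assumption, not the documented procedure): create an
  # S3-compatible bucket with boto3. Endpoint, credentials, and bucket name
  # are placeholders.
  import boto3

  s3 = boto3.client(
      "s3",
      endpoint_url="https://<bucket_endpoint>",   # COS, IBM Storage Ceph, Amazon S3, or MinIO endpoint
      aws_access_key_id="<access_key>",
      aws_secret_access_key="<secret_key>",
  )
  s3.create_bucket(Bucket="delta-connector-test")  # bucket that later holds delta_demo.py
  ```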
- Register the Cloud Object Storage in watsonx.data. For more information, see Adding a storage-catalog pair.
- Save the following Spark application (Python file), `delta_demo.py`, to your local machine. The Python Spark application demonstrates the following functionality:
    - It creates a database named `iae` inside the Delta Lake catalog (that you created to store data).
    - It creates a table named `employee` inside the `iae` database.
    - It inserts data into the `employee` table and runs a `SELECT` query operation.
    - It drops the table and schema after use.
  ```python
  from pyspark.sql import SparkSession

  def init_spark():
      # Create a Spark session with Hive support so the catalog is backed by the metastore.
      spark = SparkSession.builder.appName("lh-hms-cloud")\
          .enableHiveSupport().getOrCreate()
      return spark

  def main():
      spark = init_spark()
      spark.sql("show databases").show()
      # Create the iae database inside the Delta Lake catalog.
      spark.sql("create database if not exists spark_catalog.iae LOCATION 's3a://delta-connector-test/'").show()
      # Create the employee Delta table, insert rows, and query them.
      spark.sql("create table if not exists spark_catalog.iae.employee (id bigint, name string, location string) USING DELTA").show()
      spark.sql("insert into spark_catalog.iae.employee VALUES (1, 'Sam','Kochi'), (2, 'Tom','Bangalore'), (3, 'Bob','Chennai'), (4, 'Alex','Bangalore')").show()
      spark.sql("select * from spark_catalog.iae.employee").show()
      # Clean up the table and schema after use.
      spark.sql("drop table spark_catalog.iae.employee").show()
      spark.sql("drop schema spark_catalog.iae CASCADE").show()
      spark.stop()

  if __name__ == '__main__':
      main()
  ```
- Upload the Spark application to the Cloud Object Storage bucket. For more information, see Uploading data.
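  If you want to script the upload instead of using the console, here is a minimal sketch using boto3; the endpoint, credentials, and bucket name are the same placeholders as in the earlier sketch.

  ```python
  # Minimal sketch: upload the Spark application to the bucket with boto3.
  # Endpoint, credentials, and bucket name are placeholders; adjust to your storage.
  import boto3

  s3 = boto3.client(
      "s3",
      endpoint_url="https://<bucket_endpoint>",
      aws_access_key_id="<access_key>",
      aws_secret_access_key="<secret_key>",
  )
  s3.upload_file("delta_demo.py", "delta-connector-test", "delta_demo.py")
  ```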
- To submit the Spark application with data residing in Cloud Object Storage, specify the parameter values and run the following curl command. A Python alternative to this curl command is sketched after the parameter list.
  ```bash
  curl -k -X POST <V4_JOBS_API_ENDPOINT> \
  -H "Authorization: ZenApiKey <TOKEN>" \
  -d '{
    "application_details": {
      "conf": {
        "spark.hadoop.fs.s3a.bucket.<data_storage_name>.access.key": "<access_key>",
        "spark.hadoop.fs.s3a.bucket.<data_storage_name>.secret.key": "<secret_key>",
        "spark.hadoop.fs.s3a.bucket.<data_storage_name>.endpoint": "<bucket_endpoint>",
        "spark.sql.catalogImplementation": "hive",
        "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.hadoop.hive.metastore.schema.verification": "false",
        "spark.hadoop.hive.metastore.schema.verification.record.version": "false",
        "spark.hadoop.datanucleus.schema.autoCreateTables": "false",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        "spark.sql.catalog.spark_catalog.type": "hive",
        "spark.hive.metastore.uris": "thrift://ibm-lh-lakehouse-hive-metastore-svc.cpd-instance.svc.cluster.local:9083",
        "spark.hive.metastore.use.SSL": "true",
        "spark.hive.metastore.truststore.path": "file:///opt/ibm/jdk/lib/security/cacerts",
        "spark.hive.metastore.truststore.password": "changeit",
        "spark.hive.metastore.truststore.type": "JKS",
        "spark.hive.metastore.client.auth.mode": "PLAIN",
        "spark.hive.metastore.client.plain.username": "<wxd-user-name>",
        "spark.hive.metastore.client.plain.password": "<wxd-user-password>",
        "spark.hadoop.fs.s3a.path.style.access": "true"
      },
      "application": "s3a://delta-connector-test/delta_demo.py"
    }
  }'
  ```
- <V4_JOBS_API_ENDPOINT>: The endpoint for the instance that you want to use to submit your Spark job. To get the Spark jobs endpoint for your provisioned instance, see Administering the service instance.
- <TOKEN>: To get the access token for your service instance, see Generating an API authorization token.
- <data_storage_name>: The name of the storage bucket that stores data.
- <bucket_endpoint>: Provide the endpoint of the storage bucket that stores data.
- <access_key>: Provide the `access_key_id` of the storage.
- <secret_key>: Provide the `secret_access_key` of the storage.
- <wxd-user-name>: Your user name credential for the watsonx.data cluster. Note: You must have `Metastore Admin` access on the Metadata Service. For more information, see Managing access to the Metadata Service (MDS).
- <wxd-user-password>: Your password for the watsonx.data cluster.
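If you prefer to submit the job from Python rather than curl, the following is a minimal sketch with the requests library. It posts the same payload as the curl command above; the angle-bracket values are the placeholders described in the parameter list, and the abbreviated `conf` block must be completed with the remaining properties from the curl example.

```python
# Minimal sketch: submit the same Spark job with the requests library.
# <V4_JOBS_API_ENDPOINT>, <TOKEN>, and the other placeholders are described
# in the parameter list above.
import requests

payload = {
    "application_details": {
        "conf": {
            "spark.hadoop.fs.s3a.bucket.<data_storage_name>.access.key": "<access_key>",
            "spark.hadoop.fs.s3a.bucket.<data_storage_name>.secret.key": "<secret_key>",
            "spark.hadoop.fs.s3a.bucket.<data_storage_name>.endpoint": "<bucket_endpoint>",
            "spark.sql.catalogImplementation": "hive",
            "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
            "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
            "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
            "spark.sql.catalog.spark_catalog.type": "hive",
            # ...add the remaining Hive metastore and s3a properties from the curl example...
        },
        "application": "s3a://delta-connector-test/delta_demo.py",
    }
}

response = requests.post(
    "<V4_JOBS_API_ENDPOINT>",
    headers={"Authorization": "ZenApiKey <TOKEN>"},
    json=payload,
    verify=False,  # mirrors the -k flag in the curl example; use a CA bundle in production
)
print(response.status_code, response.text)
```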