Premiers pas avec les cas d'utilisation watsonx.data de Spark
Vous pouvez exécuter des cas d'utilisation Spark à watsonx.data l'aide Python d'exemples. Tous les exemples sont écrits à l'aide des API Python Spark.
Avant de commencer
Avant de pouvoir exécuter des cas d'utilisation Spark pour watsonx.data, vous devez :
- Fournir une IBMwatsonx.data instance en fonction de l'environnement ( watsonx.data sur CPD, watsonx.data sur Cloud) que vous choisissez.
- Fournir une IBM Analytics Engine instance dans un Cloud Pak for Data cluster. Voir la section « Provisionnement d'une instance ».
- Créer un volume dans Cloud Pak for Data. Voir Créer un volume Cloud Pak for Data de stockage.
À propos de l'exemple d'utilisation
Le fichier exemple illustre les fonctions suivantes :
Accès aux tables à watsonx.data
partir de : La section Créer une base de données dans le catalogue Lakehouse du fichier Python d'exemple crée une base de données demodb dans l'instance watsonx.data configurée avec un catalogue nommé lakehouse. demodb est configuré pour stocker toutes les données et métadonnées dans le Cloud Object Storage bucket lakehouse-bucket. Il crée également une table testTable iceberg et y accède.
Importation de données dans watsonx.data
: La section « Importation de données Parquet dans une table Lakehouse » du fichier Python d'exemple vous permet d'importer des données au format Parquet et au format « CSV » depuis un volume Spark vers une table watsonx.data. Des données d'exemple au format Parquet sont importées depuis le volume vers la table watsonx.data yellow_taxi_2022 (téléchargée à l'étape 2 ). La section « Importation de données Parquet dans une table Lakehouse » du fichier Python d'exemple montre également comment importer des données au format « CSV » depuis le répertoire « spark-vol » vers la table « zipcode » de la base de données « demodb ». Pour savoir comment accéder aux données d'exemple et les importer dans le volume, consultez la section « Insertion de données d'exemple dans le volume « Cloud Pak for Data ».
Modification du schéma dans watsonx.data
: La section Évolution du schéma du fichier Python d'exemple vous permet de modifier les données dans watsonx.data
Activités de maintenance des tables dans watsonx.data
: La maintenance des tables permet de maintenir les performances watsonx.data des tables. Iceberg propose des procédures de maintenance des tables prêtes à l'emploi qui permettent d'effectuer des optimisations puissantes des tables de manière déclarative. L'exemple suivant montre comment effectuer certaines opérations de maintenance de table à l'aide de Spark.
Pour plus d'informations sur les opérations de maintenance des tables dans Iceberg Spark, consultez la section Opérations sur les tables.
Exécution du cas d'utilisation type
Pour exécuter des cas watsonx.data d'utilisation Spark à l'aide de fichiers Python d'exemple :
Enregistrez l'exemple suivant sous forme de Python fichier :
from pyspark.sql import SparkSession import os def init_spark(): spark = SparkSession.builder \ .appName("lh-hms-cloud") \ .config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.endpoint" ,"<cos_bucket_endpoint>") \ .config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.access.key" ,"<access_key>") \ .config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.secret.key" ,"<secret_key>") \ .enableHiveSupport() \ .getOrCreate() return spark def create_database(spark): # Create a database in the lakehouse catalog spark.sql("create database if not exists lakehouse.demodb LOCATION 's3a://lakehouse-bucket/'") def list_databases(spark): # list the database under lakehouse catalog spark.sql("show databases from lakehouse").show() def basic_iceberg_table_operations(spark): # demonstration: Create a basic Iceberg table, insert some data and then query table spark.sql("create table if not exists lakehouse.demodb.testTable(id INTEGER, name VARCHAR(10), age INTEGER, salary DECIMAL(10, 2)) using iceberg").show() spark.sql("insert into lakehouse.demodb.testTable values(1,'Alan',23,3400.00),(2,'Ben',30,5500.00),(3,'Chen',35,6500.00)") spark.sql("select * from lakehouse.demodb.testTable").show() def create_table_from_parquet_data(spark): # load parquet data into dataframce df = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-01.parquet") # write the dataframe into an Iceberg table df.writeTo("lakehouse.demodb.yellow_taxi_2022").create() # describe the table created spark.sql('describe table lakehouse.demodb.yellow_taxi_2022').show(25) # query the table spark.sql('select * from lakehouse.demodb.yellow_taxi_2022').count() def ingest_from_csv_temp_table(spark): # load csv data into a dataframe csvDF = spark.read.option("header",True).csv("file:///spark-vol/zipcodes.csv") csvDF.createOrReplaceTempView("tempCSVTable") # load temporary table into an Iceberg table spark.sql('create or replace table lakehouse.demodb.zipcodes using iceberg as select * from tempCSVTable') # describe the table created spark.sql('describe table lakehouse.demodb.zipcodes').show(25) # query the table spark.sql('select * from lakehouse.demodb.zipcodes').show() def ingest_monthly_data(spark): df_feb = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-02.parquet") df_march = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-03.parquet") df_april = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-04.parquet") df_may = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-05.parquet") df_june = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-06.parquet") df_q1_q2 = df_feb.union(df_march).union(df_april).union(df_may).union(df_june) df_q1_q2.write.insertInto("lakehouse.demodb.yellow_taxi_2022") def perform_table_maintenance_operations(spark): # Query the metadata files table to list underlying data files spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show() # There are many smaller files compact them into files of 200MB each using the # `rewrite_data_files` Iceberg Spark procedure spark.sql(f"CALL lakehouse.system.rewrite_data_files(table => 'demodb.yellow_taxi_2022', options => map('target-file-size-bytes','209715200'))").show() # Again, query the metadata files table to list underlying data files; 6 files are compacted # to 3 files spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show() # List all the snapshots # Expire earlier snapshots. Only latest one with comacted data is required # Again, List all the snapshots to see only 1 left spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").show() #retain only the latest one latest_snapshot_committed_at = spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").tail(1)[0].committed_at print (latest_snapshot_committed_at) spark.sql(f"CALL lakehouse.system.expire_snapshots(table => 'demodb.yellow_taxi_2022',older_than => TIMESTAMP '{latest_snapshot_committed_at}',retain_last => 1)").show() spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").show() # Removing Orphan data files spark.sql(f"CALL lakehouse.system.remove_orphan_files(table => 'demodb.yellow_taxi_2022')").show(truncate=False) # Rewriting Manifest Files spark.sql(f"CALL lakehouse.system.rewrite_manifests('demodb.yellow_taxi_2022')").show() def evolve_schema(spark): # demonstration: Schema evolution # Add column fare_per_mile to the table spark.sql('ALTER TABLE lakehouse.demodb.yellow_taxi_2022 ADD COLUMN(fare_per_mile double)') # describe the table spark.sql('describe table lakehouse.demodb.yellow_taxi_2022').show(25) def clean_database(spark): # clean-up the demo database spark.sql('drop table if exists lakehouse.demodb.testTable purge') spark.sql('drop table if exists lakehouse.demodb.zipcodes purge') spark.sql('drop table if exists lakehouse.demodb.yellow_taxi_2022 purge') spark.sql('drop database if exists lakehouse.demodb cascade') def main(): try: spark = init_spark() create_database(spark) list_databases(spark) basic_iceberg_table_operations(spark) # demonstration: Ingest parquet and csv data into a wastonx.data Iceberg table create_table_from_parquet_data(spark) ingest_from_csv_temp_table(spark) # load data for the month of Feburary to June into the table yellow_taxi_2022 created above ingest_monthly_data(spark) # demonstration: Table maintenance perform_table_maintenance_operations(spark) # demonstration: Schema evolution evolve_schema(spark) finally: # clean-up the demo database clean_database(spark) spark.stop() if __name__ == '__main__': main()Téléchargez le Python fichier sur le volume cpd.
Générer un jeton d'accès. Voir Génération d'un jeton d'autorisation API.
Exécutez la commande curl suivante pour soumettre l'application Spark. Pour plus d'informations sur la soumission d'une application Spark, consultez la syntaxe, les paramètres et les codes de retour de l'API Spark Jobs.
curl -k -X POST https://<cpd-url>/v4/analytics_engines/<intstnce id>/spark_applications -H "Authorization: ZenApiKey ${TOKEN}" -X POST -d '{ "application_details": { "application": "/spark-vol/<pythonfile-name>.py", "conf": { "spark.driver.extraClassPath": "/opt/ibm/connectors/iceberg-lakehouse/iceberg-3.3.2-1.2.1-hms-4.0.0-shaded.jar" } }, "volumes": [{ "name": "cpd-instance::<volume-name>", "mount_path": "/spark-vol/", "source_sub_path": "spark" }] }'Exemple :
curl -k -X POST https://cpd-cpd-instance.apps.spark-cpd-test.cp.fyre.ibm.com/v4/analytics_engines/14c3c133-ba87-406c-a06d-16157d969d5b/spark_applications -H "Authorization: ZenApiKey eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCIsImtpZCI6IllZR1FJWkpoX3dYSWRrTXNqMV95dU5qR3VUeW1LT0xYcU1UN1UxSmxFUk0ifQ.eyJ1c2VybmFtZSI6ImFkbWluIiwicm9sZSI6IkFkbWluIiwicGVybWlzc2lvbnMiOlsiYWRtaW5pc3RyYXRvciIsImNhbl9wcm92aXNpb24iLCJtb25pdG9yX3BsYXRmb3JtIiwiY29uZmlndXJlX3BsYXRmb3JtIiwidmlld19wbGF0Zm9ybV9oZWFsdGgiLCJjb25maWd1cmVfYXV0aCIsIm1hbmFnZV91c2VycyIsIm1hbmFnZV9ncm91cHMiLCJtYW5hZ2Vfc2VydmljZV9pbnN0YW5jZXMiXSwiZ3JvdXBzIjpbMTAwMDBdLCJzdWIiOiJhZG1pbiIsImlzcyI6IktOT1hTU08iLCJhdWQiOiJEU1giLCJ1aWQiOiIxMDAwMzMwOTk5IiwiYXV0aGVudGljYXRvciI6ImRlZmF1bHQiLCJkaXNwbGF5X25hbWUiOiJhZG1pbiIsImFwaV9yZXF1ZXN0IjpmYWxzZSwiaWF0IjoxNjkwOTcyNDgwLCJleHAiOjE2OTEwMTU2ODB9.ALFvnY7cYNENfMpPRncxcKXCkt4F_xy8sI50DoQgtu_oSFB9fbstrUMe8XbNwermM-DCpC64XGjJdZaYeHXumCaelPETeQdqQnYRWONvyr18-y5Xn4GZlSvzzmiRvhzGZ-bt5cLsC_iEftouS_Opm_wnfaMUjYIPjVEzjFBTQh6PDMo7Q79kdZIyeZbHCKdKYVUpaZgHWKvZqMtNGGnm9PydN5TIRMws81817l96a7LWkI4UZYgybX53iNU0Mun_4iYqEizQm_dmII0_I7YkA1bBQuVtKcRFqfG5MucAR0REz3k9sBIXzn_qTFFrN_lQr2oSnWHviLUyAy1UCaHQNw" -X POST -d '{ "application_details": { "application": "/spark-vol/test-cpdspark.py", "conf": { "spark.driver.extraClassPath": "/opt/ibm/connectors/iceberg-lakehouse/iceberg-3.3.2-1.2.1-hms-4.0.0-shaded.jar", } }, "volumes": [{ "name": "cpd-instance::spark-artifacts-vol", "mount_path": "/spark-vol/", "source_sub_path": "spark" }] }'
Insertion d'exemples de données dans le Cloud Pak for Data volume
Pour insérer des données dans le Cloud Pak for Data volume :
Créez un volume, nommé spark-vol, pour stocker les données d'exemple à ingérer dans watsonx.data l'instance. Pour plus d'informations sur la création d'un volume, consultez la section Gestion des volumes de stockage.
Téléchargez les exemples de données Parquet et le fichier d' CSV d'exemple suivants :
- Exemple de fichier parquet (exemple de données pour six mois de données de taxi pour l'année 2022)
- Exemple de fichier « CSV » ( zipcodes.csv )
Téléchargez les données d'exemple dans le volume spark-vol. Pour obtenir des instructions, consultez la section Parcourir le volume de stockage et télécharger du contenu.