Premiers pas avec les cas d'utilisation watsonx.data de Spark

Vous pouvez exécuter des cas d'utilisation Spark à watsonx.data l'aide Python d'exemples. Tous les exemples sont écrits à l'aide des API Python Spark.

Avant de commencer

Avant de pouvoir exécuter des cas d'utilisation Spark pour watsonx.data, vous devez :

À propos de l'exemple d'utilisation

Le fichier exemple illustre les fonctions suivantes :

Accès aux tables à watsonx.data
partir de : La section Créer une base de données dans le catalogue Lakehouse du fichier Python d'exemple crée une base de données demodb dans l'instance watsonx.data configurée avec un catalogue nommé lakehouse. demodb est configuré pour stocker toutes les données et métadonnées dans le Cloud Object Storage bucket lakehouse-bucket. Il crée également une table testTable iceberg et y accède.


Importation de données dans watsonx.data
: La section « Importation de données Parquet dans une table Lakehouse » du fichier Python d'exemple vous permet d'importer des données au format Parquet et au format « CSV » depuis un volume Spark vers une table watsonx.data. Des données d'exemple au format Parquet sont importées depuis le volume vers la table watsonx.data yellow_taxi_2022 (téléchargée à l'étape 2 ). La section « Importation de données Parquet dans une table Lakehouse » du fichier Python d'exemple montre également comment importer des données au format « CSV » depuis le répertoire « spark-vol » vers la table « zipcode » de la base de données « demodb ». Pour savoir comment accéder aux données d'exemple et les importer dans le volume, consultez la section « Insertion de données d'exemple dans le volume « Cloud Pak for Data ».


Modification du schéma dans watsonx.data
: La section Évolution du schéma du fichier Python d'exemple vous permet de modifier les données dans watsonx.data


Activités de maintenance des tables dans watsonx.data
: La maintenance des tables permet de maintenir les performances watsonx.data des tables. Iceberg propose des procédures de maintenance des tables prêtes à l'emploi qui permettent d'effectuer des optimisations puissantes des tables de manière déclarative. L'exemple suivant montre comment effectuer certaines opérations de maintenance de table à l'aide de Spark.

Pour plus d'informations sur les opérations de maintenance des tables dans Iceberg Spark, consultez la section Opérations sur les tables.

Exécution du cas d'utilisation type

Pour exécuter des cas watsonx.data d'utilisation Spark à l'aide de fichiers Python d'exemple :

  1. Enregistrez l'exemple suivant sous forme de Python fichier :

    from pyspark.sql import SparkSession
    import os
    
    def init_spark():
        spark = SparkSession.builder \
            .appName("lh-hms-cloud") \
            .config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.endpoint" ,"<cos_bucket_endpoint>") \
            .config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.access.key" ,"<access_key>") \
            .config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.secret.key" ,"<secret_key>") \
            .enableHiveSupport() \
            .getOrCreate()
        return spark
    
    def create_database(spark):
        # Create a database in the lakehouse catalog
        spark.sql("create database if not exists lakehouse.demodb LOCATION 's3a://lakehouse-bucket/'")
    
    def list_databases(spark):
        # list the database under lakehouse catalog
        spark.sql("show databases from lakehouse").show()
    
    def basic_iceberg_table_operations(spark):
        # demonstration: Create a basic Iceberg table, insert some data and then query table
        spark.sql("create table if not exists lakehouse.demodb.testTable(id INTEGER, name VARCHAR(10), age INTEGER, salary DECIMAL(10, 2)) using iceberg").show()
        spark.sql("insert into lakehouse.demodb.testTable values(1,'Alan',23,3400.00),(2,'Ben',30,5500.00),(3,'Chen',35,6500.00)")
        spark.sql("select * from lakehouse.demodb.testTable").show()
    
    def create_table_from_parquet_data(spark):
        # load parquet data into dataframce
        df = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-01.parquet")
        # write the dataframe into an Iceberg table
        df.writeTo("lakehouse.demodb.yellow_taxi_2022").create()
        # describe the table created
        spark.sql('describe table lakehouse.demodb.yellow_taxi_2022').show(25)
        # query the table
        spark.sql('select * from lakehouse.demodb.yellow_taxi_2022').count()
    
    def ingest_from_csv_temp_table(spark):
        # load csv data into a dataframe
        csvDF = spark.read.option("header",True).csv("file:///spark-vol/zipcodes.csv")
        csvDF.createOrReplaceTempView("tempCSVTable")
        # load temporary table into an Iceberg table
        spark.sql('create or replace table lakehouse.demodb.zipcodes using iceberg as select * from tempCSVTable')
        # describe the table created
        spark.sql('describe table lakehouse.demodb.zipcodes').show(25)
        # query the table
        spark.sql('select * from lakehouse.demodb.zipcodes').show()
    
    def ingest_monthly_data(spark):
        df_feb = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-02.parquet")
        df_march = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-03.parquet")
        df_april = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-04.parquet")
        df_may = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-05.parquet")
        df_june = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-06.parquet")
        df_q1_q2 = df_feb.union(df_march).union(df_april).union(df_may).union(df_june)
        df_q1_q2.write.insertInto("lakehouse.demodb.yellow_taxi_2022")
    
    def perform_table_maintenance_operations(spark):
        # Query the metadata files table to list underlying data files
        spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show()
        # There are many smaller files compact them into files of 200MB each using the
        # `rewrite_data_files` Iceberg Spark procedure
        spark.sql(f"CALL lakehouse.system.rewrite_data_files(table => 'demodb.yellow_taxi_2022', options => map('target-file-size-bytes','209715200'))").show()
        # Again, query the metadata files table to list underlying data files; 6 files are compacted
        # to 3 files
        spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show()
        # List all the snapshots
        # Expire earlier snapshots. Only latest one with comacted data is required
        # Again, List all the snapshots to see only 1 left
        spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").show()
        #retain only the latest one
        latest_snapshot_committed_at = spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").tail(1)[0].committed_at
        print (latest_snapshot_committed_at)
        spark.sql(f"CALL lakehouse.system.expire_snapshots(table => 'demodb.yellow_taxi_2022',older_than => TIMESTAMP '{latest_snapshot_committed_at}',retain_last => 1)").show()
        spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").show()
        # Removing Orphan data files
        spark.sql(f"CALL lakehouse.system.remove_orphan_files(table => 'demodb.yellow_taxi_2022')").show(truncate=False)
        # Rewriting Manifest Files
        spark.sql(f"CALL lakehouse.system.rewrite_manifests('demodb.yellow_taxi_2022')").show()
    
    def evolve_schema(spark):
        # demonstration: Schema evolution
        # Add column fare_per_mile to the table
        spark.sql('ALTER TABLE lakehouse.demodb.yellow_taxi_2022 ADD COLUMN(fare_per_mile double)')
        # describe the table
        spark.sql('describe table lakehouse.demodb.yellow_taxi_2022').show(25)
    
    def clean_database(spark):
        # clean-up the demo database
        spark.sql('drop table if exists lakehouse.demodb.testTable purge')
        spark.sql('drop table if exists lakehouse.demodb.zipcodes purge')
        spark.sql('drop table if exists lakehouse.demodb.yellow_taxi_2022 purge')
        spark.sql('drop database if exists lakehouse.demodb cascade')
    
    def main():
        try:
            spark = init_spark()
            create_database(spark)
            list_databases(spark)
            basic_iceberg_table_operations(spark)
            # demonstration: Ingest parquet and csv data into a wastonx.data Iceberg table
            create_table_from_parquet_data(spark)
            ingest_from_csv_temp_table(spark)
            # load data for the month of Feburary to June into the table yellow_taxi_2022 created above
            ingest_monthly_data(spark)
            # demonstration: Table maintenance
            perform_table_maintenance_operations(spark)
            # demonstration: Schema evolution
            evolve_schema(spark)
        finally:
            # clean-up the demo database
            clean_database(spark)
            spark.stop()
    
    if __name__ == '__main__':
      main()
    
  2. Téléchargez le Python fichier sur le volume cpd.

  3. Générer un jeton d'accès. Voir Génération d'un jeton d'autorisation API.

  4. Exécutez la commande curl suivante pour soumettre l'application Spark. Pour plus d'informations sur la soumission d'une application Spark, consultez la syntaxe, les paramètres et les codes de retour de l'API Spark Jobs.

    curl -k -X POST https://<cpd-url>/v4/analytics_engines/<intstnce id>/spark_applications -H "Authorization: ZenApiKey ${TOKEN}" -X POST -d '{
        "application_details": {
            "application": "/spark-vol/<pythonfile-name>.py",
            "conf": {
                "spark.driver.extraClassPath": "/opt/ibm/connectors/iceberg-lakehouse/iceberg-3.3.2-1.2.1-hms-4.0.0-shaded.jar"
            }
        },
        "volumes": [{
            "name": "cpd-instance::<volume-name>",
            "mount_path": "/spark-vol/",
            "source_sub_path": "spark"
        }]
    }'
    

    Exemple :

    curl -k -X POST https://cpd-cpd-instance.apps.spark-cpd-test.cp.fyre.ibm.com/v4/analytics_engines/14c3c133-ba87-406c-a06d-16157d969d5b/spark_applications -H "Authorization: ZenApiKey eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCIsImtpZCI6IllZR1FJWkpoX3dYSWRrTXNqMV95dU5qR3VUeW1LT0xYcU1UN1UxSmxFUk0ifQ.eyJ1c2VybmFtZSI6ImFkbWluIiwicm9sZSI6IkFkbWluIiwicGVybWlzc2lvbnMiOlsiYWRtaW5pc3RyYXRvciIsImNhbl9wcm92aXNpb24iLCJtb25pdG9yX3BsYXRmb3JtIiwiY29uZmlndXJlX3BsYXRmb3JtIiwidmlld19wbGF0Zm9ybV9oZWFsdGgiLCJjb25maWd1cmVfYXV0aCIsIm1hbmFnZV91c2VycyIsIm1hbmFnZV9ncm91cHMiLCJtYW5hZ2Vfc2VydmljZV9pbnN0YW5jZXMiXSwiZ3JvdXBzIjpbMTAwMDBdLCJzdWIiOiJhZG1pbiIsImlzcyI6IktOT1hTU08iLCJhdWQiOiJEU1giLCJ1aWQiOiIxMDAwMzMwOTk5IiwiYXV0aGVudGljYXRvciI6ImRlZmF1bHQiLCJkaXNwbGF5X25hbWUiOiJhZG1pbiIsImFwaV9yZXF1ZXN0IjpmYWxzZSwiaWF0IjoxNjkwOTcyNDgwLCJleHAiOjE2OTEwMTU2ODB9.ALFvnY7cYNENfMpPRncxcKXCkt4F_xy8sI50DoQgtu_oSFB9fbstrUMe8XbNwermM-DCpC64XGjJdZaYeHXumCaelPETeQdqQnYRWONvyr18-y5Xn4GZlSvzzmiRvhzGZ-bt5cLsC_iEftouS_Opm_wnfaMUjYIPjVEzjFBTQh6PDMo7Q79kdZIyeZbHCKdKYVUpaZgHWKvZqMtNGGnm9PydN5TIRMws81817l96a7LWkI4UZYgybX53iNU0Mun_4iYqEizQm_dmII0_I7YkA1bBQuVtKcRFqfG5MucAR0REz3k9sBIXzn_qTFFrN_lQr2oSnWHviLUyAy1UCaHQNw" -X POST -d '{
        "application_details": {
            "application": "/spark-vol/test-cpdspark.py",
            "conf": {
                "spark.driver.extraClassPath": "/opt/ibm/connectors/iceberg-lakehouse/iceberg-3.3.2-1.2.1-hms-4.0.0-shaded.jar",
    
            }
        },
        "volumes": [{
            "name": "cpd-instance::spark-artifacts-vol",
            "mount_path": "/spark-vol/",
            "source_sub_path": "spark"
        }]
    }'
    

Insertion d'exemples de données dans le Cloud Pak for Data volume

Pour insérer des données dans le Cloud Pak for Data volume :

  1. Créez un volume, nommé spark-vol, pour stocker les données d'exemple à ingérer dans watsonx.data l'instance. Pour plus d'informations sur la création d'un volume, consultez la section Gestion des volumes de stockage.

  2. Téléchargez les exemples de données Parquet et le fichier d' CSV d'exemple suivants :

  3. Téléchargez les données d'exemple dans le volume spark-vol. Pour obtenir des instructions, consultez la section Parcourir le volume de stockage et télécharger du contenu.