Introduzione ai casi d'uso di watsonx.data Spark

È possibile eseguire casi d'uso di Spark watsonx.data utilizzando Python esempi. Tutti gli esempi sono stati scritti utilizzando le API Python Spark.

Prima di iniziare

Prima di poter eseguire casi d'uso Spark per watsonx.data, è necessario:

Informazioni sul caso d'uso di esempio

Il file di esempio illustra le seguenti funzioni:

Accesso alle tabelle da watsonx.data
: La sezione Crea un database nel catalogo Lakehouse del file Python di esempio crea un database demodb nell'istanza watsonx.data configurata con un catalogo denominato lakehouse. demodb è configurato per archiviare tutti i dati e i metadati nel Cloud Object Storage bucket lakehouse-bucket. Crea anche una tabella testTable iceberg e vi accede.


Importazione dei dati in watsonx.data
: la sezione "Importazione di dati Parquet in una tabella Lakehouse" del file Python di esempio consente di importare dati in formato Parquet e CSV da un volume Spark (spark-vol) in una tabella watsonx.data. I dati di esempio in formato Parquet vengono inseriti dal volume nella tabella watsonx.data denominata yellow_taxi_2022 (scaricata al punto 2 ). La sezione "Inserimento dei dati del parquet in una tabella Lakehouse " del file Python di esempio mostra anche come inserire dati in formato " CSV " dal volume Spark nella tabella "zipcode" del database "demodb". Per istruzioni su come accedere ai dati di esempio e caricarli nel volume, consultare la sezione "Inserimento dei dati di esempio nel volume ' Cloud Pak for Data '".


Modifica dello schema in watsonx.data
: La sezione Evoluzione dello schema del file Python di esempio consente di modificare i dati in watsonx.data


Attività di manutenzione delle tabelle in watsonx.data
: La manutenzione delle tabelle contribuisce a mantenerne le watsonx.data prestazioni. Iceberg fornisce procedure di manutenzione delle tabelle pronte all'uso che consentono di eseguire potenti ottimizzazioni delle tabelle in modo dichiarativo. Il seguente esempio mostra come eseguire alcune operazioni di manutenzione delle tabelle utilizzando Spark.

Per ulteriori informazioni sulle operazioni di manutenzione della tabella Iceberg Spark, vedere Operazioni sulle tabelle.

Esecuzione del caso d'uso di esempio

Per eseguire watsonx.data casi d'uso Spark utilizzando file di Python esempio:

  1. Salva il seguente esempio come file Python :

    from pyspark.sql import SparkSession
    import os
    
    def init_spark():
        spark = SparkSession.builder \
            .appName("lh-hms-cloud") \
            .config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.endpoint" ,"<cos_bucket_endpoint>") \
            .config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.access.key" ,"<access_key>") \
            .config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.secret.key" ,"<secret_key>") \
            .enableHiveSupport() \
            .getOrCreate()
        return spark
    
    def create_database(spark):
        # Create a database in the lakehouse catalog
        spark.sql("create database if not exists lakehouse.demodb LOCATION 's3a://lakehouse-bucket/'")
    
    def list_databases(spark):
        # list the database under lakehouse catalog
        spark.sql("show databases from lakehouse").show()
    
    def basic_iceberg_table_operations(spark):
        # demonstration: Create a basic Iceberg table, insert some data and then query table
        spark.sql("create table if not exists lakehouse.demodb.testTable(id INTEGER, name VARCHAR(10), age INTEGER, salary DECIMAL(10, 2)) using iceberg").show()
        spark.sql("insert into lakehouse.demodb.testTable values(1,'Alan',23,3400.00),(2,'Ben',30,5500.00),(3,'Chen',35,6500.00)")
        spark.sql("select * from lakehouse.demodb.testTable").show()
    
    def create_table_from_parquet_data(spark):
        # load parquet data into dataframce
        df = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-01.parquet")
        # write the dataframe into an Iceberg table
        df.writeTo("lakehouse.demodb.yellow_taxi_2022").create()
        # describe the table created
        spark.sql('describe table lakehouse.demodb.yellow_taxi_2022').show(25)
        # query the table
        spark.sql('select * from lakehouse.demodb.yellow_taxi_2022').count()
    
    def ingest_from_csv_temp_table(spark):
        # load csv data into a dataframe
        csvDF = spark.read.option("header",True).csv("file:///spark-vol/zipcodes.csv")
        csvDF.createOrReplaceTempView("tempCSVTable")
        # load temporary table into an Iceberg table
        spark.sql('create or replace table lakehouse.demodb.zipcodes using iceberg as select * from tempCSVTable')
        # describe the table created
        spark.sql('describe table lakehouse.demodb.zipcodes').show(25)
        # query the table
        spark.sql('select * from lakehouse.demodb.zipcodes').show()
    
    def ingest_monthly_data(spark):
        df_feb = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-02.parquet")
        df_march = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-03.parquet")
        df_april = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-04.parquet")
        df_may = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-05.parquet")
        df_june = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-06.parquet")
        df_q1_q2 = df_feb.union(df_march).union(df_april).union(df_may).union(df_june)
        df_q1_q2.write.insertInto("lakehouse.demodb.yellow_taxi_2022")
    
    def perform_table_maintenance_operations(spark):
        # Query the metadata files table to list underlying data files
        spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show()
        # There are many smaller files compact them into files of 200MB each using the
        # `rewrite_data_files` Iceberg Spark procedure
        spark.sql(f"CALL lakehouse.system.rewrite_data_files(table => 'demodb.yellow_taxi_2022', options => map('target-file-size-bytes','209715200'))").show()
        # Again, query the metadata files table to list underlying data files; 6 files are compacted
        # to 3 files
        spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show()
        # List all the snapshots
        # Expire earlier snapshots. Only latest one with comacted data is required
        # Again, List all the snapshots to see only 1 left
        spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").show()
        #retain only the latest one
        latest_snapshot_committed_at = spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").tail(1)[0].committed_at
        print (latest_snapshot_committed_at)
        spark.sql(f"CALL lakehouse.system.expire_snapshots(table => 'demodb.yellow_taxi_2022',older_than => TIMESTAMP '{latest_snapshot_committed_at}',retain_last => 1)").show()
        spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").show()
        # Removing Orphan data files
        spark.sql(f"CALL lakehouse.system.remove_orphan_files(table => 'demodb.yellow_taxi_2022')").show(truncate=False)
        # Rewriting Manifest Files
        spark.sql(f"CALL lakehouse.system.rewrite_manifests('demodb.yellow_taxi_2022')").show()
    
    def evolve_schema(spark):
        # demonstration: Schema evolution
        # Add column fare_per_mile to the table
        spark.sql('ALTER TABLE lakehouse.demodb.yellow_taxi_2022 ADD COLUMN(fare_per_mile double)')
        # describe the table
        spark.sql('describe table lakehouse.demodb.yellow_taxi_2022').show(25)
    
    def clean_database(spark):
        # clean-up the demo database
        spark.sql('drop table if exists lakehouse.demodb.testTable purge')
        spark.sql('drop table if exists lakehouse.demodb.zipcodes purge')
        spark.sql('drop table if exists lakehouse.demodb.yellow_taxi_2022 purge')
        spark.sql('drop database if exists lakehouse.demodb cascade')
    
    def main():
        try:
            spark = init_spark()
            create_database(spark)
            list_databases(spark)
            basic_iceberg_table_operations(spark)
            # demonstration: Ingest parquet and csv data into a wastonx.data Iceberg table
            create_table_from_parquet_data(spark)
            ingest_from_csv_temp_table(spark)
            # load data for the month of Feburary to June into the table yellow_taxi_2022 created above
            ingest_monthly_data(spark)
            # demonstration: Table maintenance
            perform_table_maintenance_operations(spark)
            # demonstration: Schema evolution
            evolve_schema(spark)
        finally:
            # clean-up the demo database
            clean_database(spark)
            spark.stop()
    
    if __name__ == '__main__':
      main()
    
  2. Carica il Python file sul volume cpd.

  3. Genera un token di accesso. Vedi Generazione di un token di autorizzazione API.

  4. Esegui il seguente comando curl per inviare l'applicazione Spark. Per ulteriori informazioni sull'invio di un'applicazione Spark, consulta Sintassi, parametri e codici di ritorno dell'API dei lavori Spark.

    curl -k -X POST https://<cpd-url>/v4/analytics_engines/<intstnce id>/spark_applications -H "Authorization: ZenApiKey ${TOKEN}" -X POST -d '{
        "application_details": {
            "application": "/spark-vol/<pythonfile-name>.py",
            "conf": {
                "spark.driver.extraClassPath": "/opt/ibm/connectors/iceberg-lakehouse/iceberg-3.3.2-1.2.1-hms-4.0.0-shaded.jar"
            }
        },
        "volumes": [{
            "name": "cpd-instance::<volume-name>",
            "mount_path": "/spark-vol/",
            "source_sub_path": "spark"
        }]
    }'
    

    Esempio:

    curl -k -X POST https://cpd-cpd-instance.apps.spark-cpd-test.cp.fyre.ibm.com/v4/analytics_engines/14c3c133-ba87-406c-a06d-16157d969d5b/spark_applications -H "Authorization: ZenApiKey eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCIsImtpZCI6IllZR1FJWkpoX3dYSWRrTXNqMV95dU5qR3VUeW1LT0xYcU1UN1UxSmxFUk0ifQ.eyJ1c2VybmFtZSI6ImFkbWluIiwicm9sZSI6IkFkbWluIiwicGVybWlzc2lvbnMiOlsiYWRtaW5pc3RyYXRvciIsImNhbl9wcm92aXNpb24iLCJtb25pdG9yX3BsYXRmb3JtIiwiY29uZmlndXJlX3BsYXRmb3JtIiwidmlld19wbGF0Zm9ybV9oZWFsdGgiLCJjb25maWd1cmVfYXV0aCIsIm1hbmFnZV91c2VycyIsIm1hbmFnZV9ncm91cHMiLCJtYW5hZ2Vfc2VydmljZV9pbnN0YW5jZXMiXSwiZ3JvdXBzIjpbMTAwMDBdLCJzdWIiOiJhZG1pbiIsImlzcyI6IktOT1hTU08iLCJhdWQiOiJEU1giLCJ1aWQiOiIxMDAwMzMwOTk5IiwiYXV0aGVudGljYXRvciI6ImRlZmF1bHQiLCJkaXNwbGF5X25hbWUiOiJhZG1pbiIsImFwaV9yZXF1ZXN0IjpmYWxzZSwiaWF0IjoxNjkwOTcyNDgwLCJleHAiOjE2OTEwMTU2ODB9.ALFvnY7cYNENfMpPRncxcKXCkt4F_xy8sI50DoQgtu_oSFB9fbstrUMe8XbNwermM-DCpC64XGjJdZaYeHXumCaelPETeQdqQnYRWONvyr18-y5Xn4GZlSvzzmiRvhzGZ-bt5cLsC_iEftouS_Opm_wnfaMUjYIPjVEzjFBTQh6PDMo7Q79kdZIyeZbHCKdKYVUpaZgHWKvZqMtNGGnm9PydN5TIRMws81817l96a7LWkI4UZYgybX53iNU0Mun_4iYqEizQm_dmII0_I7YkA1bBQuVtKcRFqfG5MucAR0REz3k9sBIXzn_qTFFrN_lQr2oSnWHviLUyAy1UCaHQNw" -X POST -d '{
        "application_details": {
            "application": "/spark-vol/test-cpdspark.py",
            "conf": {
                "spark.driver.extraClassPath": "/opt/ibm/connectors/iceberg-lakehouse/iceberg-3.3.2-1.2.1-hms-4.0.0-shaded.jar",
    
            }
        },
        "volumes": [{
            "name": "cpd-instance::spark-artifacts-vol",
            "mount_path": "/spark-vol/",
            "source_sub_path": "spark"
        }]
    }'
    

Inserimento dei dati campione nel Cloud Pak for Data volume

Per inserire dati nel Cloud Pak for Data volume:

  1. Creare un volume, denominato spark-vol, per archiviare i dati di esempio da importare watsonx.data nell'istanza. Per informazioni sulla creazione di un volume, vedere Gestione dei volumi di archiviazione.

  2. Scarica i seguenti dati di esempio relativi al parquet e il file di esempio « CSV »:

  3. Carica i dati di esempio nel volume spark-vol. Per le istruzioni, consulta Esplora il volume di archiviazione e carica i contenuti.