Introduzione ai casi d'uso di watsonx.data Spark
È possibile eseguire casi d'uso di Spark watsonx.data utilizzando Python esempi. Tutti gli esempi sono stati scritti utilizzando le API Python Spark.
Prima di iniziare
Prima di poter eseguire casi d'uso Spark per watsonx.data, è necessario:
- Fornitura di IBMwatsonx.data un'istanza in base all'ambiente ( watsonx.data su CPD, watsonx.data su Cloud) scelto.
- Fornire IBM Analytics Engine un'istanza in un Cloud Pak for Data cluster. Vedere Provisioning di un'istanza.
- Crea un volume in Cloud Pak for Data. Vedi Creare un volume Cloud Pak for Data di archiviazione.
Informazioni sul caso d'uso di esempio
Il file di esempio illustra le seguenti funzioni:
Accesso alle tabelle da watsonx.data
: La sezione Crea un database nel catalogo Lakehouse del file Python di esempio crea un database demodb nell'istanza watsonx.data configurata con un catalogo denominato lakehouse. demodb è configurato per archiviare tutti i dati e i metadati nel Cloud Object Storage bucket lakehouse-bucket. Crea anche una tabella testTable iceberg e vi accede.
Importazione dei dati in watsonx.data
: la sezione "Importazione di dati Parquet in una tabella Lakehouse" del file Python di esempio consente di importare dati in formato Parquet e CSV da un volume Spark (spark-vol) in una tabella watsonx.data. I dati di esempio in formato Parquet vengono inseriti dal volume nella tabella watsonx.data denominata yellow_taxi_2022 (scaricata al punto 2 ). La sezione "Inserimento dei dati del parquet in una tabella Lakehouse " del file Python di esempio mostra anche come inserire dati in formato " CSV " dal volume Spark nella tabella "zipcode" del database "demodb". Per istruzioni su come accedere ai dati di esempio e caricarli nel volume, consultare la sezione "Inserimento dei dati di esempio nel volume ' Cloud Pak for Data '".
Modifica dello schema in watsonx.data
: La sezione Evoluzione dello schema del file Python di esempio consente di modificare i dati in watsonx.data
Attività di manutenzione delle tabelle in watsonx.data
: La manutenzione delle tabelle contribuisce a mantenerne le watsonx.data prestazioni. Iceberg fornisce procedure di manutenzione delle tabelle pronte all'uso che consentono di eseguire potenti ottimizzazioni delle tabelle in modo dichiarativo. Il seguente esempio mostra come eseguire alcune operazioni di manutenzione delle tabelle utilizzando Spark.
Per ulteriori informazioni sulle operazioni di manutenzione della tabella Iceberg Spark, vedere Operazioni sulle tabelle.
Esecuzione del caso d'uso di esempio
Per eseguire watsonx.data casi d'uso Spark utilizzando file di Python esempio:
Salva il seguente esempio come file Python :
from pyspark.sql import SparkSession import os def init_spark(): spark = SparkSession.builder \ .appName("lh-hms-cloud") \ .config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.endpoint" ,"<cos_bucket_endpoint>") \ .config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.access.key" ,"<access_key>") \ .config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.secret.key" ,"<secret_key>") \ .enableHiveSupport() \ .getOrCreate() return spark def create_database(spark): # Create a database in the lakehouse catalog spark.sql("create database if not exists lakehouse.demodb LOCATION 's3a://lakehouse-bucket/'") def list_databases(spark): # list the database under lakehouse catalog spark.sql("show databases from lakehouse").show() def basic_iceberg_table_operations(spark): # demonstration: Create a basic Iceberg table, insert some data and then query table spark.sql("create table if not exists lakehouse.demodb.testTable(id INTEGER, name VARCHAR(10), age INTEGER, salary DECIMAL(10, 2)) using iceberg").show() spark.sql("insert into lakehouse.demodb.testTable values(1,'Alan',23,3400.00),(2,'Ben',30,5500.00),(3,'Chen',35,6500.00)") spark.sql("select * from lakehouse.demodb.testTable").show() def create_table_from_parquet_data(spark): # load parquet data into dataframce df = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-01.parquet") # write the dataframe into an Iceberg table df.writeTo("lakehouse.demodb.yellow_taxi_2022").create() # describe the table created spark.sql('describe table lakehouse.demodb.yellow_taxi_2022').show(25) # query the table spark.sql('select * from lakehouse.demodb.yellow_taxi_2022').count() def ingest_from_csv_temp_table(spark): # load csv data into a dataframe csvDF = spark.read.option("header",True).csv("file:///spark-vol/zipcodes.csv") csvDF.createOrReplaceTempView("tempCSVTable") # load temporary table into an Iceberg table spark.sql('create or replace table lakehouse.demodb.zipcodes using iceberg as select * from tempCSVTable') # describe the table created spark.sql('describe table lakehouse.demodb.zipcodes').show(25) # query the table spark.sql('select * from lakehouse.demodb.zipcodes').show() def ingest_monthly_data(spark): df_feb = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-02.parquet") df_march = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-03.parquet") df_april = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-04.parquet") df_may = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-05.parquet") df_june = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-06.parquet") df_q1_q2 = df_feb.union(df_march).union(df_april).union(df_may).union(df_june) df_q1_q2.write.insertInto("lakehouse.demodb.yellow_taxi_2022") def perform_table_maintenance_operations(spark): # Query the metadata files table to list underlying data files spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show() # There are many smaller files compact them into files of 200MB each using the # `rewrite_data_files` Iceberg Spark procedure spark.sql(f"CALL lakehouse.system.rewrite_data_files(table => 'demodb.yellow_taxi_2022', options => map('target-file-size-bytes','209715200'))").show() # Again, query the metadata files table to list underlying data files; 6 files are compacted # to 3 files spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show() # List all the snapshots # Expire earlier snapshots. Only latest one with comacted data is required # Again, List all the snapshots to see only 1 left spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").show() #retain only the latest one latest_snapshot_committed_at = spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").tail(1)[0].committed_at print (latest_snapshot_committed_at) spark.sql(f"CALL lakehouse.system.expire_snapshots(table => 'demodb.yellow_taxi_2022',older_than => TIMESTAMP '{latest_snapshot_committed_at}',retain_last => 1)").show() spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").show() # Removing Orphan data files spark.sql(f"CALL lakehouse.system.remove_orphan_files(table => 'demodb.yellow_taxi_2022')").show(truncate=False) # Rewriting Manifest Files spark.sql(f"CALL lakehouse.system.rewrite_manifests('demodb.yellow_taxi_2022')").show() def evolve_schema(spark): # demonstration: Schema evolution # Add column fare_per_mile to the table spark.sql('ALTER TABLE lakehouse.demodb.yellow_taxi_2022 ADD COLUMN(fare_per_mile double)') # describe the table spark.sql('describe table lakehouse.demodb.yellow_taxi_2022').show(25) def clean_database(spark): # clean-up the demo database spark.sql('drop table if exists lakehouse.demodb.testTable purge') spark.sql('drop table if exists lakehouse.demodb.zipcodes purge') spark.sql('drop table if exists lakehouse.demodb.yellow_taxi_2022 purge') spark.sql('drop database if exists lakehouse.demodb cascade') def main(): try: spark = init_spark() create_database(spark) list_databases(spark) basic_iceberg_table_operations(spark) # demonstration: Ingest parquet and csv data into a wastonx.data Iceberg table create_table_from_parquet_data(spark) ingest_from_csv_temp_table(spark) # load data for the month of Feburary to June into the table yellow_taxi_2022 created above ingest_monthly_data(spark) # demonstration: Table maintenance perform_table_maintenance_operations(spark) # demonstration: Schema evolution evolve_schema(spark) finally: # clean-up the demo database clean_database(spark) spark.stop() if __name__ == '__main__': main()Carica il Python file sul volume cpd.
Genera un token di accesso. Vedi Generazione di un token di autorizzazione API.
Esegui il seguente comando curl per inviare l'applicazione Spark. Per ulteriori informazioni sull'invio di un'applicazione Spark, consulta Sintassi, parametri e codici di ritorno dell'API dei lavori Spark.
curl -k -X POST https://<cpd-url>/v4/analytics_engines/<intstnce id>/spark_applications -H "Authorization: ZenApiKey ${TOKEN}" -X POST -d '{ "application_details": { "application": "/spark-vol/<pythonfile-name>.py", "conf": { "spark.driver.extraClassPath": "/opt/ibm/connectors/iceberg-lakehouse/iceberg-3.3.2-1.2.1-hms-4.0.0-shaded.jar" } }, "volumes": [{ "name": "cpd-instance::<volume-name>", "mount_path": "/spark-vol/", "source_sub_path": "spark" }] }'Esempio:
curl -k -X POST https://cpd-cpd-instance.apps.spark-cpd-test.cp.fyre.ibm.com/v4/analytics_engines/14c3c133-ba87-406c-a06d-16157d969d5b/spark_applications -H "Authorization: ZenApiKey eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCIsImtpZCI6IllZR1FJWkpoX3dYSWRrTXNqMV95dU5qR3VUeW1LT0xYcU1UN1UxSmxFUk0ifQ.eyJ1c2VybmFtZSI6ImFkbWluIiwicm9sZSI6IkFkbWluIiwicGVybWlzc2lvbnMiOlsiYWRtaW5pc3RyYXRvciIsImNhbl9wcm92aXNpb24iLCJtb25pdG9yX3BsYXRmb3JtIiwiY29uZmlndXJlX3BsYXRmb3JtIiwidmlld19wbGF0Zm9ybV9oZWFsdGgiLCJjb25maWd1cmVfYXV0aCIsIm1hbmFnZV91c2VycyIsIm1hbmFnZV9ncm91cHMiLCJtYW5hZ2Vfc2VydmljZV9pbnN0YW5jZXMiXSwiZ3JvdXBzIjpbMTAwMDBdLCJzdWIiOiJhZG1pbiIsImlzcyI6IktOT1hTU08iLCJhdWQiOiJEU1giLCJ1aWQiOiIxMDAwMzMwOTk5IiwiYXV0aGVudGljYXRvciI6ImRlZmF1bHQiLCJkaXNwbGF5X25hbWUiOiJhZG1pbiIsImFwaV9yZXF1ZXN0IjpmYWxzZSwiaWF0IjoxNjkwOTcyNDgwLCJleHAiOjE2OTEwMTU2ODB9.ALFvnY7cYNENfMpPRncxcKXCkt4F_xy8sI50DoQgtu_oSFB9fbstrUMe8XbNwermM-DCpC64XGjJdZaYeHXumCaelPETeQdqQnYRWONvyr18-y5Xn4GZlSvzzmiRvhzGZ-bt5cLsC_iEftouS_Opm_wnfaMUjYIPjVEzjFBTQh6PDMo7Q79kdZIyeZbHCKdKYVUpaZgHWKvZqMtNGGnm9PydN5TIRMws81817l96a7LWkI4UZYgybX53iNU0Mun_4iYqEizQm_dmII0_I7YkA1bBQuVtKcRFqfG5MucAR0REz3k9sBIXzn_qTFFrN_lQr2oSnWHviLUyAy1UCaHQNw" -X POST -d '{ "application_details": { "application": "/spark-vol/test-cpdspark.py", "conf": { "spark.driver.extraClassPath": "/opt/ibm/connectors/iceberg-lakehouse/iceberg-3.3.2-1.2.1-hms-4.0.0-shaded.jar", } }, "volumes": [{ "name": "cpd-instance::spark-artifacts-vol", "mount_path": "/spark-vol/", "source_sub_path": "spark" }] }'
Inserimento dei dati campione nel Cloud Pak for Data volume
Per inserire dati nel Cloud Pak for Data volume:
Creare un volume, denominato spark-vol, per archiviare i dati di esempio da importare watsonx.data nell'istanza. Per informazioni sulla creazione di un volume, vedere Gestione dei volumi di archiviazione.
Scarica i seguenti dati di esempio relativi al parquet e il file di esempio « CSV »:
- File parquet di esempio (dati di esempio relativi a sei mesi di dati sui taxi per l'anno 2022)
- File di esempio " CSV " ( zipcodes.csv )
Carica i dati di esempio nel volume spark-vol. Per le istruzioni, consulta Esplora il volume di archiviazione e carica i contenuti.