Track Spark applications
You can track your Spark applications with Databand by using the methods that are described in this section.
watsonx.data on IBM Software Hub
- Databand listener
- This method automatically tracks dataset operations in your Spark script. A configuration sketch for this method appears later in this section, after the end-to-end example.
- Databand decorators and logging API
- To use this method, you need to import the dbnd module, which involves modifying your code.
- End-to-end example of using Databand APIs
- The following example demonstrates the use of the dbnd APIs.
example.py
import time
import logging

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from dbnd import dbnd_tracking, task, dataset_op_logger, log_metric, log_dataframe

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Data Pipeline with Databand") \
    .getOrCreate()

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@task
def create_sample_data():
    # Create a DataFrame with sample data including columns to be dropped
    data = [
        ("John", "Camping Equipment", 500, "Regular", "USA"),
        ("Jane", "Golf Equipment", 300, "Premium", "UK"),
        ("Mike", "Camping Equipment", 450, "Regular", "USA"),
        ("Emily", "Golf Equipment", 350, "Premium", "Canada"),
        ("Anna", "Camping Equipment", 600, "Regular", "USA"),
        ("Tom", "Golf Equipment", 200, "Regular", "UK")
    ]
    columns = ["Name", "Product line", "Sales", "Customer Type", "Country"]
    retailData = spark.createDataFrame(data, columns)

    # Log the data creation
    unique_file_name = "sample-data"
    with dataset_op_logger(unique_file_name, "read", with_schema=True, with_preview=True, with_stats=True) as logger:
        logger.set(data=retailData)

    return retailData


@task
def filter_data(rawData):
    # Define columns to drop
    columns_to_drop = ['Customer Type', 'Country']

    # Drop the specified columns in PySpark DataFrame
    filteredRetailData = rawData.drop(*columns_to_drop)

    # Log the data after dropping columns
    unique_file_name = 'script://Weekly_Sales/Filtered_df'
    with dataset_op_logger(unique_file_name, "read", with_schema=True, with_preview=True) as logger:
        logger.set(data=filteredRetailData)

    return filteredRetailData


@task
def write_data_by_product_line(filteredData):
    # Filter data for Camping Equipment and write to CSV
    campingEquipment = filteredData.filter(col('Product line') == 'Camping Equipment')
    campingEquipment.write.csv("Camping_Equipment.csv", header=True, mode="overwrite")

    # Log writing the Camping Equipment CSV
    log_dataframe("camping_equipment", campingEquipment, with_schema=True, with_stats=True)

    # Filter data for Golf Equipment and write to CSV
    golfEquipment = filteredData.filter(col('Product line') == 'Golf Equipment')
    golfEquipment.write.csv("Golf_Equipment.csv", header=True, mode="overwrite")

    # Log writing the Golf Equipment CSV
    log_dataframe("golf_equipment", golfEquipment, with_schema=True, with_stats=True)


def prepare_retail_data():
    with dbnd_tracking(
        conf={
            "tracking": {
                "track_source_code": True
            },
            "log": {
                "preview_head_bytes": 15360,
                "preview_tail_bytes": 15360
            }
        }
    ):
        logger.info("Running Databand spark application!")
        start_time_milliseconds = int(round(time.time() * 1000))

        log_metric("metric_check", "OK")

        # Call the step job - create sample data
        rawData = create_sample_data()

        # Filter data
        filteredData = filter_data(rawData)

        # Write data by product line
        write_data_by_product_line(filteredData)

        end_time_milliseconds = int(round(time.time() * 1000))
        elapsed_time = end_time_milliseconds - start_time_milliseconds

        log_metric('elapsed-time', elapsed_time)
        logger.info(f"Total pipeline running time: {elapsed_time:.2f} milliseconds")

        logger.info("Spark execution completed..")
        log_metric("is-success", "OK")


# Invoke the main function
prepare_retail_data()
- dbnd_tracking
- Initializes tracking for your pipeline or application, configuring Databand settings and logging execution details.
- task
- Marks a function as a Databand task, enabling tracking and monitoring of individual steps in your pipeline.
- dataset_op_logger
- Logs operations on datasets, including schema and statistics.
- log_metric
- Records custom metrics to track performance or other quantitative data during execution.
- log_dataframe
- Logs details about a DataFrame, such as schema and statistics, for monitoring data transformations.
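To show how these calls compose outside the full pipeline, the following minimal sketch logs one write operation and one custom metric. It is an illustration only: the DataFrame contents, the dataset path script://orders/daily_summary, and the metric name rows_written are placeholder values that do not appear in the example above.

from pyspark.sql import SparkSession
from dbnd import dbnd_tracking, dataset_op_logger, log_metric

spark = SparkSession.builder.appName("Databand logging sketch").getOrCreate()

with dbnd_tracking(conf={"tracking": {"track_source_code": True}}):
    # Illustrative data; replace with your own DataFrame.
    orders = spark.createDataFrame(
        [("A-100", 25.0), ("A-101", 40.5)],
        ["order_id", "amount"],
    )

    # Log a write operation with schema and preview, mirroring the example above.
    with dataset_op_logger("script://orders/daily_summary", "write",
                           with_schema=True, with_preview=True) as op_logger:
        orders.write.csv("daily_summary.csv", header=True, mode="overwrite")
        op_logger.set(data=orders)

    # Record a custom metric for the run.
    log_metric("rows_written", orders.count())

As in the example above, the flags on dataset_op_logger control whether schema, preview, and statistics are captured for the dataset that is passed to set.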
For information about submitting Spark jobs, see Submit engine applications.
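If you rely on the Databand listener method instead of the dbnd decorators, tracking is attached through Spark configuration rather than code changes, typically supplied when the SparkSession is built or when the application is submitted. The sketch below is a non-authoritative illustration: the listener class name ai.databand.spark.DbndSparkQueryExecutionListener, the need to place the Databand agent JAR on the classpath, and the JAR path shown are assumptions based on general Databand documentation, not details given in this section; see the Databand links at the end of this section for the exact configuration for your release.

from pyspark.sql import SparkSession

# Minimal sketch: enable Databand's query execution listener at session build time.
# The listener class name and the agent JAR path are assumptions; check the
# Databand documentation linked at the end of this section for exact values.
spark = (
    SparkSession.builder
    .appName("Listener-based Databand tracking")
    # Hypothetical path to the Databand agent JAR available in your environment.
    .config("spark.jars", "/path/to/dbnd-agent.jar")
    # Register the Databand query execution listener (assumed class name).
    .config(
        "spark.sql.queryExecutionListeners",
        "ai.databand.spark.DbndSparkQueryExecutionListener",
    )
    .getOrCreate()
)

# Dataset operations executed through this session can then be tracked
# automatically, without dbnd decorators in the script itself.
df = spark.range(10)
df.write.csv("listener_demo.csv", header=True, mode="overwrite")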
After you submit the Spark application, you receive a confirmation message with the application ID and Spark version. Keep this information to track the execution status of the submitted Spark job. You can monitor the job and track its datasets in the Databand environment.
To open the Databand dashboard for tracking, click View Databand.
IBM Data Observability by Databand
- For PySpark applications: Tracking PySpark
- For Spark (Java/Scala): Tracking Spark (Scala/Java)