Tracking PySpark

If you run jobs in PySpark, Databand can report your code errors, metrics, and logs in the context of your broader pipeline or orchestration system.

You can use Databand decorators and the logging API in the same way as in regular Python code. For more information, see Tracking Python functions. The following example shows a PySpark function with the Databand decorator and metrics logging:

Note: The following scripts are examples only. Remember to replace the placeholder values with your own data.
import sys
from operator import add
from pyspark.sql import SparkSession
import logging
from dbnd import log_metric, task

logger = logging.getLogger(__name__)

@task
def calculate_counts(input_file, output_file):
    spark = SparkSession.builder.appName("PythonWordCount").getOrCreate()

    lines = spark.read.text(input_file).rdd.map(lambda r: r[0])
    counts = (
        lines.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1)).reduceByKey(add)
    )
    counts.saveAsTextFile(output_file)
    output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))
    log_metric("counts", len(output))
    logger.info(
        "Log message from EMR cluster"
    )
    spark.sparkContext.stop()


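if __name__ == "__main__":
    # Expect the input and output paths as command-line arguments.
    if len(sys.argv) != 3:
        print("Usage: my_pyspark_script.py <input_file> <output_file>")
        sys.exit(1)
    calculate_counts(sys.argv[1], sys.argv[2])
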
This code example produces a number of artifacts that are reported to the Databand tracking system.

The first artifact is the output that is printed by the following Python snippet:
for (word, count) in output:
    print("%s: %i" % (word, count))

The second artifact is the value that is reported through the Databand log_metric API, in this case a count. When you use the Python logging facilities, for example logger.info (the call that follows log_metric in the code), all log messages are also reported.
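
To make the value passed to log_metric("counts", len(output)) concrete, the following sketch mirrors the flatMap, map, and reduceByKey steps in plain Python, without Spark or Databand. The helper name count_words and the sample lines are hypothetical and exist only for illustration. Because reduceByKey merges all pairs that share a key, the reported value is the number of distinct tokens, not the total number of words.

```python
from collections import Counter


def count_words(lines):
    """Mirror the Spark job locally: split on single spaces, then sum per word."""
    counts = Counter()
    for line in lines:
        # Same tokenization as lines.flatMap(lambda x: x.split(" "))
        for word in line.split(" "):
            # Equivalent to map(lambda x: (x, 1)) followed by reduceByKey(add)
            counts[word] += 1
    return counts


# Hypothetical sample input standing in for the contents of input_file.
sample_lines = ["to be or not to be", "that is the question"]
output = list(count_words(sample_lines).items())

for word, count in output:
    print("%s: %i" % (word, count))

# This is the value that the PySpark example would pass to log_metric("counts", ...).
metric_value = len(output)
print("counts metric:", metric_value)
```

In the real job, the same number is computed on the driver after counts.collect(), so very large vocabularies are collected in full before the metric is logged.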

Databand correlates the tracking metrics from the Spark job with the associated pipeline from your orchestration system (for example, an Airflow DAG), based on how you designed that pipeline.

You can run this script in the following way:

  1. Enable explicit tracking for @task code, for example:
    export DBND__TRACKING=True
    export DBND__ENABLE__SPARK_CONTEXT_ENV=True
    
  2. Provide your Databand URL and access token, for example:
    export DBND__CORE__DATABAND_URL=...
    export DBND__CORE__DATABAND_ACCESS_TOKEN=...
    
  3. Add the spark-submit command after the variables, so that the metrics from the Spark job are tracked together with the associated pipeline from your orchestration system. The complete script looks like this:
    export DBND__TRACKING=True
    export DBND__ENABLE__SPARK_CONTEXT_ENV=True
    
    export DBND__CORE__DATABAND_URL=...
    export DBND__CORE__DATABAND_ACCESS_TOKEN=...
    
    spark-submit --conf "spark.env.DBND__RUN__NAME=my_run_name"  my_pyspark_script.py <path/to/input_file> <path/to/output_file>
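
The steps above can also be sketched as a small Python launcher that checks the environment before it builds the spark-submit command. This is a minimal sketch, not part of the Databand SDK: the helper names missing_tracking_vars and build_spark_submit and the sample paths in.txt and out are hypothetical. The variable names and the spark.env.DBND__RUN__NAME setting are taken from the steps above. The sketch only prints the command and does not start Spark.

```python
import os
import shlex

# Variable names taken from the steps above.
REQUIRED_VARS = (
    "DBND__TRACKING",
    "DBND__ENABLE__SPARK_CONTEXT_ENV",
    "DBND__CORE__DATABAND_URL",
    "DBND__CORE__DATABAND_ACCESS_TOKEN",
)


def missing_tracking_vars(environ):
    """Return the required variables that are unset or empty in `environ`."""
    return [name for name in REQUIRED_VARS if not environ.get(name)]


def build_spark_submit(run_name, script, input_file, output_file):
    """Build the spark-submit argument list used in step 3 (hypothetical helper)."""
    return [
        "spark-submit",
        "--conf",
        "spark.env.DBND__RUN__NAME=" + run_name,
        script,
        input_file,
        output_file,
    ]


if __name__ == "__main__":
    missing = missing_tracking_vars(os.environ)
    if missing:
        print("Set these variables first:", ", ".join(missing))
    # "in.txt" and "out" are placeholder paths for illustration.
    command = build_spark_submit("my_run_name", "my_pyspark_script.py", "in.txt", "out")
    print(shlex.join(command))
```

Passing the command as a list, for example to subprocess.run, avoids shell quoting issues when paths contain spaces.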

Tracking dataframes

You can use the dataset logging API to track Spark DataFrames. For more information, see Datasets.

Integrating with Databand listener

Your PySpark script can benefit from automatic tracking of dataset operations. For more information, see JVM SDK configuration.

  1. Set the relevant environment variables to enable tracking, for example:
    export DBND__TRACKING__PROJECT="MyProjectName"
    export DBND__TRACKING=True
  2. Add the following spark-submit command after the variables to enable the Databand listener:
    export DBND__TRACKING__PROJECT="MyProjectName"
    export DBND__TRACKING=True
    
    spark-submit --driver-java-options "-javaagent:/PATH_TO_AGENT" \
        --conf "spark.sql.queryExecutionListeners=ai.databand.spark.DbndSparkQueryExecutionListener" \
        --conf "spark.extraListeners=ai.databand.spark.DbndSparkListener" \
        my_pyspark_script.py <path/to/input_file> <path/to/output_file>
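
If you start the job from Python instead of a shell, the same listener settings can be assembled as an argument list, which avoids line-continuation problems such as a stray space after a backslash. The following is a minimal sketch under stated assumptions: the helper names build_listener_command and build_tracking_env are hypothetical, /PATH_TO_AGENT remains the placeholder from the command above, and the listener class names and environment variables are copied from the steps above. The sketch only builds and prints the command.

```python
import os

# Listener settings copied from the spark-submit command above.
LISTENER_CONF = {
    "spark.sql.queryExecutionListeners": "ai.databand.spark.DbndSparkQueryExecutionListener",
    "spark.extraListeners": "ai.databand.spark.DbndSparkListener",
}


def build_listener_command(agent_path, script, *script_args):
    """Build the spark-submit argument list with the Databand agent and listeners."""
    command = ["spark-submit", "--driver-java-options", "-javaagent:" + agent_path]
    for key, value in LISTENER_CONF.items():
        command += ["--conf", key + "=" + value]
    return command + [script, *script_args]


def build_tracking_env(project, base_env=None):
    """Copy the environment and add the tracking variables from step 1."""
    env = dict(os.environ if base_env is None else base_env)
    env["DBND__TRACKING__PROJECT"] = project
    env["DBND__TRACKING"] = "True"
    return env


# Placeholder values: replace the agent path and file paths with your own.
command = build_listener_command(
    "/PATH_TO_AGENT", "my_pyspark_script.py", "path/to/input_file", "path/to/output_file"
)
env = build_tracking_env("MyProjectName")
print(command)
# To launch the job, you could pass both to subprocess.run(command, env=env, check=True).
```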

Integrating with PyDeequ for data quality metrics

Databand can collect and send PyDeequ metrics. For more information, see PyDeequ.

Follow the instructions in the installation guide. Make sure that the Databand JVM client is connected to your Spark application by including the ai.databand:dbnd-api-deequ package. For more information about how to do this, see Adding Databand libraries to your JVM application.

To connect Databand to Deequ, use DbndMetricsRepository as in the following example. For more information, see the Deequ repository documentation:

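from pydeequ.analyzers import AnalysisRunner
from pydeequ.repository import ResultKey
from dbnd_spark.deequ_metrics_repository import DbndMetricsRepository

# `spark` is your SparkSession; `lines` is assumed to be the Spark DataFrame to analyze.
result_key = ResultKey(spark, ResultKey.current_milli_time(), {"name": "words"})
analysis_runner = AnalysisRunner(spark).onData(lines)

# Implement your Deequ validations here, for example:
# .addAnalyzer(ApproxCountDistinct("value"))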

analysis_runner.useRepository(DbndMetricsRepository(spark)).saveOrAppendResult(result_key).run()

Note: For more information about running Scala or Java Spark, see Tracking Spark with Scala or Java.