Tracking PySpark
If you run jobs in PySpark, Databand can provide visibility into your code errors, metrics, and log messages, in the context of your broader pipeline or orchestration system.
You can use Databand decorators and the logging API in the same way that you use them in Python. For more information, see Tracking Python functions. The following example shows a PySpark function with the Databand decorator and metrics logging:
import sys
from operator import add
from pyspark.sql import SparkSession
import logging
from dbnd import log_metric, task
logger = logging.getLogger(__name__)
@task
def calculate_counts(input_file, output_file):
    spark = SparkSession.builder.appName("PythonWordCount").getOrCreate()
    lines = spark.read.text(input_file).rdd.map(lambda r: r[0])
    counts = (
        lines.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1)).reduceByKey(add)
    )
    counts.saveAsTextFile(output_file)
    output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))
    log_metric("counts", len(output))
    logger.info("Log message from EMR cluster")
    spark.sparkContext.stop()
if __name__ == "__main__":
    calculate_counts(input_file=sys.argv[1], output_file=sys.argv[2])
These code examples contain a number of artifacts that are reported to the Databand tracking system.
The first example is the print output of the word counts, produced by the for loop over output. The second example is the Databand log_metric API, which reports a count.
When you use Python logging facilities, for example logger.info (the line following the log_metric API in the code), all logs are also reported.
Databand correlates the tracking metrics from the Spark job with the associated pipeline from your orchestration system (for example, an Airflow DAG), based on how you design your pipeline.
You can run this script in the following way:
- Enable explicit tracking for @task code, for example:
  export DBND__TRACKING=True
  export DBND__ENABLE__SPARK_CONTEXT_ENV=True
- Provide your connection details (Databand URL and access token):
  export DBND__CORE__DATABAND_URL=...
  export DBND__CORE__DATABAND_ACCESS_TOKEN=...
- Run the spark-submit command to track the metrics from the Spark job with the associated pipeline from your orchestration system:
  export DBND__TRACKING=True
  export DBND__ENABLE__SPARK_CONTEXT_ENV=True
  export DBND__CORE__DATABAND_URL=...
  export DBND__CORE__DATABAND_ACCESS_TOKEN=...
  spark-submit --conf "spark.env.DBND__RUN__NAME=my_run_name" my_pyspark_script.py <path/to/input_file> <path/to/output_file>
Tracking dataframes
You can use the dataset logging API to track Spark DataFrames. For more information, see Datasets.
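For example, here is a minimal sketch of logging a read operation on a Spark DataFrame with log_dataset_op; the path is a placeholder, and the exact parameter names should be verified against the Datasets documentation:

from dbnd import log_dataset_op

# Assumes an existing SparkSession named spark.
# Hypothetical dataset location; replace with your real input path.
input_path = "s3://my-bucket/input_data.csv"
df = spark.read.csv(input_path, header=True)

# Report the read operation to Databand together with the DataFrame schema
# and a data preview (parameter names assumed from the dataset logging API).
log_dataset_op(input_path, "read", data=df, with_schema=True, with_preview=True)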
Integrating with the Databand listener
Your PySpark script can benefit from automatic tracking of dataset operations. For more information, see JVM SDK configuration.
- Provide all relevant variables to the script to enable tracking, for example:
  export DBND__TRACKING__PROJECT="MyProjectName"
  export DBND__TRACKING=True
- Add the following spark-submit command to the variables to enable the Databand listeners:
  export DBND__TRACKING__PROJECT="MyProjectName"
  export DBND__TRACKING=True
  spark-submit --driver-java-options "-javaagent:/PATH_TO_AGENT" \
    --conf "spark.sql.queryExecutionListeners=ai.databand.spark.DbndSparkQueryExecutionListener" \
    --conf "spark.extraListeners=ai.databand.spark.DbndSparkListener" \
    my_pyspark_script.py <path/to/input_file> <path/to/output_file>
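With the agent and listeners attached, ordinary Spark reads and writes in your script are reported as dataset operations without any Databand calls in your code. A minimal sketch (the file paths are placeholders):

# Assumes an existing SparkSession named spark; no Databand-specific code is
# needed here, because the listeners observe the query execution and report
# these dataset operations automatically.
df = spark.read.csv("s3://my-bucket/input_data.csv", header=True)
df.write.mode("overwrite").parquet("s3://my-bucket/output_data/")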
Integrating with PyDeequ for data quality metrics
Databand can collect and send PyDeequ metrics. For more information, see PyDeequ.
Follow the instructions in the installation guide. Make sure that the Databand JVM client is connected to
your Spark application by including the ai.databand:dbnd-api-deequ package. For
more information about how to do this, see Adding Databand libraries to your JVM application.
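For example, one way to include this package is with the --packages option of spark-submit; the version is a placeholder, and the supported options are described in Adding Databand libraries to your JVM application:

spark-submit \
  --packages ai.databand:dbnd-api-deequ:<VERSION> \
  my_pyspark_script.py <path/to/input_file> <path/to/output_file>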
To connect Databand to Deequ, use DbndMetricsRepository as in the following example. For more information, see the Deequ repository documentation:
from pydeequ.analyzers import AnalysisRunner, ApproxCountDistinct
from pydeequ.repository import ResultKey
from dbnd_spark.deequ_metrics_repository import DbndMetricsRepository

# "lines" is the Spark DataFrame that you want to analyze.
result_key = ResultKey(spark, ResultKey.current_milli_time(), {"name": "words"})
analysis_runner = AnalysisRunner(spark).onData(lines)
## implement your Deequ validations here, for example:
## .addAnalyzer(ApproxCountDistinct("value"))
analysis_runner.useRepository(DbndMetricsRepository(spark)).saveOrAppendResult(result_key).run()