Tracking Spark with Scala or Java
Databand provides a set of Java libraries for tracking JVM-specific applications such as Spark jobs written in Scala or Java.
Follow these instructions to start tracking JVM applications.
Configuring the SDK
Make sure that you follow the Installing JVM SDK and agent guide to integrate Databand libraries into your Spark application. Use Installing on a Spark cluster to configure your Spark cluster. The following properties are required for proper tracking: DBND__CORE__DATABAND_URL, DBND__CORE__DATABAND_ACCESS_TOKEN, and DBND__TRACKING.
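For example, a minimal sketch that supplies these properties as environment variables at submit time (the URL, token, class, and JAR names are placeholders):

# placeholder values; use your own Databand URL and access token
export DBND__CORE__DATABAND_URL=https://your-databand-host
export DBND__CORE__DATABAND_ACCESS_TOKEN=<your-access-token>
export DBND__TRACKING=True

spark-submit --class com.example.MyPipeline my-pipeline.jar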
Tracking job metadata
The following sections describe the different options that are available for tracking pipeline metadata.
You can log any custom metrics that are important for pipeline and data observability. Examples include custom metrics for data quality information, like data counts or null counts, and custom KPIs particular to your data.
To enable logging of strings and numeric values, use the ai.databand.log.DbndLogger.logMetric() method:

DbndLogger.logMetric("data", data);
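For instance, a minimal sketch that logs a few data-quality metrics from a Spark job (the metric names and the "score" column are illustrative):

import ai.databand.log.DbndLogger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class QualityMetrics {
    public static void report(Dataset<Row> data) {
        // row count as a simple volume KPI
        DbndLogger.logMetric("row_count", data.count());
        // null count for a column of interest ("score" is a hypothetical column)
        DbndLogger.logMetric("null_score_count", data.filter(data.col("score").isNull()).count());
        // string values can be logged as well
        DbndLogger.logMetric("source", "daily_ingest");
    }
}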
If you have a more complex pipeline structure, or you want to present your pipeline functions and store metadata as separate tasks, you can add annotations to your pipeline code. Method annotation enables input/output tracking for each method and links them visually.
- Annotating for Scala:

import ai.databand.annotations.Task
import org.apache.spark.sql.{DataFrame, Dataset, Row}

object ScalaSparkPipeline {
  @Task
  def main(args: Array[String]): Unit = {
    // init code
    // ...
    // task 1
    val imputed = unitImputation(rawData, columnsToImpute, 10)
    // task 2
    val clean = dedupRecords(imputed, keyColumns)
    // task 3
    val report = createReport(clean)
  }

  @Task
  protected def unitImputation(rawData: DataFrame, columnsToImpute: Array[String], value: Int): DataFrame = {
    // ...
  }

  @Task
  protected def dedupRecords(data: Dataset[Row], keyColumns: Array[String]): DataFrame = {
    // ...
  }

  @Task
  protected def createReport(data: Dataset[Row]): Dataset[Row] = {
    // ...
  }
}

- Annotating with Java:

import ai.databand.annotations.Task;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class ProcessDataSpark {
    @Task
    public void processCustomerData(String inputFile, String outputFile) {
        // setup code...
        // task 1
        Dataset<Row> imputed = unitImputation(rawData, columnsToImpute, 10);
        // task 2
        Dataset<Row> clean = dedupRecords(imputed, keyColumns);
        // task 3
        Dataset<Row> report = createReport(clean);
        // ...
    }

    @Task
    protected Dataset<Row> unitImputation(Dataset<Row> rawData, String[] columnsToImpute, int value) {
        // ...
    }

    @Task
    protected Dataset<Row> dedupRecords(Dataset<Row> data, String[] keyColumns) {
        // ...
    }

    @Task
    protected Dataset<Row> createReport(Dataset<Row> data) {
        // ...
    }
}

- Annotating and tracking annotated tasks with the Databand Java agent:

The Databand agent instruments your application and is included in the application startup script, as illustrated in the sketch below. See Installing JVM SDK and agent and Installing dbnd on a Spark cluster.
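For example, the agent is typically attached through the driver JVM options at submit time. A sketch with a placeholder path to the agent JAR (see the installation guides for the exact artifact name):

spark-submit \
  --conf "spark.driver.extraJavaOptions=-javaagent:/path/to/dbnd-agent.jar" \
  --class com.example.MyPipeline \
  my-pipeline.jar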
Logging dataset operations
You can use Databand to track your dataset operations. To track an operation, call DbndLogger.logDatasetOperation():
import ai.databand.log.DbndLogger;
// the operation type and status enums are assumed to live in ai.databand.schema
import ai.databand.schema.DatasetOperationStatus;
import ai.databand.schema.DatasetOperationType;
//...
@Task("create_report")
public void ingestData(String path) {
    Dataset<Row> data = sql.read().json(path);
    // 1. Track a simple read operation:
    DbndLogger.logDatasetOperation(path, DatasetOperationType.READ, data);
    // 2. Track a passed/failed operation with error details:
    try {
        // ...
        DbndLogger.logDatasetOperation(path, DatasetOperationType.READ, data);
    } catch (Exception e) {
        DbndLogger.logDatasetOperation(path, DatasetOperationType.READ, DatasetOperationStatus.NOK, data, e);
    }
    // 3. Track a failed operation:
    DbndLogger.logDatasetOperation(path, DatasetOperationType.READ, DatasetOperationStatus.NOK, data);
    // 4. Track without a preview/schema:
    DbndLogger.logDatasetOperation(path, DatasetOperationType.READ, DatasetOperationStatus.OK, data, false, false);
}
//...
For more information, see Dataset logging.
Job logging
Databand supports limiting how much of the log head and tail is captured. The following properties control this behavior:
- DBND__LOG__PREVIEW_HEAD_BYTES specifies how many bytes are fetched from the head of the log.
- DBND__LOG__PREVIEW_TAIL_BYTES specifies how many bytes are fetched from the tail of the log.
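For example, a sketch that caps the preview at the first and last 32 KB of the log (the values are illustrative):

export DBND__LOG__PREVIEW_HEAD_BYTES=32768
export DBND__LOG__PREVIEW_TAIL_BYTES=32768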
Enabling tracking of Spark metrics and I/O
Databand can capture Spark executor metrics and any I/O operations performed by your Spark code. For more information, see Tracking Spark and JVM applications.
You can enable the listener in your application code, as shown in the following examples, or through the Spark configuration, as shown in the sketch after them.
- Adding the Databand Spark listener by using Scala:

import ai.databand.spark.DbndSparkListener
import org.apache.spark.sql.{Dataset, Row, SparkSession}

object CreateReport {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("CreateReportSparkScala")
      .getOrCreate

    val listener = new DbndSparkListener
    spark.sparkContext.addSparkListener(listener)
  }
}

- Adding the Databand Spark listener by using Java:

import ai.databand.spark.DbndSparkListener;
import org.apache.spark.sql.SparkSession;

public class CreateReport {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
            .builder()
            .appName("CreateReportSparkJava")
            .getOrCreate();

        DbndSparkListener listener = new DbndSparkListener();
        spark.sparkContext().addSparkListener(listener);
        //...
    }
}
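Because DbndSparkListener is a standard Spark listener, it can also be registered without code changes through Spark's spark.extraListeners property. A sketch, assuming the Databand JARs are already on the application classpath (the class and JAR names are placeholders):

spark-submit \
  --conf spark.extraListeners=ai.databand.spark.DbndSparkListener \
  --class com.example.CreateReport \
  my-pipeline.jar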
Using Spark with Deequ for data quality metrics
Complete the following steps to add Deequ:
- Before you can use Deequ, you must add the deequ JARs and ai.databand:dbnd-api-deequ to your project dependencies. You can do it in one of the following ways:

Adding JARs by using SBT:

libraryDependencies ++= Seq(
  "com.amazon.deequ" % "deequ" % "x.x.x-spark-x.x",
  "ai.databand" % "dbnd-api-deequ" % "0.xx.x"
)

Adding JARs by using Maven:

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.amazon.deequ</groupId>
      <artifactId>deequ</artifactId>
      <version>x.x.x-spark-x.x</version>
    </dependency>
    <dependency>
      <groupId>ai.databand</groupId>
      <artifactId>dbnd-api-deequ</artifactId>
      <version>0.xx.x</version>
    </dependency>
  </dependencies>
</dependencyManagement>

Adding JARs by using Gradle:

dependencies {
    implementation 'com.amazon.deequ:deequ:x.x.x-spark-x.x'
    implementation 'ai.databand:dbnd-api-deequ:0.xx.x'
}
- Adding the dbnd JVM Deequ metrics repository:

Databand uses a custom MetricsRepository and DbndResultKey. You need to explicitly add both to your code:

import ai.databand.deequ.{DbndMetricsRepository, DbndResultKey}
import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.checks.{Check, CheckLevel}
import com.amazon.deequ.profiles.ColumnProfilerRunner
import com.amazon.deequ.repository.memory.InMemoryMetricsRepository

@Task
protected def dedupRecords(data: Dataset[Row], keyColumns: Array[String]): Dataset[Row] = {
  val dedupedData = data.dropDuplicates(keyColumns)
  // custom metrics repository wrapping the standard in-memory one
  val metricsRepo = new DbndMetricsRepository(new InMemoryMetricsRepository)

  // capturing dataset verification results
  VerificationSuite()
    .onData(dedupedData)
    .addCheck(
      Check(CheckLevel.Error, "Dedup testing")
        .isUnique("name")
        .isUnique("id")
        .isComplete("name")
        .isComplete("id")
        .isPositive("score"))
    .useRepository(metricsRepo)
    .saveOrAppendResult(new DbndResultKey("dedupedData"))
    .run()

  // using the metrics repository to capture dataset profiling results
  ColumnProfilerRunner()
    .onData(dedupedData)
    .useRepository(metricsRepo)
    .saveOrAppendResult(new DbndResultKey("dedupedData"))
    .run()

  // return the deduplicated dataset to match the declared return type
  dedupedData
}

If you already use a metrics repository, you can wrap it inside Databand's new DbndMetricsRepository(new InMemoryMetricsRepository). Databand submits the metrics to the wrapped repository first, and to the Databand tracker afterward.

To distinguish metric keys, use a special DbndResultKey. Give your checks and profiles names that you can clearly distinguish in the Databand monitoring UI.