Spark 对现代数据堆栈至关重要。因此,为 Spark 环境配置适当级别的可观测性极为重要。监控 Spark 有多种选择,包括可提供预配置 Spark 和 Spark SQL 指标仪表板的 SaaS 程序。如果这还不够呢?
典型的 Spark 应用程序设置,无论是自托管还是托管解决方案,通常都包含用于监控集群运行状况的操作仪表板。这些仪表板虽然实用,但只能提供基础设施概览,而非与数据相关的实际指标。确实,当 CPU 使用率增加或集群 RAM 不足时,我们可以推测应用程序可能出现问题,但当数据源改变了模式或来自其他部门的数据损坏时,这些也无济于事。工程师面临的大多数问题是由数据而非底层基础设施引起的,因此他们必须花费大量时间来重现问题,或像侦探一样反复检查文件和存储桶。这正是应用程序监控能发挥作用的地方。
不同场景需要不同层次的可见性,数据工程师需要具备比执行指标更深一层的洞察力。否则,您可能要耗费大量时间来调试 Spark 中的数据质量问题。
在本指南中,您将了解如何为 Spark 实现高层与低层的数据可观察性。高层可观测性需要使用 Spark 的内部系统,例如侦听器 API 和查询执行侦听器。实现低层可观测性,需要学习如何使用库来跟踪数据质量指标。
掌握这两类方法后,您便可以根据具体问题选择最适合的解决方法。
这种指标获取方法非常古老但稳定可靠。实际上,Spark 用户界面也是通过同样的机制实现指标可视化。Spark 侦听器 API 允许开发人员跟踪 Spark 在应用程序执行期间发出的事件。这些事件通常是应用程序启动/结束、作业启动/结束、阶段启动/结束等。请在 SparkJavaDoc 中获取完整列表。Spark 侦听器配置简单,使用方便,可以轻松获取指标。每次执行操作后,Spark 会调用 Spark 侦听器并将一些元数据信息传递给其方法。其中包括执行时间、读取/写入的记录数、读取/写入的字节数等信息。
这种非常基础的低层数据质量监控可用于检查记录数量和大小。假设您有一个每日运行的作业,对传入的数据集执行转换/分析。您可以编写侦听器,检查从输入中读取了多少条记录,并与前一天的结果进行比较。如果差异显著,我们可以推测数据源可能存在问题。
然而此方案需要自行编写监控程序:需部署指标存储系统、配置告警机制。当应用代码变更时,所有指标键值也需同步调整并妥善处理。
但是,即使是简单的 Spark 侦听器也能提供一些洞察分析。
以下是此类 Spark 侦听器的示例:
public class SomeSparkListener extends SparkListener{
/**
* This very simple spark listener prints metrics collected for every stage.
*
* @param stageCompleted
*/
@Override
public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
StageInfo stageInfo = stageCompleted.stageInfo();
Iterator it = stageInfo.taskMetrics().accumulators().iterator();
while (it.hasNext()) {
AccumulatorV2 next = it.next();
String key = next.name().get();
Object value = next.value();
System.out.printf("key: %s, value: %s%n", key, value);
}
您可通过以下几种方式将 Spark 侦听器添加到应用程序中:
以编程方式添加:
SparkSession spark = SparkSession.builder().getOrCreate(); spark.sparkContext().addSparkListener(new SomeSparkListener());
或通过 spark-submit/spark cluster driver 选项传递参数:
spark-submit --conf "spark.extraListeners=ai.databand.SomeSparkListener"
这是另一种开箱即用的 Spark 监控机制。查询执行侦听器允许开发人员订阅查询完成事件,而不关注极低层指标。它提供有关已执行查询的更高层元数据,例如逻辑和物理计划以及执行指标。
您可以获取查询层面的读取/写入记录数等指标,但这些指标是整个查询的汇总,而不是针对特定任务/作业/阶段。
此外,还可以从计划中提取数据位置和模式等非常有用的信息。您可以提取并存储模式以及数据帧维度,并与之前的运行结果进行比较,在出现问题时触发警报。
不过,从计划中提取数据可能很复杂,因为必须使用低层 Spark API。
此外,实现指标存储和警报机制的运营负担仍然存在。Spark 提供的只是元数据。如何利用它是开发人员的责任。
以下是一个简单的查询执行侦听器示例,用于打印计划和指标:
public class ExampleQueryExecutionListener implements QueryExecutionListener {
/**
* Print plan and query metrics
*
* @param funcName
* @param qe
* @param durationNs
*/
@Override
public void onSuccess(String funcName, QueryExecution qe, long durationNs) {
System.out.println(qe.executedPlan().prettyJson());
Iterator it = qe.executedPlan().metrics().iterator();
while (it.hasNext()) {
Tuple2 next = it.next();
System.out.printf("Key: %s, value: %s%n", next._1(), next._2().value());
}
}
@Override
public void onFailure(String funcName, QueryExecution qe, Exception exception) {
}
}
可以通过编程方式或配置添加查询执行侦听器:
在应用程序代码中:SparkSession spark = SparkSession.builder().getOrCreate(); spark.listenerManager().register(new ExampleQueryExecutionListener());
通过 spark-submit:
spark-submit --conf "spark.sql.queryExecutionListeners=ai.databand.ExampleQueryExecutionListener"
实施低层监控可能是一项艰巨工作,但这种“系统”监控方式有一个巨大优势:不会增加计算开销。这些元数据由 Spark 内部生成并记录,因此不会对查询执行时间造成任何影响。
使用侦听器进行监控可以避免修改任何应用程序代码。当您想要跟踪现有和旧版应用程序的数据,但预算有限无法修改时,这种方式极具优势。只需编写一个侦听器,通过 Spark 配置传入,即可获知数据全貌。
通过手动验证可大幅提升对输入数据的可信度。假设输入数据源预期包含某数量级记录,且该数值通常不应低于 X,可编写如下简易检查:
df = spark.read("path")
if (df.count < X) {
throw new RuntimeException("Input data is missing")
}
监控的可能性是无限的。我们可以比较计数、非空值计数、推断的模式等。
由于许多质量检查都比较基础,例如确保数据帧的形状和内容正确,社区为这类检查开发了方便的库。其中一个库就是 Deequ。它为大多数场景提供了丰富的领域特定语言 (DSL)。值得一试。它还具有高级功能,例如能够进行列剖析、计算最小值/最大值/平均值/百分位数、计算直方图、检测异常等等。
请参考 Deequ 文档中的以下示例:
val verificationResult = VerificationSuite()
.onData(data)
.addCheck(
Check(CheckLevel.Error, "数据单元测试")
.hasSize(_ == 5) // 我们预期有 5 行
.isComplete("id") // 不应出现 NULL
.isUnique("id") // 不应包含重复项
.isComplete("productName") // 不应出现 NULL
// 应仅包含 "high" 和 "low" 这两个值
.isContainedIn("priority", Array("high", "low"))
.isNonNegative("numViews") // 不应包含负值
// 至少一半的描述应包含 URL
.containsURL("description", _ >= 0.5)
// 一半的项目浏览量应少于 10 次
.hasApproxQuantile("numViews", 0.5, _ <= 10))
.run()
可见,我们将大量检查规则封装成设计良好的开箱即用 DSL。
更重要的是,Deequ 能够存储检查结果并自动与以往运行结果进行比较。这可以通过利用指标存储库来实现。用户可自行编写实现方案,将 Deequ 无缝集成到现有的监控基础设施中。
虽然高层次的应用质量检查比低阶方案更灵活,但存在显著缺陷——性能损耗。由于每次计算都会触发 Spark 操作,在处理大规模数据集时可能产生可观开销。每个"计数"与"条件查询"都可能引发全表扫描。Spark 内部会优化执行计划,但仍需考量这些影响并确保数据剖析不会损害系统性能。
我们回顾了多种监控 Spark 应用程序数据质量的方法。低层方法利用 Spark 事件侦听器 API,可以访问读取/写入记录数、逻辑/物理计划等低层指标,有助于构建趋势,确保数据管道产生正确结果,而且无需修改代码就能获取现有应用程序概览。高层方法(如手动检查数据或使用数据质量库)要方便得多,但也存在性能损耗等缺点。
正如实际应用场景一样,这两种方法各有优劣,适用场景取决于应用程序的类型。请明智选择。
IBM® Databand 利用这两种方式提供一套全面的 Spark 应用程序跟踪方案。尽管我们主要使用 Spark 侦听器来构建指标趋势和数据沿袭,但我们也为 Deequ 提供便利的指标存储,并支持跟踪手动计算的各类指标。
了解更多关于 Databand 连续数据可观察性平台的信息,以及它如何助力企业更早地检测数据事件,更快地解决这些问题,并获得更可信的数据。如果您准备深入了解,请立即预约演示。
通过近乎实时的运营分析加速混合事件识别。
通过云分析解决方案,您可以轻松分析数据并建立机器学习模型,从而获得改变业务的成果。
利用 IBM 的云咨询服务探索新功能并推动业务敏捷性。