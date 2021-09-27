Spark 对现代数据堆栈至关重要。因此，为 Spark 环境配置适当级别的可观测性极为重要。监控 Spark 有多种选择，包括可提供预配置 Spark 和 Spark SQL 指标仪表板的 SaaS 程序。如果这还不够呢？
典型的 Spark 应用程序设置，无论是自托管还是托管解决方案，通常都包含用于监控集群运行状况的操作仪表板。这些仪表板虽然实用，但只能提供基础设施概览，而非与数据相关的实际指标。确实，当 CPU 使用率增加或集群 RAM 不足时，我们可以推测应用程序可能出现问题，但当数据源改变了模式或来自其他部门的数据损坏时，这些也无济于事。工程师面临的大多数问题是由数据而非底层基础设施引起的，因此他们必须花费大量时间来重现问题，或像侦探一样反复检查文件和存储桶。这正是应用程序监控能发挥作用的地方。
不同场景需要不同层次的可见性，数据工程师需要具备比执行指标更深一层的洞察力。否则，您可能要耗费大量时间来调试 Spark 中的数据质量问题。
在本指南中，您将了解如何为 Spark 实现高层与低层的数据可观察性。高层可观测性需要使用 Spark 的内部系统，例如侦听器 API 和查询执行侦听器。实现低层可观测性，需要学习如何使用库来跟踪数据质量指标。
掌握这两类方法后，您便可以根据具体问题选择最适合的解决方法。
这种指标获取方法非常古老但稳定可靠。实际上，Spark 用户界面也是通过同样的机制实现指标可视化。Spark 侦听器 API 允许开发人员跟踪 Spark 在应用程序执行期间发出的事件。这些事件通常是应用程序启动/结束、作业启动/结束、阶段启动/结束等。请在 SparkJavaDoc 中获取完整列表。Spark 侦听器配置简单，使用方便，可以轻松获取指标。每次执行操作后，Spark 会调用 Spark 侦听器并将一些元数据信息传递给其方法。其中包括执行时间、读取/写入的记录数、读取/写入的字节数等信息。
这种非常基础的低层数据质量监控可用于检查记录数量和大小。假设您有一个每日运行的作业，对传入的数据集执行转换/分析。您可以编写侦听器，检查从输入中读取了多少条记录，并与前一天的结果进行比较。如果差异显著，我们可以推测数据源可能存在问题。
然而此方案需要自行编写监控程序：需部署指标存储系统、配置告警机制。当应用代码变更时，所有指标键值也需同步调整并妥善处理。
但是，即使是简单的 Spark 侦听器也能提供一些洞察分析。
以下是此类 Spark 侦听器的示例：
public class SomeSparkListener extends SparkListener{ /** * This very simple spark listener prints metrics collected for every stage. * * @param stageCompleted */ @Override public void onStageCompleted(SparkListenerStageCompleted stageCompleted) { StageInfo stageInfo = stageCompleted.stageInfo(); Iterator it = stageInfo.taskMetrics().accumulators().iterator(); while (it.hasNext()) { AccumulatorV2 next = it.next(); String key = next.name().get(); Object value = next.value(); System.out.printf("key: %s, value: %s%n", key, value); }
您可通过以下几种方式将 Spark 侦听器添加到应用程序中：
以编程方式添加：
SparkSession spark = SparkSession.builder().getOrCreate(); spark.sparkContext().addSparkListener(new SomeSparkListener());
或通过 spark-submit/spark cluster driver 选项传递参数：
spark-submit --conf "spark.extraListeners=ai.databand.SomeSparkListener"
这是另一种开箱即用的 Spark 监控机制。查询执行侦听器允许开发人员订阅查询完成事件，而不关注极低层指标。它提供有关已执行查询的更高层元数据，例如逻辑和物理计划以及执行指标。
您可以获取查询层面的读取/写入记录数等指标，但这些指标是整个查询的汇总，而不是针对特定任务/作业/阶段。
此外，还可以从计划中提取数据位置和模式等非常有用的信息。您可以提取并存储模式以及数据帧维度，并与之前的运行结果进行比较，在出现问题时触发警报。
不过，从计划中提取数据可能很复杂，因为必须使用低层 Spark API。
此外，实现指标存储和警报机制的运营负担仍然存在。Spark 提供的只是元数据。如何利用它是开发人员的责任。
以下是一个简单的查询执行侦听器示例，用于打印计划和指标：
public class ExampleQueryExecutionListener implements QueryExecutionListener { /** * Print plan and query metrics * * @param funcName * @param qe * @param durationNs */ @Override public void onSuccess(String funcName, QueryExecution qe, long durationNs) { System.out.println(qe.executedPlan().prettyJson()); Iterator it = qe.executedPlan().metrics().iterator(); while (it.hasNext()) { Tuple2 next = it.next(); System.out.printf("Key: %s, value: %s%n", next._1(), next._2().value()); } } @Override public void onFailure(String funcName, QueryExecution qe, Exception exception) { } }
可以通过编程方式或配置添加查询执行侦听器：
在应用程序代码中：SparkSession spark = SparkSession.builder().getOrCreate(); spark.listenerManager().register(new ExampleQueryExecutionListener());
通过 spark-submit：
spark-submit --conf "spark.sql.queryExecutionListeners=ai.databand.ExampleQueryExecutionListener"
实施低层监控可能是一项艰巨工作，但这种“系统”监控方式有一个巨大优势：不会增加计算开销。这些元数据由 Spark 内部生成并记录，因此不会对查询执行时间造成任何影响。
使用侦听器进行监控可以避免修改任何应用程序代码。当您想要跟踪现有和旧版应用程序的数据，但预算有限无法修改时，这种方式极具优势。只需编写一个侦听器，通过 Spark 配置传入，即可获知数据全貌。
通过手动验证可大幅提升对输入数据的可信度。假设输入数据源预期包含某数量级记录，且该数值通常不应低于 X，可编写如下简易检查：
df = spark.read("path") if (df.count < X) { throw new RuntimeException("Input data is missing") }
监控的可能性是无限的。我们可以比较计数、非空值计数、推断的模式等。
由于许多质量检查都比较基础，例如确保数据帧的形状和内容正确，社区为这类检查开发了方便的库。其中一个库就是 Deequ。它为大多数场景提供了丰富的领域特定语言 (DSL)。值得一试。它还具有高级功能，例如能够进行列剖析、计算最小值/最大值/平均值/百分位数、计算直方图、检测异常等等。
请参考 Deequ 文档中的以下示例：
val verificationResult = VerificationSuite() .onData(data) .addCheck( Check(CheckLevel.Error, "数据单元测试") .hasSize(_ == 5) // 我们预期有 5 行 .isComplete("id") // 不应出现 NULL .isUnique("id") // 不应包含重复项 .isComplete("productName") // 不应出现 NULL // 应仅包含 "high" 和 "low" 这两个值 .isContainedIn("priority", Array("high", "low")) .isNonNegative("numViews") // 不应包含负值 // 至少一半的描述应包含 URL .containsURL("description", _ >= 0.5) // 一半的项目浏览量应少于 10 次 .hasApproxQuantile("numViews", 0.5, _ <= 10)) .run()
可见，我们将大量检查规则封装成设计良好的开箱即用 DSL。
更重要的是，Deequ 能够存储检查结果并自动与以往运行结果进行比较。这可以通过利用指标存储库来实现。用户可自行编写实现方案，将 Deequ 无缝集成到现有的监控基础设施中。
虽然高层次的应用质量检查比低阶方案更灵活，但存在显著缺陷——性能损耗。由于每次计算都会触发 Spark 操作，在处理大规模数据集时可能产生可观开销。每个"计数"与"条件查询"都可能引发全表扫描。Spark 内部会优化执行计划，但仍需考量这些影响并确保数据剖析不会损害系统性能。
我们回顾了多种监控 Spark 应用程序数据质量的方法。低层方法利用 Spark 事件侦听器 API，可以访问读取/写入记录数、逻辑/物理计划等低层指标，有助于构建趋势，确保数据管道产生正确结果，而且无需修改代码就能获取现有应用程序概览。高层方法（如手动检查数据或使用数据质量库）要方便得多，但也存在性能损耗等缺点。
正如实际应用场景一样，这两种方法各有优劣，适用场景取决于应用程序的类型。请明智选择。
