Apache Spark 监控:如何使用 Spark API 和开源库提升应用程序的数据可观察性

后现代图书馆

作者

Spark 对现代数据堆栈至关重要。因此,为 Spark 环境配置适当级别的可观测性极为重要。监控 Spark 有多种选择,包括可提供预配置 Spark 和 Spark SQL 指标仪表板的 SaaS 程序。如果这还不够呢?

典型的 Spark 应用程序设置,无论是自托管还是托管解决方案,通常都包含用于监控集群运行状况的操作仪表板。这些仪表板虽然实用,但只能提供基础设施概览,而非与数据相关的实际指标。确实,当 CPU 使用率增加或集群 RAM 不足时,我们可以推测应用程序可能出现问题,但当数据源改变了模式或来自其他部门的数据损坏时,这些也无济于事。工程师面临的大多数问题是由数据而非底层基础设施引起的,因此他们必须花费大量时间来重现问题,或像侦探一样反复检查文件和存储桶。这正是应用程序监控能发挥作用的地方。

不同场景需要不同层次的可见性,数据工程师需要具备比执行指标更深一层的洞察力。否则,您可能要耗费大量时间来调试 Spark 中的数据质量问题。

在本指南中,您将了解如何为 Spark 实现高层与低层的数据可观察性。高层可观测性需要使用 Spark 的内部系统,例如侦听器 API 和查询执行侦听器。实现低层可观测性,需要学习如何使用库来跟踪数据质量指标

掌握这两类方法后,您便可以根据具体问题选择最适合的解决方法。

辅以专家洞察分析的最新科技新闻

通过 Think 时事通讯,了解有关 AI、自动化、数据等方面最重要且最有趣的行业趋势。请参阅 IBM 隐私声明

谢谢!您已订阅。

您的订阅将以英语提供。您会在每份时事通讯中找到一个取消订阅链接。您可以在此管理您的订阅或取消订阅。更多相关信息,请参阅我们的《IBM 隐私声明》。

监控 Apache Spark 的低层方法

Spark 侦听器

这种指标获取方法非常古老但稳定可靠。实际上,Spark 用户界面也是通过同样的机制实现指标可视化。Spark 侦听器 API 允许开发人员跟踪 Spark 在应用程序执行期间发出的事件。这些事件通常是应用程序启动/结束、作业启动/结束、阶段启动/结束等。请在 SparkJavaDoc 中获取完整列表。Spark 侦听器配置简单,使用方便,可以轻松获取指标。每次执行操作后,Spark 会调用 Spark 侦听器并将一些元数据信息传递给其方法。其中包括执行时间、读取/写入的记录数、读取/写入的字节数等信息。

这种非常基础的低层数据质量监控可用于检查记录数量和大小。假设您有一个每日运行的作业,对传入的数据集执行转换/分析。您可以编写侦听器,检查从输入中读取了多少条记录,并与前一天的结果进行比较。如果差异显著,我们可以推测数据源可能存在问题。

然而此方案需要自行编写监控程序:需部署指标存储系统、配置告警机制。当应用代码变更时,所有指标键值也需同步调整并妥善处理。

但是,即使是简单的 Spark 侦听器也能提供一些洞察分析。

以下是此类 Spark 侦听器的示例:

public class SomeSparkListener extends SparkListener{

    /**
     * This very simple spark listener prints metrics collected for every stage.
     *
     * @param stageCompleted
     */
    @Override
    public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
        StageInfo stageInfo = stageCompleted.stageInfo();
        Iterator it = stageInfo.taskMetrics().accumulators().iterator();

        while (it.hasNext()) {
            AccumulatorV2 next = it.next();
            String key = next.name().get();
            Object value = next.value();
            System.out.printf("key: %s, value: %s%n", key, value);
        }

您可通过以下几种方式将 Spark 侦听器添加到应用程序中:

以编程方式添加:

SparkSession spark = SparkSession.builder().getOrCreate();
spark.sparkContext().addSparkListener(new SomeSparkListener());

或通过 spark-submit/spark cluster driver 选项传递参数:

spark-submit --conf "spark.extraListeners=ai.databand.SomeSparkListener"

Spark 查询执行侦听器

这是另一种开箱即用的 Spark 监控机制。查询执行侦听器允许开发人员订阅查询完成事件,而不关注极低层指标。它提供有关已执行查询的更高层元数据,例如逻辑和物理计划以及执行指标。

您可以获取查询层面的读取/写入记录数等指标,但这些指标是整个查询的汇总,而不是针对特定任务/作业/阶段。

此外,还可以从计划中提取数据位置和模式等非常有用的信息。您可以提取并存储模式以及数据帧维度,并与之前的运行结果进行比较,在出现问题时触发警报。

不过,从计划中提取数据可能很复杂,因为必须使用低层 Spark API。

此外,实现指标存储和警报机制的运营负担仍然存在。Spark 提供的只是元数据。如何利用它是开发人员的责任。

以下是一个简单的查询执行侦听器示例,用于打印计划和指标:

public class ExampleQueryExecutionListener implements QueryExecutionListener {

    /**
     * Print plan and query metrics
     *
     * @param funcName
     * @param qe
     * @param durationNs
     */
    @Override
    public void onSuccess(String funcName, QueryExecution qe, long durationNs) {
        System.out.println(qe.executedPlan().prettyJson());
        Iterator it = qe.executedPlan().metrics().iterator();
        while (it.hasNext()) {
            Tuple2 next = it.next();
            System.out.printf("Key: %s, value: %s%n", next._1(), next._2().value());
        }
    }

    @Override
    public void onFailure(String funcName, QueryExecution qe, Exception exception) {
    }
}

可以通过编程方式或配置添加查询执行侦听器:

在应用程序代码中:SparkSession spark = SparkSession.builder().getOrCreate(); spark.listenerManager().register(new ExampleQueryExecutionListener());

通过 spark-submit:

spark-submit --conf "spark.sql.queryExecutionListeners=ai.databand.ExampleQueryExecutionListener"

实施低层监控可能是一项艰巨工作,但这种“系统”监控方式有一个巨大优势:不会增加计算开销。这些元数据由 Spark 内部生成并记录,因此不会对查询执行时间造成任何影响。

使用侦听器进行监控可以避免修改任何应用程序代码。当您想要跟踪现有和旧版应用程序的数据,但预算有限无法修改时,这种方式极具优势。只需编写一个侦听器,通过 Spark 配置传入,即可获知数据全貌。

AI 学院

利用混合云实现 AI 就绪

本课程由 IBM 资深思想领袖带领,旨在帮助企业领导者获得所需的知识,以便划分可以推动增长的 AI 投资的优先级。

监控 Apache Spark 的高层方法

人工数据质量检查

通过手动验证可大幅提升对输入数据的可信度。假设输入数据源预期包含某数量级记录,且该数值通常不应低于 X,可编写如下简易检查:

df = spark.read("path")
if (df.count < X) {
     throw new RuntimeException("Input data is missing")
 }

监控的可能性是无限的。我们可以比较计数、非空值计数、推断的模式等。

使使用数据质量检测库

由于许多质量检查都比较基础,例如确保数据帧的形状和内容正确,社区为这类检查开发了方便的库。其中一个库就是 Deequ。它为大多数场景提供了丰富的领域特定语言 (DSL)。值得一试。它还具有高级功能,例如能够进行列剖析、计算最小值/最大值/平均值/百分位数、计算直方图、检测异常等等。

请参考 Deequ 文档中的以下示例:

val verificationResult = VerificationSuite()
   .onData(data)
   .addCheck(
     Check(CheckLevel.Error, "数据单元测试")
       .hasSize(_ == 5) // 我们预期有 5 行
       .isComplete("id") // 不应出现 NULL
       .isUnique("id") // 不应包含重复项
       .isComplete("productName") // 不应出现 NULL
       // 应仅包含 "high" 和 "low" 这两个值
       .isContainedIn("priority", Array("high", "low"))
       .isNonNegative("numViews") // 不应包含负值
       // 至少一半的描述应包含 URL
       .containsURL("description", _ >= 0.5)
       // 一半的项目浏览量应少于 10 次
       .hasApproxQuantile("numViews", 0.5, _ <= 10))
     .run()

可见,我们将大量检查规则封装成设计良好的开箱即用 DSL。

更重要的是,Deequ 能够存储检查结果并自动与以往运行结果进行比较。这可以通过利用指标存储库来实现。用户可自行编写实现方案,将 Deequ 无缝集成到现有的监控基础设施中。

虽然高层次的应用质量检查比低阶方案更灵活,但存在显著缺陷——性能损耗。由于每次计算都会触发 Spark 操作,在处理大规模数据集时可能产生可观开销。每个"计数"与"条件查询"都可能引发全表扫描。Spark 内部会优化执行计划,但仍需考量这些影响并确保数据剖析不会损害系统性能。

总结

我们回顾了多种监控 Spark 应用程序数据质量的方法。低层方法利用 Spark 事件侦听器 API,可以访问读取/写入记录数、逻辑/物理计划等低层指标,有助于构建趋势,确保数据管道产生正确结果,而且无需修改代码就能获取现有应用程序概览。高层方法(如手动检查数据或使用数据质量库)要方便得多,但也存在性能损耗等缺点。

正如实际应用场景一样,这两种方法各有优劣,适用场景取决于应用程序的类型。请明智选择。

IBM® Databand 利用这两种方式提供一套全面的 Spark 应用程序跟踪方案。尽管我们主要使用 Spark 侦听器来构建指标趋势和数据沿袭,但我们也为 Deequ 提供便利的指标存储,并支持跟踪手动计算的各类指标。

了解更多关于 Databand 连续数据可观察性平台的信息,以及它如何助力企业更早地检测数据事件,更快地解决这些问题,并获得更可信的数据。如果您准备深入了解,请立即预约演示

相关解决方案
IBM Z Operational Log and Data Analytics 

通过近乎实时的运营分析加速混合事件识别。

探索 IBM Z
云分析解决方案

通过云分析解决方案,您可以轻松分析数据并建立机器学习模型,从而获得改变业务的成果。

深入了解云分析解决方案
云咨询服务

利用 IBM 的云咨询服务探索新功能并推动业务敏捷性。

深入了解我们的云咨询服务
采取后续步骤

通过连接大型机和云的强大分析功能,从 IBM Z 数据中解锁实时洞察分析,以便您可以更快地采取行动、降低风险并做出更明智的决策。

探索 IBM Z 获取更多信息