Sparkは現代のデータスタックにとって重要です。そのため、Spark環境に適切なレベルのオブザーバビリティーを確保することが非常に重要です。SparkおよびSpark SQLメトリクス用の事前構成されたダッシュボードを提供するSaaSプログラムなど、Sparkをモニタリングするためのオプションは数多くあります。しかし、それで十分でしょうか?
一般的なSparkアプリケーションのセットアップには、セルフホスト型ソリューションであるかマネージド ソリューションであるかに関係なく、クラスターの正常性を監視するための運用ダッシュボードが含まれています。これらのダッシュボードは非常に便利ですが、インフラストラクチャーの概要のみを提供し、データに関連する実際のメトリクスは提供しません。もちろん、CPUの使用率が上がったり、クラスターのRAMが足りなくなったりすると、アプリに何か問題があるのだろうと推測することはできます。しかし、ソースがスキーマを変更したり、別の部署から来たデータ壊れている場合は、その対策は役に立ちません。エンジニアが直面する問題のほとんどは、基盤となるインフラストラクチャではなくデータによって引き起こされているため、問題の再現や検出などのファイルやバケットのやり取りに多くの時間を費やす必要があります。ここで、実際のアプリケーション監視が役立ちます。
状況ごとに異なるレベルの可視性が必要であり、データエンジニアはメトリクスを超えてさらに深く掘り下げる能力が必要です。そうしなければ、Sparkでデータ品質の問題をデバッグするのにかなりの時間を費やすことになります。
このガイドでは、Sparkで高レベルと低レベルのデータ・オブザーバビリティーを実現する方法について説明します。高レベルでは、リスナーAPIやクエリ実行リスナーなどのSparkの内部システムを使用します。低レベルでは、ライブラリーを使用してデータ品質メトリクスを追跡する方法について学びます。
両方の方法を学んだ後は、解決しようとしている問題に最適な方を選択できます。
IBMニュースレター
AI活用のグローバル・トレンドや日本の市場動向を踏まえたDX、生成AIの最新情報を毎月お届けします。登録の際はIBMプライバシー・ステートメントをご覧ください。
ニュースレターは日本語で配信されます。すべてのニュースレターに登録解除リンクがあります。サブスクリプションの管理や解除はこちらから。詳しくはIBMプライバシー・ステートメントをご覧ください。
これはメトリクスを入手するための非常に旧式の完全な方法です。実は、SparkUIはメトリクスを可視化するために、まったく同じメカニズムを利用しています。SparkリスナーAPIを使用すると、開発者はアプリケーション実行中にSparkが出力するイベントを追跡できます。これらのイベントは通常、アプリケーションの開始/終了、ジョブの開始/終了、ステージの開始/終了などです。完全なリストはSpark JavaDocでご覧いただけます。メトリクスを取得するためのSpark Listenerの設定と使用は簡単です。各オペレーションを実行した後、SparkはSpark Listenerを呼び出し、いくつかのメタデータ情報をそのメソッドに渡します。これには、実行時間、読み取り/書き込みレコード、読み取り/書き込みバイト数などが含まれます。
この非常に基本的で低レベルのデータ品質モニタリングでは、レコードの数とサイズをチェックします。毎日実行され、受信するデータセットに対して何らかのトランスフォーメーション/分析を実行するジョブがあるとします。インプットから読み込まれたレコードの数をチェックし、前の日の成果と比較するリスナーを作成します。差が大きい場合は、データソースに何か問題があると想定できます。
ただし、このアプローチでは社内の監視ソリューションを作成する必要があります。メトリクス値はどこかに保管する必要があり、アラート・メカニズムを構成する必要があります。アプリケーション・コードが変更されると、すべてのメトリクス・キーも変更されるため、適切に処理する必要があります。
しかし、単純なSpark Listenerでも、データに関するある程度の洞察を得ることができます。
以下は、そのようなSpark Listenerの例です。
パブリッククラスの SomeSparkListener は SparkListenerを拡張します{
/**
* This very simple spark listener prints metrics collected for every stage.
*
* @param stageCompleted
*/
@Override
public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
StageInfo stageInfo = stageCompleted.stageInfo();
Iterator it = stageInfo.taskMetrics().accumulators().iterator();
while (it.hasNext()) {
AccumulatorV2 next = it.next();
String key = next.name().get();
Object value = next.value();
System.out.printf("key: %s, value: %s%n", key, value);
}
}
}
Spark Listenerをアプリケーションに追加する方法は複数あります。
プログラムで追加する:
SparkSession spark = SparkSession.builder().getOrCreate(); spark.sparkContext().addSparkListener(new SomeSparkListener());
または、spark-submit/spark のクラスタードライバーオプションで渡す:
spark-submit --conf "spark.extraListeners=ai.databand.SomeSparkListener"
これは、すぐに利用できるSparkモニタリングの別のメカニズムです。Query Execution Listenerを使用すると、開発者は非常に低レベルのメトリクスに焦点を当てる代わりに、クエリ完了イベントにサブスクライブすることができます。論理プランや物理プラン、実行メトリクスなど、実行されたクエリに関する高レベルのメタデータを提供します。
クエリによって読み取られたり書き込まれたりしたレコードなどのメトリクスを取得できますが、今回は特定のタスク/ジョブ/ステージではなく、クエリ全体について集計されます。
また、データの場所やスキーマなどの計画から非常に有用な情報を抽出することもできます。データフレームの次元とともにスキーマを抽出して保管し、以前の実行と比較することで、問題が発生した場合にアラートを発することができます。
ただし、低レベルのSpark APIを使用する必要があるため、プランからのデータ抽出は複雑になる可能性があります。
また、メトリクスの保存やアラート・メカニズムの実装に伴うすべての運用上の負担は引き続き存在します。Sparkから得られるのは単なるメタデータですそれを利用するのは開発者の責任です。
以下は、プランとメトリクスを出力する単純なクエリ実行リスナーの例です。
パブリック クラス ExampleQueryExecutionListener は QueryExecutionListener {
/**
* Print plan and query metrics
*
* @param funcName
* @param qe
* @param durationNs
*/
@Override
public void onSuccess(String funcName, QueryExecution qe, long durationNs) {
System.out.println(qe.executedPlan().prettyJson());
Iterator it = qe.executedPlan().metrics().iterator();
while (it.hasNext()) {
Tuple2 next = it.next();
System.out.printf("Key: %s, value: %s%n", next._1(), next._2().value());
}を実装します。
}
@Override
public void onFailure(String funcName, QueryExecution qe, Exception exception) {
}
}
クエリ実行リスナーは、プログラムによって、または構成を通じて追加できます。
アプリケーション・コード:SparkSessionpark = SparkSession.builder().getOrCreate();
spark.listenerManager().register(newExampleQueryExecutionListener());
spark-submit経由:
spark-submit --conf "spark.sql.queryExecutionListeners=ai.databand.ExampleQueryExecutionListener"
低レベルの監視を実装するのは深刻な作業になる可能性がありますが、「システム」監視による監視には、計算上のオーバーヘッドが発生しないという大きなメリットがあります。メタデータはSpark内部によって出力および記録されるため、クエリの実行時間に対するペナルティは与えられません。
監視にリスナーを使用すると、アプリケーション・コードに触れることを回避できます。これは、既存のアプリケーションやレガシー・アプリケーションに関するデータを追跡したいものの、変更を加えるための予算がない場合に大きなメリットをもたらします。リスナーを書き、それをSpark構成経由で渡すだけで、データの全体像を把握できます。
受信データを手動で検証することで、信頼性を大幅に高めることができます。例えば、インプットデータソースにある程度のレコード数があると予想し、その数は通常X未満にならないとします。次のような極めてシンプルなものを作成できます。
df =spark.read("path")
if (df.count < X) {
throw new RuntimeException("Input data is missing")
}
ここでの可能性は無限です。カウント、非NULL値の数、推論スキーマなどを比較できます。
データフレームが適切な形状と内容を備えていることを確認するなど、多くの品質チェックは多かれ少なかれ単純な作業であるため、コミュニティはそのようなチェックに便利なライブラリを開発しました。それらのライブラリの1つがDeequです。このライブラリは、ほとんどの場合に豊富なドメイン固有言語 (DSL) を提供します。詳細をご覧ください。また、列のプロファイリング機能、最小/最大/平均/パーセンタイルの計算、ヒストグラムの計算、異常の検知など、高度な機能を備えています。
Deequドキュメントの次の例について考えてみましょう。
val verificationResult = VerificationSuite()
.onData(data)
.addCheck(
Check(CheckLevel.Error, "unit testing my data")
.hasSize(_ == 5) // we expect 5 rows
.isComplete("id") // should never be NULL
.isUnique("id") // should not contain duplicates
.isComplete("productName") // should never be NULL
// should only contain the values "high" and "low"
.isContainedIn("priority", Array("high", "low"))
.isNonNegative("numViews") // should not contain negative values
// at least half of the descriptions should contain a url
.containsURL("description", _ >= 0.5)
// half of the items should have less than 10 views
0.5, _ <= 10))
.run()
すぐに使用できる優れたDSLで保護された膨大な量のチェック・セットが保護されていることがわかります。
さらに重要なのは、Deequにはチェックの結果を保管し、以前の実行との比較を自動的に実行する機能があることです。これは、メトリクス・リポジトリーを利用することで実現できます。独自の実装を作成して、既存の監視インフラストラクチャーにDeequをシームレスに統合できます。
高レベルのアプリケーション品質チェックは、低レベルのアプローチよりもはるかに柔軟ですが、性能の低下という大きな欠点があります。すべての計算はスパーク・オペレーションを排出するため、特に大規模なデータ・セットの場合、オーバーヘッドは非常に大きくなる可能性があります。それぞれの「count」と「where」は完全なスキャンにつながる可能性があります。Sparkは内部で実行計画の最適化に最善を尽くしますが、これらの影響を考慮し、データプロファイリングが性能に悪影響を及ぼさないか確認する必要があります。
Sparkアプリケーションのデータ品質を監視するいくつかの方法をレビューしました。低レベルのアプローチでは、Spark Event Listener APIを利用して、読み取り/書き込みレコード、論理/物理プランなどの低レベルのメトリクスにアクセスできます。これは、トレンドを構築し、データパイプラインで適切な成果が得られることを確認したり、コードを変更せずに既存のアプリケーションの概要を把握したりするのに役立ちます。手作業でのデータのチェックやデータ品質ライブラリの使用などの高レベルのアプローチははるかに便利ですが、性能低下などの欠点もあります。
現実世界のあらゆる状況と同様、アプリケーションの種類に応じて、両方のアプローチにはトレードオフとより良いシナリオが常に存在します。賢く使いましょう。
IBM® Databand®では、Sparkアプリケーションを追跡するための包括的なオプション・セットを提供するために、両方の方法を利用しています。当社の中核では、Spark Listenerを使用してメトリクスの傾向とデータ・リネージュを構築していますが、Deequに便利なメトリクス保管機能(Metrics Store)や、手動で計算された個々のメトリクスを追跡する機能も提供しています。
Databandの継続的なデータ・オブザーバビリティー・プラットフォームと、そのプラットフォームを使ってデータインシデントを早期に検知・迅速に解決し、ビジネスに必要な信頼性の高いデータを確保する方法について説明します。さらに詳しく知りたい方は、今すぐデモを予約してください。
リアルタイムの運用分析でハイブリッド・インシデントの特定を加速します。
データの分析や機械学習モデルの構築を簡単に行えるクラウド分析ソリューションで、ビジネスを変革しましょう。
IBMのクラウド・コンサルティング・サービスの新しい機能を確認し、ビジネスの俊敏性を高めましょう。