Spark ist ein unverzichtbarer Bestandteil der modernen Datenarchitektur. Daher ist es extrem wichtig, das richtige Maß an Observability für Ihre Spark-Umgebungen zu haben. Es gibt zahlreiche Möglichkeiten zur Überwachung von Spark, darunter SaaS-Programme, die vorkonfigurierte Dashboards für Spark- und Spark SQL-Metriken bereitstellen. Und wenn das nicht ausreicht?
Die typische Einrichtung einer Spark-Anwendung, egal ob es sich um eine selbst gehostete oder verwaltete Lösung handelt, beinhaltet einige operative Dashboards für die Cluster-Zustandsüberwachung. Diese Dashboards sind zwar sehr nützlich, liefern uns aber nur einen Überblick über die Infrastruktur und nicht die eigentlichen Metriken zu den Daten. Ja, wir können annehmen, dass mit der App etwas nicht stimmt, wenn die CPU mehr genutzt hat oder der Cluster keinen RAM mehr hat, aber es hilft nicht, wenn die Quelle das Schema geändert hat oder Daten aus einer anderen Abteilung kaputt sind. Die meisten Probleme, mit denen Techniker konfrontiert werden, werden durch Daten und nicht durch die zugrunde liegende Infrastruktur verursacht. Daher müssen sie viel Zeit damit verbringen, Probleme zu reproduzieren oder wie Detektive an Dateien und Gruppierungen herumzuspielen. Hier kann die eigentliche Anwendungsüberwachung helfen.
Jede Situation erfordert ein anderes Maß an Transparenz, und Dateningenieure müssen in der Lage sein, über die reinen Metriken hinauszugehen. Andernfalls kann die Fehlersuche bei Datenqualitätsproblemen in Spark sehr zeitaufwendig sein.
In diesem Leitfaden erfahren Sie, wie Sie hohe und niedrige Ebenen der Daten-Observability für Spark erreichen. Auf der übergeordneten Ebene werden Sie die internen Systeme von Spark wie Listener APIs und Query Execution Listeners verwenden. Auf der niedrigen Ebene lernen Sie, wie Sie Bibliotheken verwenden, um Datenqualitätsmetriken zu verfolgen.
Nachdem Sie beides gelernt haben, können Sie diejenige Methode auswählen, die für das jeweilige Problem am besten geeignet ist.
Branchen-Newsletter
Bleiben Sie mit dem Think-Newsletter über die wichtigsten – und faszinierendsten – Branchentrends in den Bereichen KI, Automatisierung, Daten und darüber hinaus auf dem Laufenden. Weitere Informationen finden Sie in der IBM Datenschutzerklärung.
Ihr Abonnement wird auf Englisch geliefert. In jedem Newsletter finden Sie einen Abmeldelink. Hier können Sie Ihre Abonnements verwalten oder sich abmelden. Weitere Informationen finden Sie in unserer IBM Datenschutzerklärung.
Das ist eine sehr alte und absolut sichere Methode, um Metriken zu erhalten. Tatsächlich verwendet die Spark-Benutzeroberfläche denselben Mechanismus, um Metriken zu visualisieren. Die Spark Listeners API ermöglicht es Entwicklern, Ereignisse zu verfolgen, die Spark während der Anwendungsausführung ausgibt. Diese Ereignisse sind in der Regel Start/Ende einer Anwendung, Start/Ende eines Auftrags, Start/Ende einer Phase usw. Die vollständige Liste finden Sie in Spark JavaDoc. Spark Listener sind einfach zu konfigurieren und einfach zu verwenden, um Metriken zu erfassen. Nach Ausführung jeder dieser Operationen ruft Spark Listener auf und übergibt einige Metadateninformationen an seine Methode. Dazu gehören Dinge wie Ausführungszeit, gelesene/geschriebene Datensätze, Bytes gelesen/geschrieben und anderes.
Diese sehr grundlegende und einfache Überwachung der Datenqualität prüft die Anzahl und Größe der Datensätze. Stellen Sie sich vor, Sie haben einen Job, der täglich ausgeführt wird und einige Transformationen/Analysen an eingehenden Datensätzen durchführt. Sie können einen Listener schreiben, der überprüft, wie viele Datensätze aus der Eingabe gelesen wurden, und dies mit dem Ergebnis des Vortages vergleicht. Wenn der Unterschied signifikant ist, können wir davon ausgehen, dass mit der Datenquelle etwas nicht stimmt.
Dieser Ansatz erfordert jedoch die Entwicklung eigener Überwachungslösungen. Metriken sollten irgendwo gespeichert werden, Alarmmechanismen sollten konfiguriert werden. Wenn sich der Anwendungscode ändert, ändern sich auch alle Metriken und man sollte das richtig handhaben.
Aber auch ein einfacher Spark Listener kann einige Erkenntnisse zu Ihren Daten liefern.
Hier ist ein Beispiel für einen solchen Spark Listener:
public class SomeSparkListener extends SparkListener {
/**
* This very simple spark listener prints metrics collected for every stage.
*
* @param stageCompleted
*/
@Override
public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
StageInfo stageInfo = stageCompleted.stageInfo();
Iterator it = stageInfo.taskMetrics().accumulators().iterator();
while (it.hasNext()) {
AccumulatorV2 next = it.next();
String key = next.name().get();
Object value = next.value();
System.out.printf("key: %s, value: %s%n", key, value);
}
Sie können Spark Listener auf verschiedene Arten zu Ihrer Anwendung hinzufügen:
Programmatisch:
SparkSession spark = SparkSession.builder().getOrCreate(); spark.sparkContext().addSparkListener(new SomeSparkListener());
Oder übergeben Sie es über die Optionen des spark-submit/Cluster-Treibers:
spark-submit --conf "spark.extraListeners=ai.databand.SomeSparkListener"
Dies ist ein weiterer Mechanismus zur Spark-Überwachung, der standardmäßig verfügbar ist. Anstatt sich auf sehr untergeordnete Metriken zu konzentrieren, ermöglicht Query Execution Listener Entwicklern, Ereignisse zum Abschluss von Abfragen zu abonnieren. Es bietet umfassendere Metadaten über ausgeführte Abfragen wie logische und physische Pläne und Metriken.
Sie können Metriken wie die Anzahl der von einer Abfrage gelesenen/geschriebenen Datensätze abrufen, diesmal jedoch aggregiert für die gesamte Abfrage anstatt für bestimmte Aufgaben/Jobs/Phasen.
Außerdem können sehr nützliche Informationen aus Plänen wie Datenstandort und Schema extrahiert werden. Sie können das Schema zusammen mit den Dataframe-Dimensionen extrahieren und speichern und es mit den vorherigen Durchläufen vergleichen. Bei Problemen werden Warnmeldungen ausgelöst.
Das Extrahieren von Daten aus einem Plan kann jedoch kompliziert sein, da Sie gezwungen sind, eine Spark-API auf niedriger Ebene zu verwenden.
Außerdem sind alle operativen Belastungen bei der Implementierung von Mechanismen zur Speicherung von Metriken und zur Benachrichtigung weiterhin vorhanden. Was Sie von Spark erhalten, sind lediglich Metadaten. Es liegt in der Verantwortung des Entwicklers, sie zu nutzen.
Hier ist ein Beispiel für einen einfachen Query Execution Listener, der den Ausführungsplan und die Metriken ausgibt:
public class ExampleQueryExecutionListener implements QueryExecutionListener {
/**
* Print plan and query metrics
*
* @param funcName
* @param qe
* @param durationNs
*/
@Override
public void onSuccess(String funcName, QueryExecution qe, long durationNs) {
System.out.println(qe.executedPlan().prettyJson());
Iterator it = qe.executedPlan().metrics().iterator();
while (it.hasNext()) {
Tuple2 next = it.next();
System.out.printf("Key: %s, value: %s%n", next._1(), next._2().value());
}
}
@Override
public void onFailure(String funcName, QueryExecution qe, Exception exception) {
}
}
Listener für die Abfrageausführung können entweder programmgesteuert oder über die Konfiguration hinzugefügt werden:
Im Anwendungscode: SparkSession spark = SparkSession.builder().getOrCreate(); spark.listenerManager().register(new ExampleQueryExecutionListener());
Über spark-submit:
spark-submit --conf "spark.sql.queryExecutionListeners=ai.databand.ExampleQueryExecutionListener"
Die Implementierung einer Überwachung auf niedriger Ebene kann eine ernsthafte Schwerarbeit sein, aber die „System“-Methode der Überwachung hat einen großen Nutzen: Sie verursacht keinen Rechenaufwand. Da die Metadaten von Spark-Interna ausgegeben und aufgezeichnet werden, gibt es keine Strafen für die Ausführungszeiten der Abfragen.
Wenn Sie Listener für die Überwachung verwenden, müssen Sie keinen Anwendungscode berühren. Das kann enorme Nutzen haben, wenn man Daten zu bestehenden und Altlast-Anwendungen verfolgen möchte, aber kein Budget für Änderungen hat. Schreiben Sie einfach einen Listener, übergeben Sie ihn über die Spark-Konfiguration und erhalten Sie ein Abbild Ihrer Daten.
Sie können Ihr Vertrauen in eingehende Daten erheblich steigern, indem Sie diese manuell überprüfen. Nehmen wir an, wir erwarten eine bestimmte Anzahl von Datensätzen in der Eingabedatenquelle und diese Anzahl sollte normalerweise nicht kleiner als X sein. Wir können etwas ganz Einfaches schreiben wie:
df = spark.read("path")
if (df.count < X) {
throw new RuntimeException("Input data is missing")
}
Die Möglichkeiten hier sind grenzenlos. Wir können Zählungen, Nicht-Nullwerte, abgeleitete Schemata usw. vergleichen.
Da viele Qualitätsprüfungen mehr oder weniger trivial sind, wie z. B. die Sicherstellung, dass Ihr Datenrahmen die richtige Form und den richtigen Inhalt hat, hat die Community praktische Bibliotheken für solche Prüfungen entwickelt. Eine dieser Bibliotheken ist Deequ. Sie bietet eine umfangreiche domänenspezifische Sprache (DSL) für die meisten Anwendungsfälle. Probieren Sie sie aus. Außerdem gibt es fortschrittliche Funktionen, wie die Möglichkeit, Spalten zu profilieren, Min/Max/Mittelwert/Perzentile zu berechnen, Histogramme zu berechnen, Anomalien zu erkennen und vieles mehr.
Betrachten Sie folgendes Beispiel aus den Deequ-Dokumenten:
val verificationResult = VerificationSuite()
.onData(data)
.addCheck(
Check(CheckLevel.Error, "unit testing my data")
.hasSize(_ == 5) // Wir erwarten 5 Zeilen
.isComplete("id") sollte niemals NULL sein
.isUnique("id") // sollte keine Duplikate enthalten
.isComplete("productName") // sollte niemals NULL sein // sollte nur die Werte „hoch“ und „tief“ enthalten .isContainedIn("priority", Array("high", "low"))
.isNonNegative("numViews") // sollte keine negativen Werte enthalten // mindestens die Hälfte der Beschreibungen sollte eine URL enthalten .containsURL("description", _ >= 0,5)
Die Hälfte der Beiträge sollte weniger als 10 Aufrufe haben
.hasApproxQuantile("numViews", 0.5, _ <= 10)) .run()
Wie Sie sehen, haben wir eine große Anzahl von Prüfungen in eine schöne und gebrauchsfertige DSL verpackt.
Noch wichtiger ist jedoch, dass Deequ die Möglichkeit bietet, Prüfergebnisse zu speichern und automatisch Vergleiche mit früheren Durchläufen durchzuführen. Dies kann durch die Verwendung von Metrik-Repositories geschehen. Sie können Ihre eigene Implementierung schreiben und Deequ nahtlos in die bestehende Überwachungsinfrastruktur integrieren.
High-Level-Qualitätsprüfungen von Anwendungen sind zwar wesentlich flexibler als Low-Level-Ansätze, haben aber einen großen Nachteil: Leistungseinbußen. Da jede Berechnung eine Spark-Operation ausführt, kann der Overhead in manchen Fällen sehr erheblich sein, insbesondere bei großen Datensätzen. Jede „Anzahl“ und jedes „Ort“ kann zu vollständigen Scans führen. Spark wird intern sein Bestes tun, um die Ausführungspläne zu optimieren. Sie sollten jedoch diese Auswirkungen berücksichtigen und sicherstellen, dass die Datenprofilierung Ihre Leistung nicht beeinträchtigt.
Wir haben verschiedene Methoden zur Überwachung der Datenqualität für Spark-Anwendungen überprüft. Der Low-Level-Ansatz nutzt die Spark Event Listeners API und bietet Zugriff auf niedrigstufige Metriken wie gelesene/geschriebene Datensätze, logische/physische Pläne und kann nützlich sein, um Trends zu entwickeln und sicherzustellen, dass die Datenpipeline korrekte Ergebnisse liefert, sowie um einen Überblick über bestehende Anwendungen ohne Codeänderungen zu erhalten. Hochstufige Ansätze wie die manuelle Überprüfung von Daten oder die Verwendung von Datenqualitätsbibliotheken sind viel bequemer, haben aber Nachteile wie Leistungseinbußen.
Wie in jeder realen Situation gibt es auch hier immer Vor- und Nachteile sowie bessere Szenarien für beide Ansätze, abhängig von der Art Ihrer Anwendung. Nutzen Sie es mit Bedacht.
Bei IBM Databand nutzen wir beide Möglichkeiten, um umfassende Optionen zur Verfolgung von Spark-Anwendungen bereitzustellen. In unserem Kern verwenden wir Spark Listeners, um Kennzahlentrends und Datenabstammung zu erstellen, aber wir bieten auch einen praktischen Metriken Store für Deequ sowie die Möglichkeit, einzelne manuell berechnete Metriken zu verfolgen.
Erfahren Sie mehr über die Plattform für kontinuierliche Daten-Observability von Databand und wie sie hilft, Datenvorfälle früher zu erkennen, schneller zu lösen und dem Unternehmen vertrauenswürdigere Daten zu liefern. Wenn Sie bereit sind, einen genaueren Blick zu werfen, buchen Sie noch heute eine Demo.
Beschleunigen Sie die Identifizierung hybrider Vorfälle mit Betriebsanalysen nahezu in Echtzeit.
Schalten Sie geschäftsverändernde Ergebnisse mit Cloud-Analyse-Lösungen frei, die es Ihnen ermöglichen, Daten einfach zu analysieren und Modelle für maschinelles Lernen zu erstellen.
Entdecken Sie neue Funktionen und steigern Sie die geschäftliche Agilität mit IBM Cloud Consulting Services.