Si tratta di un metodo molto vecchio e infallibile per ottenere metriche. In realtà, l'interfaccia utente di Spark utilizza lo stesso meccanismo per visualizzare le metriche. L'API Spark Listener consente agli sviluppatori di tenere traccia degli eventi emessi da Spark durante l'esecuzione dell'applicazione. Questi eventi sono tipicamente inizio/fine dell'applicazione, inizio/fine del lavoro, inizio/fine della fase, ecc. Puoi trovare l'elenco completo su Spark JavaDoc. Gli Spark Listener sono facili da configurare e da utilizzare per acquisire metriche. Dopo aver eseguito ciascuna delle operazioni, Spark chiamerà Spark Listener e passerà alcune informazioni sui metadati al suo metodo. Ciò includerà elementi come il tempo di esecuzione, i record letti/scritti, i byte letti/scritti e altro.

Questo monitoraggio basilare e di basso livello della qualità dei dati controllerà il numero e la dimensione dei record. Immagina di avere un lavoro che viene eseguito quotidianamente e che esegue alcune trasformazioni/analisi sui set di dati in arrivo. Puoi scrivere un listener che controlla quanti record sono stati letti dall'input e confrontarlo con il risultato del giorno precedente. Quando la differenza è significativa, possiamo supporre che qualcosa non vada bene con la fonte di dati.

Tuttavia, questo approccio richiede la scrittura di soluzioni di monitoraggio interne. I valori delle metriche devono essere memorizzati da qualche parte, i meccanismi di allarme devono essere configurati. Quando il codice dell'applicazione cambia, cambiano anche tutte le chiavi delle metriche e serve una gestione corretta.

Tuttavia, anche un semplice Spark Listener può fornire informazioni sui tuoi dati.

Ecco un esempio di Spark Listener:

public class SomeSparkListener extends SparkListener { /** * This very simple spark listener prints metrics collected for every stage. * * @param stageCompleted */ @Override public void onStageCompleted(SparkListenerStageCompleted stageCompleted) { StageInfo stageInfo = stageCompleted.stageInfo(); Iterator it = stageInfo.taskMetrics().accumulators().iterator(); while (it.hasNext()) { AccumulatorV2 next = it.next(); String key = next.name().get(); Object value = next.value(); System.out.printf("key: %s, value: %s%n", key, value); } } }

Puoi aggiungere Spark Listener alla tua applicazione in diversi modi:

Aggiungilo a livello di programmazione:

SparkSession spark = SparkSession.builder().getOrCreate(); spark.sparkContext().addSparkListener(new SomeSparkListener());

Oppure passalo tramite le opzioni del driver spark-submit/cluster:

spark-submit --conf "spark.extraListeners=ai.databand.SomeSparkListener"