Monitoraggio di Apache Spark: come utilizzare le API Spark e le librerie open source per ottenere una migliore osservabilità dei dati della tua applicazione

Biblioteca postmoderna

Autore

Spark è fondamentale per lo stack di dati moderno. Per questo motivo, è estremamente importante avere il giusto livello di osservabilità per i tuoi ambienti Spark. Esistono numerose opzioni per monitorare Spark, tra cui programmi SaaS che forniscono dashboard preconfigurate per le metriche Spark e Spark SQL. E se non fosse sufficiente?

La configurazione tipica di un'applicazione Spark, sia essa una soluzione self-hosted o gestita, include alcune dashboard operative per il monitoraggio della salute dei cluster. Ma anche se queste dashboard sono molto utili, ci forniscono solo una panoramica dell'infrastruttura e non le metriche effettive relative ai dati. Sì, possiamo presumere che ci sia qualcosa che non va nell'app quando la CPU ha aumentato l'utilizzo o il cluster sta esaurendo la RAM, ma non aiuta quando la fonte ha cambiato lo schema o i dati provenienti da un altro reparto sono corrotti. La maggior parte dei problemi che gli ingegneri devono affrontare sono causati dai dati e non dall'infrastruttura sottostante, per cui devono passare molto tempo a riprodurre i problemi o ad armeggiare con i file e i bucket come dei detective. È qui che il monitoraggio effettivo delle applicazioni può essere d'aiuto.

Ogni situazione richiede un diverso livello di visibilità, e i data engineer devono avere la capacità di andare oltre le metriche di esecuzione. Altrimenti, potresti dedicare molto tempo a debuggare problemi di qualità dei dati in Spark.

In questa guida imparerai come ottenere osservabilità dei dati di alto o basso livello per Spark. Per il livello alto, utilizzerai i sistemi interni di Spark, come le API Listener e i Listener di esecuzione query. Per il livello basso, imparerai a usare le librerie per monitorare le metriche di qualità dei dati.

Dopo aver imparato a fare entrambe le cose, avrai la possibilità di scegliere la soluzione più adatta al problema che stai cercando di risolvere.

Le ultime notizie nel campo della tecnologia, supportate dalle analisi degli esperti

Resta al passo con le tendenze più importanti e interessanti del settore relative ad AI, automazione, dati e altro con la newsletter Think. Leggi l'Informativa sulla privacy IBM.

Grazie per aver effettuato l'iscrizione!

L'abbonamento sarà fornito in lingua inglese. Troverai un link per annullare l'iscrizione in tutte le newsletter. Puoi gestire i tuoi abbonamenti o annullarli qui. Per ulteriori informazioni, consulta l'Informativa sulla privacy IBM.

Metodi di monitoraggio di Apache Spark di basso livello

Spark Listener

Si tratta di un metodo molto vecchio e infallibile per ottenere metriche. In realtà, l'interfaccia utente di Spark utilizza lo stesso meccanismo per visualizzare le metriche. L'API Spark Listener consente agli sviluppatori di tenere traccia degli eventi emessi da Spark durante l'esecuzione dell'applicazione. Questi eventi sono tipicamente inizio/fine dell'applicazione, inizio/fine del lavoro, inizio/fine della fase, ecc. Puoi trovare l'elenco completo su Spark JavaDoc. Gli Spark Listener sono facili da configurare e da utilizzare per acquisire metriche. Dopo aver eseguito ciascuna delle operazioni, Spark chiamerà Spark Listener e passerà alcune informazioni sui metadati al suo metodo. Ciò includerà elementi come il tempo di esecuzione, i record letti/scritti, i byte letti/scritti e altro.

Questo monitoraggio basilare e di basso livello della qualità dei dati controllerà il numero e la dimensione dei record. Immagina di avere un lavoro che viene eseguito quotidianamente e che esegue alcune trasformazioni/analisi sui set di dati in arrivo. Puoi scrivere un listener che controlla quanti record sono stati letti dall'input e confrontarlo con il risultato del giorno precedente. Quando la differenza è significativa, possiamo supporre che qualcosa non vada bene con la fonte di dati.

Tuttavia, questo approccio richiede la scrittura di soluzioni di monitoraggio interne. I valori delle metriche devono essere memorizzati da qualche parte, i meccanismi di allarme devono essere configurati. Quando il codice dell'applicazione cambia, cambiano anche tutte le chiavi delle metriche e serve una gestione corretta.

Tuttavia, anche un semplice Spark Listener può fornire informazioni sui tuoi dati.

Ecco un esempio di Spark Listener:

public class SomeSparkListener extends SparkListener {

    /**
     * This very simple spark listener prints metrics collected for every stage.
     *
     * @param stageCompleted
     */
    @Override
    public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
        StageInfo stageInfo = stageCompleted.stageInfo();
        Iterator it = stageInfo.taskMetrics().accumulators().iterator();

        while (it.hasNext()) {
            AccumulatorV2 next = it.next();
            String key = next.name().get();
            Object value = next.value();
            System.out.printf("key: %s, value: %s%n", key, value);
        }
    }
}

Puoi aggiungere Spark Listener alla tua applicazione in diversi modi:

Aggiungilo a livello di programmazione:

SparkSession spark = SparkSession.builder().getOrCreate();
spark.sparkContext().addSparkListener(new SomeSparkListener());

Oppure passalo tramite le opzioni del driver spark-submit/cluster:

spark-submit --conf "spark.extraListeners=ai.databand.SomeSparkListener"

Spark Query Execution Listener

Si tratta di un altro meccanismo di monitoraggio di Spark fornito completo. Invece di concentrarsi su metriche di livello molto basso, Query Execution Listener permette agli sviluppatori di iscriversi agli eventi di completamento delle query. Fornisce metadati di alto livello sulle query eseguite, come piani logici e fisici, e metriche di esecuzione.

Puoi ottenere metriche come record letti/scritti tramite query, ma questa volta aggregati per l'intera query invece che per compiti/lavori/fasi specifici.

Inoltre, informazioni molto utili possono essere estratte dai piani, come la localizzazione dei dati e lo schema. È possibile estrarre e memorizzare lo schema insieme alle dimensioni del dataframe e confrontarlo con le esecuzioni precedenti, attivando avvisi quando qualcosa non va.

Tuttavia, estrarre dati da un piano può essere complicato perché si è costretti a usare un'API Spark di basso livello.

Inoltre, tutti gli oneri operativi legati all'implementazione dei meccanismi di storage e allerta delle metriche sono ancora presenti. Ciò che otterrai da Spark sono solo metadati. È responsabilità dello sviluppatore utilizzarli.

Ecco un esempio di un semplice Query Execution Listener che stampa il piano e le metriche:

public class ExampleQueryExecutionListener implements QueryExecutionListener {

    /**
     * Print plan and query metrics
     *
     * @param funcName
     * @param qe
     * @param durationNs
     */
    @Override
    public void onSuccess(String funcName, QueryExecution qe, long durationNs) {
        System.out.println(qe.executedPlan().prettyJson());
        Iterator it = qe.executedPlan().metrics().iterator();
        while (it.hasNext()) {
            Tuple2 next = it.next();
            System.out.printf("Key: %s, value: %s%n", next._1(), next._2().value());
        }
    }

    @Override
    public void onFailure(String funcName, QueryExecution qe, Exception exception) {
    }
}

I listener di esecuzione delle query possono essere aggiunti a livello di codice o tramite configurazione:

Nel codice dell'applicazione: SparkSession spark = SparkSession.builder().getOrCreate(); spark.listenerManager().register(new ExampleQueryExecutionListener());

Tramite spark-submit:

spark-submit --conf "spark.sql.queryExecutionListeners=ai.databand.ExampleQueryExecutionListener"

Implementare il monitoraggio di basso livello può essere un lavoro davvero impegnativo, tuttavia, il modo "sistematico" di monitorare ha un enorme beneficio: non introduce un sovraccarico computazionale. Poiché i metadati vengono emessi e registrati dagli interni di Spark, non ci sono penalità sui tempi di esecuzione delle query.

Utilizzare Listener per il monitoraggio ti permette di evitare di toccare qualsiasi codice di applicazione. Questo può avere enormi benefici quando desideri monitorare i dati su applicazioni esistenti e legacy, ma non disponi del budget per apportare modifiche. Basta scrivere un listener, passarlo tramite la configurazione Spark e ottenere un quadro dei dati.

AI Academy

Prepararsi all'AI con l'hybrid cloud

Condotto dai migliori leader di pensiero di IBM, il programma di studi è stato progettato per aiutare i dirigenti aziendali ad acquisire le conoscenze necessarie per dare priorità agli investimenti in AI che possono favorire la crescita.

Metodi di monitoraggio di Apache Spark di alto livello

Controlli manuali sulla qualità dei dati

Puoi aumentare notevolmente la fiducia nei dati in arrivo validandoli manualmente. Supponiamo di aspettarci un certo numero di record nella sorgente di input e questo numero di solito non dovrebbe essere inferiore a X. Possiamo scrivere un codice molto semplice come:

df = spark.read("path")
     if (df.count < X) {
     throw new RuntimeException("Input data is missing")
 }

Le possibilità qui sono illimitate. Possiamo confrontare i conteggi, il conteggio dei valori non nulli, gli schemi dedotti, ecc.

Utilizzo delle librerie di qualità dei dati

Poiché molti controlli di qualità sono più o meno banali, come ad esempio verificare che il dataframe abbia la forma e i contenuti corretti, la comunità ha sviluppato librerie utili per tali controlli. Una di queste librerie è Deequ. Fornisce un ricco linguaggio specifico di dominio (DSL) per la maggior parte dei casi. Dai un'occhiata. Inoltre ha funzionalità avanzate, come la possibilità di profilare colonne, calcolare min/max/media/percentuali, calcolare istogrammi, rilevare anomalie e molto altro.

Prendiamo in considerazione il seguente esempio tratto dalla documentazione di Deequ:

val verificationResult = VerificationSuite()
   .onData(data)
   .addCheck(
     Check(CheckLevel.Error, "unit testing my data")
       .hasSize(_ == 5) // ci aspettiamo 5 file
       .isComplete("id") non deve mai essere NULL
       .isUnique("id") // non deve contenere duplicati
       .isComplete("productName") // non deve mai essere NULL
       // deve contenere solo valori "high" e "low"
       .isContainedIn("priority", Array("high", "low"))
       .isNonNegative("numViews") // non deve contenere valori negativi
 // almeno la metà delle descrizioni deve contenere un url
.containsURL("description", _ >= 0.5)
       // la metà degli elementi deve avere meno di 10 visualizzazioni
       .hasApproxQuantile("numViews", 0.5, _ <= 10))
     .run()

Abbiamo un'enorme serie di controlli avvolti in un DSL bello e pronto all'uso.

Ancora più importante, Deequ offre la possibilità di memorizzare i risultati dei controlli e di eseguire automaticamente i confronti con le esecuzioni precedenti. Questo può essere fatto utilizzando i repository di metriche. Si può scrivere la propria implementazione e integrare senza problemi Deequ nell'infrastruttura di monitoraggio esistente.

Sebbene i controlli di qualità delle applicazioni di alto livello siano molto più flessibili rispetto agli approcci di basso livello, presentano un grande svantaggio: penalizzano le prestazioni. Poiché ogni calcolo emette un'operazione di spark, l'overhead può essere molto significativo in alcuni casi, soprattutto sui set di dati di grandi dimensioni. Ogni "conteggio" e "dove" può portare a scansioni complete. Spark farà del suo meglio internamente per ottimizzare i piani di esecuzione, ma bisogna considerare queste implicazioni e assicurarsi che la profilazione dei dati non danneggi le prestazioni.

Conclusione

Abbiamo verificato diversi metodi di monitoraggio della qualità dei dati per le applicazioni Spark. L'approccio a basso livello utilizza l'API Spark Event Listener e dà accesso a metriche di basso livello come record letti/scritti, piani logici/fisici e può essere utile per costruire tendenze, assicurarsi che la pipeline dei dati produca risultati adeguati e una panoramica delle applicazioni esistenti senza modifiche al codice. Gli approcci di alto livello, come la verifica manuale dei dati o l'utilizzo di librerie di qualità dei dati, sono molto più convenienti, ma presentano degli svantaggi, come la penalizzazione delle prestazioni.

Come in ogni situazione del mondo reale, ci sono sempre compromessi e scenari migliori per entrambi gli approcci, a seconda del tipo di applicazione. Usalo con saggezza.

In IBM® Databand, utilizziamo entrambe le opzioni per offrire un insieme completo di opzioni per monitorare le applicazioni Spark. Sebbene utilizziamo Spark Listener per creare trend di metriche e data lineage, forniamo anche un pratico Metrics Store per Deequ, nonché la possibilità di monitorare singole metriche calcolate manualmente.

Scopri di più sulla piattaforma di osservabilità continua dei dati di Databand e su come aiuta a rilevare gli incidenti di dati in anticipo, risolverli più rapidamente e fornire dati più affidabili all'azienda. Se desideri approfondire ulteriormente l'argomento, prenota subito una demo.

Soluzioni correlate
IBM Z Operational Log and Data Analytics 

Accelera l'identificazione ibrida degli incidenti con analytics operative quasi in tempo reale.

Esplora IBM Z
Soluzioni di cloud analytics

Sblocca i risultati che cambiano il business con le soluzioni di cloud analytics che ti consentono di analizzare facilmente i dati e di creare modelli di machine learning.

Esplora le soluzioni di cloud analytics
Servizi di consulenza cloud

Scopri nuove funzionalità e promuovi l'agilità aziendale con i servizi di consulenza cloud di IBM.

Esplora i servizi di consulenza cloud
Prossimi passi

Sblocca insight in tempo reale dai tuoi dati IBM Z con analytics che collegano mainframe e cloud, in modo da poter agire più velocemente, ridurre i rischi e prendere decisioni più intelligenti.

Esplora IBM Z Ottieni maggiori informazioni