Surveillance Apache Spark : comment utiliser l’API Spark et les bibliothèques open source pour améliorer l’observabilité des données de votre application

Bibliothèque postmoderne

Auteur

Spark est essentiel à la pile de données moderne. Il est donc extrêmement important d’avoir le bon niveau d’observabilité dans vos environnements Spark. Il existe de nombreuses options pour surveiller Spark, notamment des programmes SaaS qui fournissent des tableaux de bord préconfigurés pour les indicateurs Spark et Spark SQL. Et si cela ne suffisait pas ?

Une configuration typique de l’application Spark, qu’il s’agisse d’une solution auto-hébergée ou gérée, inclut des tableaux de bord pour surveiller l’état des clusters. Bien que ces tableaux de bord soient très utiles, ils fournissent une vue d’ensemble de l’infrastructure, et non les indicateurs réels liés aux données. Oui, on peut supposer qu’il y a un problème avec l’application lorsque le CPU a augmenté son utilisation ou que le cluster manque de RAM, mais cela ne sert à rien si la source a modifié le schéma ou que les données provenant d’un autre service sont cassées. La plupart des problèmes rencontrés par les ingénieurs sont causés par les données, et non par l’infrastructure sous-jacente. Ils doivent donc passer beaucoup de temps à reproduire les problèmes ou à bricoler autour des fichiers et des compartiments comme des détectives. C’est là qu’une véritable surveillance des applications peut vous être utile.

Chaque situation exige un niveau de visibilité différent, et les ingénieurs de données doivent être en mesure d’aller au-delà des indicateurs d’exécution. Sinon, vous risquez de passer beaucoup de temps à déboguer les problèmes de qualité des données dans Spark.

Dans ce guide, vous découvrirez comment atteindre des niveaux élevés et faibles d’observabilité des données pour Spark. Pour le haut niveau, vous utiliserez les systèmes internes de Spark, tels que les API Listener et les Query Execution Listeners. Pour le niveau bas, vous apprendrez à utiliser des bibliothèques pour suivre les indicateurs de qualité des données.

Après avoir appris à faire les deux, vous pourrez choisir celle qui convient le mieux au problème que vous tentez de résoudre.

Les dernières actualités technologiques, étayées par des avis d’expert

Restez au fait des tendances les plus étonnantes du secteur dans le domaine de l’IA, de l’automatisation, des données et bien d’autres avec la newsletter Think. Consultez la déclaration de confidentialité d’IBM.
Lire la Déclaration de confidentialité d’IBM.

Merci ! Vous êtes abonné(e).

Vous recevrez votre abonnement en anglais. Vous trouverez un lien de désabonnement dans chaque newsletter. Vous pouvez gérer vos abonnements ou vous désabonner ici. Consultez la Déclaration de confidentialité d’IBM pour plus d’informations.

Méthodes de bas niveau pour surveiller Apache Spark

Spark Listener

Il s’agit d’une méthode très ancienne et infaillible pour obtenir des indicateurs. En fait, l’interface utilisateur de Spark utilise le même mécanisme pour visualiser les indicateurs. L’API Spark Listeners permet aux développeurs de suivre les événements que Spark émet pendant l’exécution de l’application. Ces événements sont généralement le début/la fin de l’application, le début/la fin du travail, le début/la fin de l’étape, etc. Vous trouverez la liste complète dans Spark JavaDoc. Il est facile de configurer et d’utiliser les programmes Spark Listeners pour obtenir des indicateurs. Après avoir effectué chacune des opérations, Spark appellera Spark Listener et transmettra des métadonnées à sa méthode. Il s’agit d’éléments tels que le temps d’exécution, les enregistrements lus/écrits, les octets lus/écrits et autres.

Ce contrôle de qualité des données, de base et de bas niveau, vérifiera le nombre et la taille des enregistrements. Supposons que vous ayez une tâche qui s’exécute quotidiennement et qui effectue une transformation/analyse sur des jeux de données entrants. Vous pouvez écrire un listener qui vérifie combien d’enregistrements ont été lus à partir de l’entrée et comparer avec le résultat du jour précédent. Lorsque la différence est significative, on peut supposer qu’il y a un problème avec la source de données.

Cependant, cette approche requiert de créer des solutions de surveillance en interne. Les valeurs des indicateurs doivent être stockées quelque part, les mécanismes d’alerte doivent être configurés. Lorsque le code de l’application change, toutes les clés d’indicateur changent aussi, et elles doivent être gérées correctement.

Cependant, même un simple Spark Listener peut fournir des informations sur vos données.

Voici un exemple de Spark Listener :

public class SomeSparkListener extends SparkListener {

    /**
     * This very simple spark listener prints metrics collected for every stage.
     *
     * @param stageCompleted
     */
    @Override
    public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
        StageInfo stageInfo = stageCompleted.stageInfo();
        Iterator it = stageInfo.taskMetrics().accumulators().iterator();

        while (it.hasNext()) {
            AccumulatorV2 next = it.next();
            String key = next.name().get();
            Object value = next.value();
            System.out.printf("key: %s, value: %s%n", key, value);
        }
    }
}

Vous pouvez ajouter Spark Listener à votre application de plusieurs manières :

Vous pouvez l’ajouter de façon programmatique :

SparkSession spark = SparkSession.builder().getOrCreate();
spark.sparkContext().addSparkListener(new SomeSparkListener());

Vous pouvez également le transmettre par le biais des options de pilote de cluster spark-submit/spark :

spark-submit --conf "spark.extraListeners=ai.databand.SomeSparkListener"

Spark Query Execution Listener

Il s’agit d’un autre mécanisme de surveillance Spark, prêt à l’emploi. Au lieu de se concentrer sur les indicateurs de très bas niveau, Query Execution Listener permet aux développeurs de s’abonner aux événements de complétion des requêtes. Il fournit des métadonnées de plus haut niveau sur les requêtes exécutées, telles que les plans logiques et physiques, et les indicateurs d’exécution.

Vous pouvez obtenir des indicateurs tels que les enregistrements lus/écrits par requête, mais cette fois-ci, ils sont agrégés pour l’ensemble de la requête, et non pour des tâches/jobs/étapes spécifiques.

Les informations très utiles peuvent également être extraites à partir de plans tels que l’emplacement des données et le schéma. Vous pouvez extraire et stocker le schéma, ainsi que les dimensions du cadre de données, le comparer aux exécutions précédentes et déclencher des alertes en cas de problème.

Cependant, l’extraction des données à partir d’un plan peut être compliquée car cela vous oblige à utiliser une API Spark de bas niveau.

En outre, toutes les charges opérationnelles liées à la mise en œuvre de mécanismes de stockage des indicateurs et d’alerte sont toujours présentes. Ce que vous obtiendrez de Spark, ce ne sont que des métadonnées. Il incombe au développeur de les utiliser.

Voici un exemple de Query Execution Listener simple, qui imprime un plan et des indicateurs :

public class ExampleQueryExecutionListener implements QueryExecutionListener {

    /**
     * Print plan and query metrics
     *
     * @param funcName
     * @param qe
     * @param durationNs
     */
    @Override
    public void onSuccess(String funcName, QueryExecution qe, long durationNs) {
        System.out.println(qe.executedPlan().prettyJson());
        Iterator it = qe.executedPlan().metrics().iterator();
        while (it.hasNext()) {
            Tuple2 next = it.next();
            System.out.printf("Key: %s, value: %s%n", next._1(), next._2().value());
        }
    }

    @Override
    public void onFailure(String funcName, QueryExecution qe, Exception exception) {
    }
}

Les écouteurs d’exécution des requêtes peuvent être ajoutés soit par programmation, soit par configuration :

Dans le code de l’application : SparkSession spark = SparkSession.builder().getOrCreate() ; spark.listenerManager().register(new ExampleQueryExecutionListener());

Via spark-submit :

spark-submit --conf "spark.sql.queryExecutionListeners=ai.databand.ExampleQueryExecutionListener"

Mettre en œuvre une surveillance de bas niveau peut s’avérer très laborieux, mais la méthode de surveillance « système » présente un avantage considérable : elle n'entraîne pas de surcharge de calcul. Comme les métadonnées sont émises et enregistrées par les internes de Spark, elles ne pénalisent pas le temps d’exécution des requêtes.

L’utilisation des Listeners pour la surveillance vous permet d’éviter de toucher au code des applications. Cela peut présenter d’énormes avantages lorsque vous souhaitez suivre les données relatives aux applications existantes et héritées, mais que vous n’avez pas le budget nécessaire pour les modifier. Il vous suffit d’écrire un listener et de le transmettre via une configuration Spark pour obtenir une vue d’ensemble de vos données.

AI Academy

Se préparer à l’IA avec le cloud hybride

Dirigé par des leaders d’opinion IBM, le programme a pour but d’aider les chefs d’entreprise à acquérir les connaissances nécessaires qui leur permettront d’orienter leurs investissements IA vers les opportunités les plus prometteuses.

Méthodes de haut niveau pour surveiller Apache Spark

Contrôles manuels de la qualité des données

Vous pouvez considérablement renforcer votre confiance dans les données entrantes en les validant manuellement. Supposons que nous attendions un certain nombre d’enregistrements dans la source de données d’entrée, et que ce nombre ne devrait pas être inférieur à X. On peut écrire quelque chose de très simple comme :

df = spark.read("path")
     if (df.count < X) {
     throw new RuntimeException("Input data is missing")
 }

Les possibilités ici sont illimitées. Nous pouvons comparer les décomptes, le nombre de valeurs non nulles, les schémas inférés, etc.

Utiliser les bibliothèques de qualité des données

Comme de nombreux contrôles de qualité sont plus ou moins courants (par exemple, s’assurer que la forme et le contenu de votre trame de données sont corrects), la communauté a développé des bibliothèques pratiques pour ces contrôles. L’une de ces bibliothèques est Deequ, qui fournit un langage spécifique au domaine (DSL) riche pour la plupart des cas. Jetez-y un œil. Deequ propose aussi des fonctionnalités avancées, comme la capacité de profiler les colonnes, de calculer les min/max/moyenne/percentiles, calculer des histogrammes, détecter les anomalies et bien plus encore.

Prenons l’exemple suivant, tiré de la documentation Deequ :

val verificationResult = VerificationSuite()
   .onData(data)
   .addCheck(
     Check(CheckLevel.Error, "unit testing my data")
       .hasSize(_ == 5) // we expect 5 rows
       .isComplete("id") // should never be NULL
       .isUnique("id") // should not contain duplicates
       .isComplete("productName") // should never be NULL
       // should only contain the values "high" and "low"
       .isContainedIn("priority", Array("high", "low"))
       .isNonNegative("numViews") // should not contain negative values
       // at least half of the descriptions should contain a url
       .containsURL("description", _ >= 0.5)
       // half of the items should have less than 10 views
       .hasApproxQuantile("numViews", 0.5, _ <= 10))
     .run()

Vous pouvez voir que nous avons un énorme ensemble de vérifications encapsulées dans un DSL agréable et prêt à l’emploi.

Plus important encore, Deequ permet de stocker les résultats des contrôles et de les comparer automatiquement aux exécutions précédentes. Pour ce faire, vous pouvez utiliser les dépôts d’indicateurs. On peut écrire sa propre implémentation et intégrer Deequ de façon fluide dans l’infrastructure de surveillance existante.

Bien que les contrôles de qualité des applications de haut niveau soient beaucoup plus flexibles que les approches de bas niveau, ils présentent un inconvénient majeur : les pénalités de performance. Étant donné que chaque calcul émet une opération Spark, la surcharge peut être très importante dans certains cas, en particulier sur les grands jeux de données. Chaque « count » et « where » peut mener à des analyses complètes. En interne, Spark fera de son mieux pour optimiser les plans d’exécution, mais vous devez tenir compte de ces implications et vous assurer que le profilage des données ne nuira pas à votre performance.

Conclusion

Nous avons examiné plusieurs méthodes de suivi de la qualité des données pour les applications Spark. L’approche de bas niveau utilise l’API Spark Event Listeners et donne accès à des indicateurs de bas niveau tels que les enregistrements lus/écrits, les plans logiques/physiques. Elle peut être utile pour créer des tendances, s’assurer que le pipeline de données produit des résultats appropriés et avoir une vue d’ensemble des applications existantes sans aucune modification de code. Les approches de haut niveau, comme la vérification manuelle des données ou l’utilisation de bibliothèques de qualité des données, sont beaucoup plus pratiques, mais présentent des inconvénients tels que des pénalités de performance.

Comme dans toute situation réelle, il existe toujours des compromis et de meilleurs scénarios pour les deux approches, en fonction de votre type d’application. Utilisez-le judicieusement.

Chez IBM Databand®, nous utilisons ces deux approches pour offrir un ensemble complet d’options pour suivre les applications Spark. Bien qu’au cœur de notre solution nous utilisons Spark Listeners pour construire des tendances d’indicateurs et la traçabilité des données, nous proposons également un magasin d’indicateurs pratique pour Deequ ainsi que la possibilité de suivre des indicateurs individuels calculés manuellement.

Découvrez-en plus sur la plateforme d’observabilité continue des données de Databand et comment elle aide à détecter les incidents de données plus tôt, à les résoudre plus rapidement et à fournir des données plus fiables à l’entreprise. Si vous êtes prêt à aller plus loin, réservez une démo dès aujourd’hui.

Solutions connexes
IBM Z Operational Log and Data Analytics 

Accélérez l’identification des incidents des systèmes hybrides grâce à des analyses opérationnelles en temps quasi réel.

Découvrir IBM Z
Solutions d’analytique dans le cloud

Obtenez des résultats qui transforment votre entreprise grâce à des solutions d’analytique dans le cloud qui vous permettent d’analyser facilement les données et de créer des modèles de machine learning.

Découvrir les solutions d’analytique dans le cloud
Services de conseil en cloud

Explorez de nouvelles fonctionnalités et stimulez l’agilité de votre entreprise grâce aux services de conseil d’IBM Cloud.

Découvrir les services de conseil cloud
Passez à l’étape suivante

Obtenez des informations en temps réel à partir de vos données IBM Z grâce à des analyses puissantes qui relient le mainframe et le cloud, afin que vous puissiez agir plus rapidement, réduire les risques et prendre des décisions plus intelligentes.

Découvrir IBM Z Obtenir plus d'informations