Spark es crítico para la pila de datos modernos. Como tal, es extremadamente importante tener el nivel adecuado de observabilidad para sus entornos Spark. Hay muchas opciones para monitorizar Spark, incluidos los programas SaaS que le proporcionan paneles de control preconfigurados para las métricas de Spark y Spark SQL. ¿Y si eso no es suficiente?
La configuración típica de una aplicación Spark, ya sea una solución autoalojada o gestionada, incluye algunos paneles de control para monitorizar el estado del clúster. Pero aunque esos paneles de control son muy útiles, solo nos ofrecen una visión general de la infraestructura y no las métricas reales relacionadas con los datos. Sí, podemos suponer que puede haber algún problema con la aplicación cuando la CPU ha aumentado el uso o el clúster se está quedando sin RAM, pero no ayuda cuando la fuente cambió el esquema o los datos que provienen de otro departamento están rotos. La mayoría de los problemas a los que se enfrentan los ingenieros están causados por los datos y no por la infraestructura subyacente, por lo que tienen que dedicar mucho tiempo a reproducir problemas o a manipular archivos y depósitos como detectives. Aquí es donde el seguimiento real de la aplicación puede ayudar.
Cada situación requiere un nivel diferente de visibilidad, y los ingenieros de datos deben tener la capacidad de ir un nivel más allá de las métricas de ejecución. De lo contrario, puede dedicar una cantidad significativa de tiempo a depurar problemas de calidad de los datos en Spark.
En esta guía, aprenderá a obtener niveles altos y bajos de observabilidad de los datos para Spark. Para el alto nivel, utilizará los sistemas internos de Spark, como las API de escucha y los oyentes de ejecución de consultas. Para el nivel bajo, aprenderá a utilizar bibliotecas para realizar un seguimiento de las métricas de calidad de los datos.
Después de aprender a hacer ambas cosas, tendrá la opción de elegir la que mejor funcione para el problema que está tratando de resolver.
Boletín del sector
Manténgase al día sobre las tendencias más importantes e intrigantes del sector en materia de IA, automatización, datos y mucho más con el boletín Think. Consulte la Declaración de privacidad de IBM.
Su suscripción se enviará en inglés. Encontrará un enlace para darse de baja en cada boletín. Puede gestionar sus suscripciones o darse de baja aquí. Consulte nuestra Declaración de privacidad de IBM para obtener más información.
Es una forma muy antigua e infalible de obtener métricas. De hecho, Spark UI utiliza el mismo mecanismo para visualizar métricas. La API de escuchas de Spark permite a los desarrolladores realizar un seguimiento de los eventos que emite Spark durante la ejecución de la aplicación. Esos eventos suelen ser el inicio/fin de la aplicación, el inicio/fin del trabajo, el inicio/fin de la etapa, etc. Puede encontrar la lista completa en Spark JavaDoc. Es fácil de configurar y usar Spark Listeners para obtener métricas. Después de realizar cada una de las operaciones, Spark llamará a Spark Listener y pasará información de metadatos a su método. Esto incluirá cosas como el tiempo de ejecución, los registros leídos/escritos, los bytes leídos/escritos y otros.
Esta monitorización de la calidad de los datos, muy básica y de bajo nivel, comprobará el recuento y el tamaño de los registros. Imagine que tiene un trabajo que se ejecuta a diario y ejecuta alguna transformación/análisis en los conjuntos de datos entrantes. Puede escribir un oyente que compruebe cuántos registros se leyeron de la entrada y compararlo con el resultado del día anterior. Cuando la diferencia es significativa, podemos suponer que algo puede estar mal con la fuente de datos.
Sin embargo, este enfoque requiere escribir soluciones de monitorización internas. Los valores de la métrica deberían almacenarse en algún lugar, los mecanismos de alerta deberían configurarse. Cuando el código de la aplicación cambia, todas las claves de métricas también cambiarán y hay que gestionarlo correctamente.
Sin embargo, incluso un simple Spark Listener puede aportar algunos conocimientos sobre sus datos.
Aquí hay un ejemplo de un Spark Listener de este tipo:
public class SomeSparkListener extends SparkListener {
/**
* This very simple spark listener prints metrics collected for every stage.
*
* @param stageCompleted
*/
@Override
public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
StageInfo stageInfo = stageCompleted.stageInfo();
Iterator it = stageInfo.taskMetrics().accumulators().iterator();
while (it.hasNext()) {
AccumulatorV2 next = it.next();
String key = next.name().get();
Object value = next.value();
System.out.printf("key: %s, value: %s%n", key, value);
}
Puede añadir Spark Listener a su aplicación de varias maneras:
Añádalo mediante programación:
SparkSession spark = SparkSession.builder().getOrCreate(); spark.sparkContext().addSparkListener(new SomeSparkListener());
O páselo a través de las opciones del controlador de clúster de spark-submit:
spark-submit --conf "spark.extraListeners=ai.databand.SomeSparkListener"
Este es otro mecanismo de monitorización de Spark que viene listo para usar. En lugar de centrarse en métricas de muy bajo nivel, Query Execution Listener permite a los desarrolladores suscribirse a eventos de finalización de consultas. Proporciona metadatos de más alto nivel sobre las consultas ejecutadas, como planes lógicos y físicos, y métricas de ejecución.
Puede obtener métricas como los registros leídos o escritos por consulta, pero esta vez se suman para toda la consulta en lugar de para tareas, trabajos o fases específicos.
También se puede extraer información muy útil de planes como la ubicación de datos y el esquema. Puede extraer y almacenar el esquema junto con las dimensiones del marco de datos y compararlo con las ejecuciones anteriores, activando alertas cuando algo va mal.
Sin embargo, extraer datos de un plan puede ser complicado porque se ve obligado a utilizar una API de Spark de bajo nivel.
Además, todas las cargas operativas relacionadas con la implementación de los mecanismos de almacenamiento y alerta de métricas siguen presentes. Lo que obtendrá de Spark son solo metadatos. Es responsabilidad del desarrollador utilizarlos.
He aquí un ejemplo de un sencillo detector de ejecución de consultas que imprime el plan y las métricas:
public class ExampleQueryExecutionListener implements QueryExecutionListener {
/**
* Print plan and query metrics
*
* @param funcName
* @param qe
* @param durationNs
*/
@Override
public void onSuccess(String funcName, QueryExecution qe, long durationNs) {
System.out.println(qe.executedPlan().prettyJson());
Iterator it = qe.executedPlan().metrics().iterator();
while (it.hasNext()) {
Tuple2 next = it.next();
System.out.printf("Key: %s, value: %s%n", next._1(), next._2().value());
}
}
@Override
public void onFailure(String funcName, QueryExecution qe, Exception exception) {
}
}
Los oyentes de ejecución de consultas pueden añadirse tanto de forma programática como mediante configuración:
En el código de la aplicación: SparkSession spark = SparkSession.builder().getOrCreate(); spark.listenerManager().register(new ExampleQueryExecutionListener());
A través de spark-submit:
spark-submit --conf "spark.sql.queryExecutionListeners=ai.databand.ExampleQueryExecutionListener"
Implementar la monitorización de bajo nivel puede ser un trabajo pesado, sin embargo, la forma de monitorización del "sistema" tiene un gran beneficio: no introduce sobrecarga computacional. Dado que los metadatos son emitidos y registrados por las funciones internas de Spark, no penalizan los tiempos de ejecución de las consultas.
El uso de oyentes para la monitorización le permite evitar tocar cualquier código de aplicación. Esto puede tener enormes beneficios cuando desea rastrear datos en aplicaciones existentes y heredadas, pero no tiene el presupuesto para realizar cambios. Solo tiene que escribir un listener, pasarlo a través de la configuración de spark y obtener una imagen de sus datos.
Puede aumentar en gran medida su confianza en los datos entrantes validándolos manualmente. Supongamos que esperamos un número determinado de registros en la fuente de datos de entrada y este número normalmente no debería ser inferior a X. Podemos escribir algo muy simple como:
df = spark.read("path")
if (df.count < X) {
throw new RuntimeException("Input data is missing")
}
Las posibilidades aquí son ilimitadas. Podemos comparar recuentos, recuentos de valores no nulos, esquemas inferidos, etc.
Dado que muchas comprobaciones de calidad son más o menos triviales, como asegurarse de que su dataframe tenga la forma y el contenido adecuados, la comunidad ha desarrollado bibliotecas convenientes para tales comprobaciones. Una de esas bibliotecas es Deequ. Proporciona un rico lenguaje específico de dominio (DSL) para la mayoría de los casos. Échele un vistazo. También tiene cosas avanzadas, como la capacidad de perfilar columnas, calcular mín./máx./media/percentiles, calcular histogramas, detectar anomalías y mucho más.
Considere el siguiente ejemplo de los documentos de Deequ:
val verificationResult = VerificationSuite()
.onData(data)
.addCheck(
Check(CheckLevel.Error, "unit testing my data")
.hasSize(_ == 5) // esperamos 5 filas
.isComplete("id") // nunca debe ser NULL
.isUnique("id") // no debe contener duplicados
.isComplete("productName") // nunca debe ser NULL
// solo debe contener los valores "high" y "low"
.isContainedIn("priority", Array("high", "low"))
.isNonNegative("numViews") No debe contener valores negativos
// al menos la mitad de las descripciones deberían contener una URL
.containsURL("descripción", _ >= 0.5)
// la mitad de los elementos deben tener menos de 10 vistas
.hasApproxQuantile("numViews", 0.5, _ <= 10))
.run()
Puede ver que tenemos una enorme serie de cheques envueltos en una bonita DSL lista para usar.
Más importante aún, Deequ ofrece la capacidad de almacenar resultados de comprobaciones y realizar comparaciones automáticas con ejecuciones anteriores. Esto se puede hacer utilizando Repositorios de métricas. Se puede escribir su propia implementación e integrar Deequ de manera fluida en la infraestructura de monitorización existente.
Aunque los controles de calidad de aplicaciones de alto nivel son mucho más flexibles que los de bajo nivel, conllevan un gran inconveniente: las penalizaciones de rendimiento. Dado que cada cálculo emite una operación de spark, la sobrecarga puede ser muy significativa en algunos casos, especialmente en conjuntos de datos grandes. Cada "recuento" y "lugar" pueden conducir a escaneos completos. Spark hará todo lo posible para optimizar los planes de ejecución, pero debe tener en cuenta estas implicaciones y asegurarse de que la creación de perfiles de datos no perjudica a su rendimiento.
Hemos revisado varias formas de monitorizar la calidad de los datos para aplicaciones Spark. El enfoque de bajo nivel utiliza la API Spark Event Listeners y da acceso a métricas de bajo nivel como registros leídos/escritos, planes lógicos/físicos y puede ser útil para crear tendencias y asegurarse de que la canalización de datos produce resultados adecuados y obtener una visión general de las aplicaciones existentes sin ningún tipo de modificaciones de código. Los enfoques de alto nivel, como la comprobación de datos a mano o el uso de bibliotecas de calidad de los datos, son mucho más cómodos, pero presentan inconvenientes como las penalizaciones de rendimiento.
Como en cualquier situación del mundo real, siempre hay compensaciones y escenarios mejores para ambos enfoques, según el tipo de aplicación. Úselo sabiamente.
En IBM Databand, utilizamos ambas formas de proporcionar un conjunto completo de opciones para rastrear aplicaciones Spark. Aunque en nuestro núcleo utilizamos Spark Listeners para construir tendencias métricas y linaje de datos, también ofrecemos un almacén de métricas conveniente para Deequ, así como la capacidad de rastrear métricas individuales calculadas manualmente.
Obtenga más información sobre la plataforma de observabilidad continua de datos de Databand y cómo ayuda a detectar antes las incidencias en los datos, resolverlas más rápidamente y ofrecer datos más fiables a la empresa. Si está listo para profundizar, solicite una demostración hoy mismo.
Acelere la identificación de incidentes híbridos con análisis operativos casi en tiempo real.
Desbloquee resultados que cambian el negocio con soluciones de análisis en la nube que le permiten analizar datos fácilmente y crear modelos de machine learning.
Descubra nuevas capacidades e impulse la agilidad empresarial con los servicios de consultoría en la nube de IBM.