Spark es crítico para la pila de datos modernos. Como tal, es extremadamente importante tener el nivel adecuado de observabilidad para sus entornos Spark. Hay muchas opciones para monitorear Spark, incluidos los programas SaaS que le proporcionan paneles preconfigurados para las métricas de Spark y Spark SQL. ¿Y si eso no es suficiente?
La configuración típica de una aplicación Spark, ya sea una solución autoalojada o gestionada, incluye algunos paneles operativos para monitorear el estado del clúster. Pero aunque esos paneles son muy útiles, solo nos brindan una visión general de la infraestructura y no las métricas reales relacionadas con los datos. Sí, podemos suponer que puede haber algún problema con la aplicación cuando la CPU ha aumentado el uso o el clúster se está quedando sin RAM, pero no ayuda cuando la fuente cambió el esquema o los datos que provienen de otro departamento no sirven. La mayoría de los problemas a los que se enfrentan los ingenieros están causados por los datos y no por la infraestructura subyacente, por lo que tienen que dedicar mucho tiempo a reproducir los problemas o a manipular archivos y buckets como si fueran detectives. Aquí es donde el seguimiento real de la aplicación puede ayudar.
Cada situación requiere un nivel diferente de visibilidad, y los ingenieros de datos deben tener la capacidad de ir más allá de las métricas de ejecución. De lo contrario, puede dedicarse una cantidad significativa de tiempo a depurar problemas de calidad de datos en Spark.
En esta guía, aprenderá a obtener niveles altos y bajos de observabilidad de los datos para Spark. Para los de alto nivel, utilizará los sistemas internos de Spark, como las API de oyentes y Query Execution Listeners. Para el nivel bajo, aprenderá a usar bibliotecas para realizar un seguimiento de las métricas de calidad de los datos.
Después de aprender a hacer ambas cosas, tendrá la opción de elegir la que mejor funcione para el problema que está tratando de resolver.
Boletín de la industria
Manténgase al día sobre las tendencias más importantes e intrigantes de la industria sobre IA, automatización, datos y más con el boletín Think. Consulte la Declaración de privacidad de IBM.
Su suscripción se entregará en inglés. En cada boletín, encontrará un enlace para darse de baja. Puede gestionar sus suscripciones o darse de baja aquí. Consulte nuestra Declaración de privacidad de IBM para obtener más información.
Es una forma muy antigua e infalible de obtener métricas. De hecho, la interfaz de usuario (IU) de Spark usa el mismo mecanismo para visualizar métricas. La API de oyentes de Spark permite a los desarrolladores realizar un seguimiento de los eventos que emite Spark durante la ejecución de la aplicación. Esos eventos suelen ser el inicio y fin de la aplicación, el inicio y fin del trabajo, el inicio y fin de la etapa, etc. Puede encontrar la lista completa en Spark JavaDoc. Es fácil de configurar y usar Spark Listeners para obtener métricas. Después de realizar cada una de las operaciones, Spark llamará a Spark Listener y pasará información de metadatos a su método. Esto incluirá cosas como el tiempo de ejecución, los registros leídos/escritos, los bytes leídos/escritos y otros.
Este control de calidad de datos muy básico y de bajo nivel verificará el número y el tamaño de los registros. Imagine que tiene un trabajo que se ejecuta a diario y ejecuta alguna transformación/analytics en los conjuntos de datos entrantes. Puede escribir un oyente que verifique cuántos registros se leyeron de la entrada y compararlo con el resultado del día anterior. Cuando la diferencia es significativa, podemos suponer que algo puede estar mal con la fuente de datos.
Sin embargo, este enfoque requiere escribir soluciones de monitoreo internas. Los valores métricos deben almacenarse en algún lugar y deben configurarse mecanismos de alerta. Cuando el código de la aplicación cambie, todas las claves de métricas también cambiarán y uno debe manejarlo adecuadamente.
Sin embargo, incluso un simple Spark Listener puede aportar algunos insights sobre sus datos.
A continuación, se muestra un ejemplo de un Spark Listener de este tipo:
public class SomeSparkListener extends SparkListener {
/**
* This very simple spark listener prints metrics collected for every stage.
*
* @param stageCompleted
*/
@Override
public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
StageInfo stageInfo = stageCompleted.stageInfo();
Iterator it = stageInfo.taskMetrics().accumulators().iterator();
while (it.hasNext()) {
AccumulatorV2 next = it.next();
String key = next.name().get();
Object value = next.value();
System.out.printf("key: %s, value: %s%n", key, value);
}
}
}
Puede agregar Spark Listener a su aplicación de varias maneras:
Agregarlo programáticamente:
SparkSession spark = SparkSession.builder().getOrCreate(); spark.sparkContext().addSparkListener(new SomeSparkListener());
O pasarlo a través de las opciones del controlador de clúster spark-submit/spark:
spark-submit --conf "spark.extraListeners=ai.databand.SomeSparkListener"
Este es otro mecanismo para el monitoreo de Spark que se proporciona de forma predeterminada. En lugar de centrarse en métricas de muy bajo nivel, Query Execution Listener permite a los desarrolladores suscribirse a eventos de finalización de consultas. Proporciona metadatos de más alto nivel sobre las consultas ejecutadas, como planes lógicos y físicos, y métricas de ejecución.
Puedes obtener métricas como registros leídos y escritos por consulta, pero esta vez agregados para toda la consulta en lugar de tareas, trabajos o etapas específicos.
Además, se puede extraer información muy útil de los planes, como la ubicación de los datos y el esquema. Puede extraer y almacenar el esquema junto con las dimensiones del dataframe y compararlo con las ejecuciones anteriores, activando alertas cuando algo va mal.
Sin embargo, extraer datos de un plan puede ser complicado porque se ve obligado a usar una API Spark de bajo nivel.
Además, todas las cargas operacionales con la implementación de mecanismos de alerta y almacenamiento de métricas siguen presentes. Lo que obtendrá de Spark son solo metadatos. Es responsabilidad del desarrollador utilizarlo.
Este es un ejemplo de Query Execution Listener simple que imprime planes y métricas:
public class ExampleQueryExecutionListener implements QueryExecutionListener {
/**
* Print plan and query metrics
*
* @param funcName
* @param qe
* @param durationNs
*/
@Override
public void onSuccess(String funcName, QueryExecution qe, long durationNs) {
System.out.println(qe.executedPlan().prettyJson());
Iterator it = qe.executedPlan().metrics().iterator();
while (it.hasNext()) {
Tuple2 next = it.next();
System.out.printf("Key: %s, value: %s%n", next._1(), next._2().value());
}
}
@Override
public void onFailure(String funcName, QueryExecution qe, Exception exception) {
}
}
Los oyentes de ejecución de consultas se pueden agregar mediante programación o mediante configuración:
En el código de la aplicación: SparkSession spark = SparkSession.builder().getOrCreate(); spark.listenerManager().register(new ExampleQueryExecutionListener());
A través de spark-submit:
spark-submit --conf "spark.sql.queryExecutionListeners=ai.databand.ExampleQueryExecutionListener"
Implementar el monitoreo de bajo nivel puede ser un trabajo pesado; sin embargo, la forma de monitoreo del "sistema" tiene un gran beneficio: no introduce sobrecarga computacional. Dado que los metadatos son emitidos y registrados por los componentes internos de Spark, no afectan negativamente los tiempos de ejecución de las consultas.
El uso de oyentes para el monitoreo permite evitar tocar cualquier código de aplicación. Esto puede tener grandes beneficios cuando quiere hacer un seguimiento de datos de aplicaciones heredadas y existentes, pero no tiene presupuesto para hacer cambios. Simplemente escriba un oyente, páselo a través de la configuración de Spark y obtenga una imagen de sus datos.
Puede aumentar enormemente su confianza en los datos entrantes al validarlos manualmente. Supongamos que esperamos un número determinado de registros en la fuente de datos de entrada y este número normalmente no debería ser inferior a X. Podemos escribir algo muy simple como:
df = spark.read("path")
if (df.count < X) {
throw new RuntimeException("Input data is missing")
}
Las posibilidades aquí son ilimitadas. Podemos comparar recuentos, recuentos de valores no nulos, esquemas inferidos, etc.
Dado que muchos controles de calidad son más o menos triviales, como garantizar que su marco de datos tenga la forma y el contenido adecuados, la comunidad desarrolló bibliotecas convenientes para tales controles. Una de esas bibliotecas es Deequ. Proporciona un lenguaje específico de dominio (DSL) enriquecido para la mayoría de los casos. Eche un vistazo. Además, cuenta con funciones avanzadas, como la capacidad de perfilar columnas, calcular mínimos, máximos, medias o percentiles, calcular histogramas, detectar anomalías y muchas más.
Considere el siguiente ejemplo de documentos de Deequ:
val verificationResult = VerificationSuite()
.onData(data)
.addCheck(
Check(CheckLevel.Error, "unit testing my data")
.hasSize(_ == 5) // we expect 5 rows
.isComplete("id") // should never be NULL
.isUnique("id") // should not contain duplicates
.isComplete("productName") // should never be NULL
// should only contain the values "high" and "low"
.isContainedIn("priority", Array("high", "low"))
.isNonNegative("numViews") // should not contain negative values
// at least half of the descriptions should contain a url
.containsURL("description", _ >= 0.5)
// half of the items should have less than 10 views
.hasApproxQuantile("numViews", 0.5, _ <= 10))
.run()
Como puede ver, contamos con un amplio conjunto de verificaciones integradas en un DSL agradable y listo para usar.
Más importante aún, Deequ ofrece la capacidad de almacenar los resultados de las comprobaciones y ejecutar automáticamente comparaciones con ejecuciones anteriores. Esto se puede hacer mediante repositorios de métricas. Uno puede escribir su propia implementación e integrar perfectamente Deequ en la infraestructura de monitoreo existente.
Aunque los controles de calidad de aplicaciones de alto nivel son mucho más flexibles que los de bajo nivel, conllevan un gran inconveniente: las penalizaciones de rendimiento. Dado que cada cálculo emite una operación de Spark, la sobrecarga puede ser muy significativa en algunos casos, especialmente en conjuntos de datos grandes. Cada "recuento" y "dónde" pueden conducir a análisis completos. Spark internamente hará todo lo posible para optimizar los planes de ejecución, pero debe considerar estas implicaciones y asegurarse de que el perfilado de datos no perjudique su rendimiento.
Hemos revisado varias formas de monitorear la calidad de los datos para las aplicaciones Spark. El enfoque de bajo nivel utiliza la API Spark Event Listeners y da acceso a métricas de bajo nivel, como registros leídos/escritos, planes lógicos/físicos, y puede ser útil para crear tendencias y asegurarse de que el pipeline de datos produzca resultados adecuados y obtener una visión general de las aplicaciones existentes sin ningún tipo de modificaciones de código. Los enfoques de alto nivel, como la verificación manual de datos o el uso de bibliotecas de calidad de datos, son mucho más convenientes, pero tienen inconvenientes como penalizaciones en el rendimiento.
Como en cualquier situación del mundo real, siempre hay compensaciones y mejores escenarios para ambos enfoques, según el tipo de aplicación. Úsalo con prudencia.
En IBM® Databand, utilizamos ambas formas para proporcionar un conjunto completo de opciones para rastrear las aplicaciones Spark. Aunque en nuestro núcleo empleamos Spark Listeners para construir tendencias métricas y linaje de datos, también ofrecemos un Metrics Store conveniente para Deequ, así como la capacidad de rastrear métricas individuales calculadas manualmente.
Aprenda más sobre la plataforma de observabilidad de los datos continua de Databand y cómo ayuda a detectar incidentes de datos con mayor anticipación, resolverlos más rápido y entregar datos más confiables al negocio. Si está listo para profundizar, reserve una demostración hoy.
Acelera la identificación de incidentes híbridos con analytics operativos casi en tiempo real.
Desbloquee Resultados que cambian el negocio con soluciones de analytics en la nube que le permiten analizar datos fácilmente y crear modelos de machine learning.
Descubra nuevas capacidades e impulse la agilidad del negocio con los servicios de consultoría en la nube de IBM.