O Spark é crítico para o stack de dados moderno. Como tal, é extremamente importante ter o nível certo de observabilidade para seus ambientes Spark. Há muitas opções para monitorar o Spark, incluindo programas SaaS que fornecem dashboards pré-configurados para métricas do Spark e do Spark SQL. E se isso não for suficiente?
A configuração típica de uma aplicação Spark, seja uma solução auto-hospedada ou gerenciada, inclui alguns dashboards operacionais para monitoramento da integridade do cluster. Mas embora esses dashboards sejam muito úteis, eles nos trazem apenas uma visão geral da infraestrutura e não as métricas reais relacionadas aos dados. Sim, podemos assumir que pode haver algo errado com o aplicativo quando a CPU aumentou o uso ou o cluster está ficando sem RAM, mas não ajuda quando a fonte alterou o esquema ou os dados que vieram de outro departamento estão quebrados. A maioria dos problemas que os engenheiros enfrentam é causada pelos dados e não pela infraestrutura subjacente, de modo que eles precisam gastar muito tempo reproduzindo problemas ou consertando arquivos e buckets como investigadores. É aqui que o monitoramento real da aplicação pode ajudar.
Cada situação exige um nível diferente de visibilidade, e os engenheiros de dados precisam ter a capacidade de ir a um nível mais profundo do que as métricas de execução. Caso contrário, você poderá gastar uma quantidade significativa de tempo depurando problemas de qualidade de dados no Spark.
Neste guia, você aprenderá como obter níveis altos e baixos de observabilidade de dados para o Spark. Para o nível superior, você usará os sistemas internos do Spark, como APIs de Listener e Query Execution Listeners. No nível inferior, você aprenderá a usar bibliotecas para rastrear métricas de qualidade de dados.
Depois de aprender a fazer as duas coisas, você terá a opção de escolher a que funcionar melhor para o problema que está tentando resolver.
Boletim informativo do setor
Mantenha-se atualizado sobre as tendências mais importantes e fascinantes do setor em IA, automação, dados e muito mais com o boletim informativo da Think. Consulte a declaração de privacidade da IBM.
Sua inscrição será entregue em inglês. Você pode encontrar um link para cancelar a inscrição em todos os boletins informativos. Você pode gerenciar suas inscrições ou cancelar a inscrição aqui. Consulte nossa declaração de privacidade da IBM para obter mais informações.
Esta é uma forma muito antiga e infalível de obter métricas. Na verdade, a IU do Spark utiliza o mesmo mecanismo para visualizar métricas. A API de Spark Listeners permite que os desenvolvedores rastreiem eventos que o Spark emite durante a execução da aplicação. Esses eventos são normalmente início/fim de aplicação, início/fim de tarefa, início/fim de estágio etc. A lista completa está disponível no Spark JavaDoc. É fácil de configurar e fácil de usar os Spark Listeners para obter métricas. Depois de executar cada uma das operações, o Spark chamará o Spark Listener e transmitirá algumas informações de metadados para seu método. Isso incluirá coisas como tempo de execução, registros lidos/escritos, bytes lidos/escritos e outros.
Esse monitoramento de qualidade de dados muito básico e de baixo nível verificará a contagem e o tamanho dos registros. Imagine que você tem algum trabalho que é executado diariamente e executa alguma transformação/análise em conjuntos de dados recebidos. Você pode escrever um listener que verifica quantos registros foram lidos na entrada e compará-los com o resultado do dia anterior. Quando a diferença é significativa, podemos assumir que algo pode estar errado com a fonte de dados.
No entanto, essa abordagem exige a criação de soluções internas de monitoramento. Os valores métricas devem ser armazenar em algum lugar, e os mecanismos de alerta devem ser configurados. Quando o código da aplicação vai mudar, todas as chaves de métrica também mudarão e deve-se lidar com isso adequadamente.
No entanto, até mesmo um simples Spark Listener pode fornecer alguns insights sobre seus dados.
Aqui está um exemplo de um Spark Listener:
public class SomeSparkListener extends SparkListener {
/**
* This very simple spark listener prints metrics collected for every stage.
*
* @param stageCompleted
*/
@Override
public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
StageInfo stageInfo = stageCompleted.stageInfo();
Iterator it = stageInfo.taskMetrics().accumulators().iterator();
while (it.hasNext()) {
AccumulatorV2 next = it.next();
String key = next.name().get();
Object value = next.value();
System.out.printf("key: %s, value: %s%n", key, value);
}
}
}
Você pode adicionar o Spark Listener à sua aplicação de várias maneiras:
Adicione-o de forma programática:
SparkSession spark = SparkSession.builder().getOrCreate(); spark.sparkContext().addSparkListener(new SomeSparkListener());
Ou passe-o por opções de spark-submit/spark cluster driver:
spark-submit --conf "spark.extraListeners=ai.databand.SomeSparkListener"
Esse é outro mecanismo para o monitoramento do Spark fornecido pronto para uso. Em vez de se concentrar em métrica de nível muito baixo, o Query Execution Listener permite que os desenvolvedores inscreva-se em eventos de conclusão de consultas. Ele fornece metadados de alto nível sobre a consulta executada, como planos lógicos e físicos e métricas de execução.
Você pode obter métricas como registros lidos/escritos por consulta, mas desta vez agregadas para toda a consulta em vez de tarefas/trabalhos/fases específicas.
Além disso, informações muito úteis podem ser extraídas de planos como localização e esquema de dados. Você pode extrair e armazenar o esquema junto com as dimensões do dataframe e compará-lo com as execuções anteriores, acionando alertas quando algo estiver errado.
No entanto, extrair dados de um plano pode ser complicado porque você é forçado a usar uma API Spark de baixo nível.
Além disso, todos os encargos operacionais com a implementação de armazenamento de métricas e mecanismos de alerta ainda estão presentes. O que você receberá do Spark são apenas metadados. É responsabilidade do desenvolvedor utilizá-los.
Here is an example of simple Query Execution Listener which prints plan and metrics:
public class ExampleQueryExecutionListener implements QueryExecutionListener {
/**
* Print plan and query metrics
*
* @param funcName
* @param qe
* @param durationNs
*/
@Override
public void onSuccess(String funcName, QueryExecution qe, long durationNs) {
System.out.println(qe.executedPlan().prettyJson());
Iterator it = qe.executedPlan().metrics().iterator();
while (it.hasNext()) {
Tuple2 next = it.next();
System.out.printf("Key: %s, value: %s%n", next._1(), next._2().value());
}
}
@Override
public void onFailure(String funcName, QueryExecution qe, Exception exception) {
}
}
Os Query Execution Listeners podem ser adicionados programaticamente ou por meio de configuração:
No código da aplicação: SparkSession spark = SparkSession.builder().getOrCreate(); spark.listenerManager().register(new ExampleQueryExecutionListener());
Via spark-submit:
spark-submit --conf "spark.sql.queryExecutionListeners=ai.databand.ExampleQueryExecutionListener"
A implementação do monitoramento de baixo nível pode ser realmente trabalhosa, mas a forma de monitoramento do "sistema" tem um grande benefício: não introduz sobrecarga computacional. Como os metadados são emitidos e registrados pelo Spark internamente, isso não afeta os tempos de execução das consultas.
Usar Listeners para monitoramento permite que você evite tocar em qualquer código de aplicação. Isso pode trazer grandes benefícios quando você quer rastrear dados em aplicações existentes e legados, mas não tem orçamento para fazer alterações. Basta escrever um listener, passá-lo via configuração do Spark e obter uma imagem dos seus dados.
Você pode aumentar muito sua confiança nos dados recebidos validando-os manualmente. Digamos que esperamos algum número de registros na fonte de dados de entrada e esse número não deve ser geralmente menor que X. Podemos escrever algo muito simples como:
df = spark.read("path")
if (df.count < X) {
throw new RuntimeException("Input data is missing")
}
As possibilidades aqui são ilimitadas. Podemos comparar contagens, contagem de valores não nulos, esquemas inferidos etc.
Como muitas verificações de qualidade são mais ou menos triviais, como garantir que seu dataframe tenha forma e conteúdo adequados, a comunidade desenvolveu bibliotecas convenientes para essas verificações. Uma dessas bibliotecas é a Deequ. Ele fornece uma linguagem específica de domínio (DSL) avançada para a maioria dos casos. Dê uma olhada. Além disso, possui recursos avançados, como a capacidade de criar perfil de colunas, calcular mín./máx./média/percentis, calcular histogramas, detectar anomalias e muito mais.
Considere o seguinte exemplo de documentos Deequ:
val verificationResult = VerificationSuite()
.onData(data)
.addCheck(
Check(CheckLevel.Error, "unit testing my data")
.hasSize(_ == 5) // esperamos 5 linhas
.isComplete("id") // nunca deve ser NULL
.isUnique("id") // não deve conter duplicatas
.isComplete("productName") // nunca deve ser NULL
// deve sempre conter os valores "high" e "low"
.isContainedIn("priority", Array("high", "low"))
.isNonNegative("numViews") // nunca deve conter valores
// ao menos metade das descrições deve conter uma url
.containsURL("description", _ >= 0.5)
// metade dos itens deve ter ao menos 10 visualizações
.hasApproxQuantile("numViews", 0.5, _ <= 10)) .run()
Você pode ver que temos um enorme conjunto de verificações envoltos em um DSL bonito e pronto para uso.
Mais importante, o Deequ oferece a capacidade de armazenar os resultados das verificações e executar automaticamente comparações com execuções anteriores. Isso pode ser feito utilizando os Repositórios de Métricas. É possível escrever sua própria implementação e integrar perfeitamente o Deequ à infraestrutura de monitoramento existente.
Embora as verificações de qualidade das aplicações de alto nível sejam muito mais flexíveis do que as abordagens de baixo nível, elas têm uma grande desvantagem: penalidades de desempenho. Como todo cálculo emite operações do Spark, a sobrecarga pode ser muito significativa em alguns casos, especialmente nos grandes conjuntos de dados. Cada "contagem" e "onde" podem levar a verificações completas. A Spark fará o melhor para otimizar internamente os planos de execução, mas você deve considerar essas implicações e garantir que a criação de perfis de dados não prejudique seu desempenho.
Analisamos várias avaliações de qualidade de dados para aplicações do Spark. A abordagem de baixo nível utiliza a API de Event Listeners do Spark e dá acesso a métricas de baixo nível, como registros lidos/escritos, planos lógicos/físicos, e pode ser útil para criar tendências e garantir que o pipeline de dados produza resultados adequados e obtenha uma visão geral das aplicações existentes sem qualquer no código. Abordagens de alto nível, como a verificação manual dos dados ou o uso de bibliotecas de qualidade de dados, são muito mais convenientes, mas têm desvantagens, como penalidades de desempenho.
Como em qualquer situação do mundo real, sempre há compensações e cenários melhores para ambas as abordagens, dependendo do tipo de sua aplicação. Use-o com sabedoria.
No IBM Databand, utilizamos ambas as formas de oferecer um conjunto abrangente de opções para rastrear aplicações Spark. Embora em nosso núcleo usemos Spark Listeners para criar tendências de métrica e linhagem de dados, também fornecemos o conveniente armazenar para Deequ, bem como a capacidade de rastrear métricas individuais calculadas manualmente.
Saiba mais sobre a plataforma de observabilidade de dados do Databand e como ela ajuda a detectar incidentes de dados mais cedo, resolvê-los mais rapidamente e entregar dados mais confiáveis para a empresa. Se você está pronto para fazer uma análise mais detalhada, agende uma demonstração hoje.
Acelere a identificação de incidentes híbridos com análises operacionais quase em tempo real.
Libere resultados que mudem os negócios com soluções de análise de dados em nuvem que possibilitam a análise de dados com facilidade e a criação de modelos de aprendizado de máquina.
Conheça novos recursos e aumente a agilidade dos negócios com os serviços de consultoria em nuvem da IBM.