Spark는 최신 데이터 스택에 매우 중요합니다. 따라서 Spark 환경에 적절한 수준의 관측 가능성을 확보하는 것이 매우 중요합니다. Spark 및 Spark SQL 지표를 위한 사전 구성된 대시보드를 제공하는 SaaS 프로그램을 포함하여 Spark 모니터링을 위한 다양한 옵션이 있습니다. 이것으로 충분하지 않다면 어떻게 해야 할까요?
자체 호스팅 솔루션이든 관리형 솔루션이든, 일반적인 Spark 애플리케이션 설정에는 클러스터 상태 모니터링을 위한 몇 가지 운영 대시보드가 포함되어 있습니다. 이러한 대시보드는 매우 유용하지만, 인프라 개요만 제공하고 데이터와 관련된 실제 지표는 제공하지 않습니다. 물론 CPU 사용량이 증가했거나 클러스터에 RAM이 부족한 경우 앱에 문제가 있다고 가정할 수 있지만, 소스가 스키마를 변경하거나 다른 부서에서 가져온 데이터가 손상된 경우에는 도움이 되지 않습니다. 엔지니어가 직면하는 문제의 대다수는 기본 인프라가 아닌 데이터로 인해 발생하므로, 문제를 재현하거나 마치 탐정처럼 파일과 버킷을 조사하는 데 많은 시간을 소비해야 합니다. 이러한 경우 실제 애플리케이션 모니터링이 도움이 될 수 있습니다.
모든 상황에는 다른 수준의 가시성이 필요하며, 데이터 엔지니어는 실행 지표보다 한 단계 더 깊이 들어갈 수 있는 능력이 있어야 합니다. 그렇지 않으면 Spark에서 데이터 품질 문제를 디버깅하는 데 상당한 시간을 소비할 수 있습니다.
이 가이드에서는 Spark의 높은 수준과 낮은 수준의 데이터 관측성을 확보하는 방법을 알아봅니다. 개략적인 경우에는 리스너 API 및 쿼리 실행 리스너와 같은 Spark의 내부 시스템을 사용합니다. 낮은 수준에서는 라이브러리를 사용하여 데이터 품질 지표를 추적하는 방법을 배웁니다.
두 가지 방법을 모두 배운 후에는 해결하려는 문제에 가장 적합한 방법을 선택할 수 있습니다.
업계 뉴스레터
Think 뉴스레터를 통해 AI, 자동화, 데이터 등 가장 중요하고 흥미로운 업계 동향에 대한 최신 소식을 받아보세요. IBM 개인정보 보호정책을 참조하세요.
구독한 뉴스레터는 영어로 제공됩니다. 모든 뉴스레터에는 구독 취소 링크가 있습니다. 여기에서 구독을 관리하거나 취소할 수 있습니다. 자세한 정보는 IBM 개인정보 보호정책을 참조하세요.
이것은 지표를 얻는 매우 오래되고 방탄적인 방법입니다. 실제로 Spark UI는 동일한 메커니즘을 활용하여 지표를 시각화합니다. 개발자는 Spark 리스너 API를 사용하여 애플리케이션 실행 중에 Spark가 내보내는 이벤트를 추적할 수 있습니다. 이러한 이벤트는 일반적으로 애플리케이션 시작/종료, 작업 시작/종료, 스테이지 시작/종료 등입니다. 전체 목록은 Spark JavaDoc에서 확인할 수 있습니다. 구성하기 쉽고 Spark 리스너를 사용하여 지표를 쉽게 얻을 수 있습니다. 각 작업을 수행한 후 Spark는 Spark 리스너를 호출하고 일부 메타데이터 정보를 해당 메서드에 전달합니다. 여기에는 실행 시간, 읽기/쓰기된 레코드, 읽기/쓰기된 바이트 등이 포함됩니다.
이 매우 기본적이고 낮은 수준의 데이터 품질 모니터링은 레코드 수와 크기를 확인합니다. 매일 실행되고 들어오는 데이터 세트에 대해 변환/분석을 실행하는 작업이 있다고 상상해 보세요. 입력에서 읽은 레코드 수를 확인하고 이를 전날의 결과와 비교할 수 있는 리스너를 작성할 수 있습니다. 차이가 큰 경우 데이터 소스에 문제가 있다고 가정할 수 있습니다.
하지만 이 접근 방식은 사내 모니터링 솔루션을 직접 작성해야 합니다. 지표 값은 어딘가에 저장되어야 하고 경고 메커니즘을 구성해야 합니다. 애플리케이션 코드가 변경되면 모든 지표 키도 변경되므로 이를 적절하게 처리해야 합니다.
그러나 간단한 Spark 리스너로도 데이터에 대한 인사이트를 얻을 수 있습니다.
다음은 이러한 Spark 리스너의 예입니다.
public class SomeSparkListener extends SparkListener {
/**
* This very simple spark listener prints metrics collected for every stage.
*
* @param stageCompleted
*/
@Override
public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
StageInfo stageInfo = stageCompleted.stageInfo();
Iterator it = stageInfo.taskMetrics().accumulators().iterator();
while (it.hasNext()) {
AccumulatorV2 next = it.next();
String key = next.name().get();
Object value = next.value();
System.out.printf("key: %s, value: %s%n", key, value);
}
}
}
다음과 같은 여러 가지 방법으로 Spark 리스너를 애플리케이션에 추가할 수 있습니다.
프로그래밍적으로 추가:
SparkSession spark = SparkSession.builder().getOrCreate(); spark.sparkContext().addSparkListener(new SomeSparkListener());
또는 spark-submit/spark 클러스터 드라이버 옵션을 통해 전달할 수 있습니다.
spark-submit --conf "spark.extraListeners=ai.databand.SomeSparkListener"
이는 기본 제공되는 Spark 모니터링을 위한 또 다른 메커니즘입니다. 개발자는 쿼리 실행 리스너를 통해 매우 낮은 수준의 지표에 집중하는 대신 쿼리 완료 이벤트를 구독하기 위해 구독할 수 있습니다. 논리적 및 물리적 계획과 실행 지표와 같이 실행된 쿼리에 대한 보다 높은 수준의 메타데이터를 제공합니다.
쿼리별로 읽거나 쓴 기록과 같은 지표를 얻을 수 있지만, 이번에는 특정 작업/업무/단계 대신 전체 쿼리에 대해 집계됩니다.
또한 데이터 위치 및 스키마와 같은 계획에서 매우 유용한 정보를 추출할 수 있습니다. 데이터 프레임 차원과 함께 스키마를 추출하여 저장하고 이전 실행과 비교하여 문제가 발생하면 알림을 트리거할 수 있습니다.
그러나 계획에서 데이터를 추출하는 것은 낮은 수준의 Spark API를 사용해야 하기 때문에 복잡할 수 있습니다.
또한 지표 저장소 및 경고 메커니즘을 구현하는 데 따른 모든 운영 부담도 여전히 존재합니다. Spark에서 얻을 수 있는 것은 메타데이터일 뿐입니다. 이를 활용하는 것은 개발자의 책임입니다.
다음은 계획과 지표를 인쇄하는 간단한 쿼리 실행 리스너의 예입니다.
public class ExampleQueryExecutionListener implements QueryExecutionListener {
/**
* Print plan and query metrics
*
* @param funcName
* @param qe
* @param durationNs
*/
@Override
public void onSuccess(String funcName, QueryExecution qe, long durationNs) {
System.out.println(qe.executedPlan().prettyJson());
Iterator it = qe.executedPlan().metrics().iterator();
while (it.hasNext()) {
Tuple2 next = it.next();
System.out.printf("Key: %s, value: %s%n", next._1(), next._2().value());
}
}
@Override
public void onFailure(String funcName, QueryExecution qe, Exception exception) {
}
}
쿼리 실행 리스너는 프로그래밍 방식으로 또는 구성을 통해 추가할 수 있습니다.
애플리케이션 코드: SparkSession spark = SparkSession.builder().getOrCreate(); spark.listenerManager().register(new ExampleQueryExecutionListener());
spark-submit을 통해:
spark-submit --conf "spark.sql.queryExecutionListeners=ai.databand.ExampleQueryExecutionListener"
낮은 수준의 모니터링을 구현하는 것은 심각한 부담이 될 수 있지만, '시스템' 모니터링 방식은 계산 오버헤드가 발생하지 않는다는 큰 이점이 있습니다. 메타데이터가 Spark 내부에서 내보내기되고 기록되므로, 쿼리 실행 시간에 불이익이 없습니다.
모니터링을 위해 리스너를 사용하면 애플리케이션 코드를 건드리지 않아도 됩니다. 기존 애플리케이션과 레거시 애플리케이션에서 데이터를 추적하고 싶지만 변경할 예산이 없는 경우에 큰 이점을 얻을 수 있습니다. 리스너를 작성하고 Spark 구성을 통해 전달하고 데이터를 파악하기만 하면 됩니다.
수신 데이터를 수동으로 검증하면 신뢰도를 크게 높일 수 있습니다. 입력 데이터 소스에 몇 개의 레코드가 있을 것으로 예상하고 이 숫자가 일반적으로 X보다 작아서는 안 된다고 가정해 보겠습니다. 다음과 같이 아주 간단하게 작성할 수 있습니다.
df = spark.read("path")
if (df.count < X) {
throw new RuntimeException("Input data is missing")
}
가능성은 무한합니다. 개수, null이 아닌 값의 개수, 추론된 스키마 등을 비교할 수 있습니다.
많은 품질 검사는 데이터 프레임의 모양과 내용이 적절한지 확인하는 등 다소 간단하므로, 커뮤니티에서는 이러한 검사를 위한 편리한 라이브러리를 개발했습니다. 그러한 라이브러리 중 하나가 Deequ입니다. Deequ는 대부분의 경우 풍부한 도메인별 언어(DSL)를 제공합니다. 확인해 보세요. 또한 열 프로파일링, 최소/최대/평균/백분위수 계산, 히스토그램 계산, 이상 징후 감지 등의 고급 기능도 갖추고 있습니다.
Deequ 문서의 다음 예시를 살펴보세요.
val verificationResult = VerificationSuite()
.onData(data)
.addCheck(
Check(CheckLevel.Error, "unit testing my data")
.hasSize(_ == 5) // we expect 5 rows
.isComplete("id") // should never be NULL
.isUnique("id") // should not contain duplicates
.isComplete("productName") // should never be NULL
// should only contain the values "high" and "low"
.isContainedIn("priority", Array("high", "low"))
.isNonNegative("numViews") // should not contain negative values
// at least half of the descriptions should contain a url
.containsURL("description", _ >= 0.5)
// half of the items should have less than 10 views
.hasApproxQuantile("numViews", 0.5, _ <= 10))
.run()
보시다시피 즉시 사용 가능한 멋진 DSL로 포장된 거대한 검사 세트가 있습니다.
더 중요한 것은 Deequ가 검사 결과를 저장하고 이전 실행과의 비교를 자동으로 실행할 수 있는 기능을 제공한다는 것입니다. 지표 리포지토리를 활용하여 이를 수행할 수 있습니다. 자체 구현을 작성하고 Deequ를 기존 모니터링 인프라에 원활하게 통합할 수 있습니다.
높은 수준의 애플리케이션 품질 검사는 낮은 수준의 접근 방식보다 훨씬 유연하지만 성능 저하라는 큰 단점이 있습니다. 모든 계산에서 Spark 연산이 발생하기 때문에 경우에 따라 오버헤드가 매우 클 수 있으며, 특히 대규모 데이터 세트의 경우 더욱 그렇습니다. 각각의 "count"와 "where"는 전체 스캔으로 이어질 수 있습니다. Spark는 내부적으로 실행 계획을 최적화하기 위해 최선을 다하지만, 이러한 영향을 고려하고 데이터 프로파일링이 성능에 해를 끼치지 않도록 해야 합니다.
Spark 애플리케이션의 데이터 품질을 모니터링하는 몇 가지 방법을 검토했습니다. 낮은 수준의 접근 방식은 Spark 이벤트 리스너 API를 활용하며, 읽기/쓰기 기록, 논리적/물리적 계획과 같은 하위 수준 지표에 대한 액세스를 제공하며, 트렌드를 구축하고 데이터 파이프라인이 적절한 결과를 생성하는지 확인하고 별도의 작업 없이 기존 애플리케이션에 대한 개요를 얻는 데 유용할 수 있습니다. 데이터를 직접 확인하거나 데이터 품질 라이브러리를 사용하는 것과 같은 고급 접근 방식이 훨씬 편리하지만 성능 저하와 같은 단점이 있습니다.
실제 상황과 마찬가지로, 애플리케이션 유형에 따라 두 접근 방식 모두 항상 장단점이 있습니다. 현명하게 사용하세요.
IBM® Databand는 두 가지 방법을 모두 활용하여 Spark 애플리케이션을 추적하는 포괄적인 옵션 세트를 제공합니다. Core에서는 Spark 리스너를 사용하여 지표 추세와 데이터 리니지를 구축하고 있으며, 편리한 Metrics Store for Deequ와 수동으로 계산한 개별 지표를 추적할 수 있는 기능도 제공합니다.
Databand의 연속 데이터 관측성 플랫폼에 대해 자세히 알아보고, 데이터 사고를 조기에 감지하여 빠르게 해결하고 비즈니스에 더 신뢰할 수 있는 데이터를 제공하는 방법을 확인하세요. 더 자세히 살펴볼 준비가 되셨다면 지금 바로 데모를 예약하세요.
실시간 운영 분석을 통해 하이브리드 인시던트 식별을 가속화합니다.
데이터를 쉽게 분석하고 머신 러닝 모델을 구축할 수 있는 클라우드 분석 솔루션을 사용하여 비즈니스를 변화시키는 결과를 얻으세요.
IBM의 클라우드 컨설팅 서비스를 통해 새로운 역량을 개발하고 비즈니스 민첩성을 향상하세요.