Spark sangat penting untuk tumpukan data modern. Dengan demikian, sangat penting untuk memiliki tingkat observabilitas yang tepat untuk lingkungan Spark Anda. Ada banyak opsi untuk memantau Spark, termasuk program SaaS yang memberi Anda dasbor pra-konfigurasi untuk metrik Spark dan Spark SQL. Bagaimana jika itu tidak cukup?
Pengaturan aplikasi Spark yang khas, baik itu solusi yang di-host sendiri atau dikelola, mencakup beberapa dasbor operasional untuk pemantauan kesehatan klaster. Namun, meskipun dasbor tersebut sangat berguna, dasbor tersebut hanya memberi kita gambaran umum infrastruktur dan bukan metrik aktual yang terkait dengan data. Ya, kita dapat berasumsi mungkin ada yang salah dengan aplikasi ketika CPU telah meningkatkan penggunaan atau klaster kehabisan RAM, tetapi itu tidak membantu ketika sumber mengubah skema atau data yang berasal dari departemen lain rusak. Sebagian besar masalah yang dihadapi insinyur disebabkan oleh data dan bukan oleh infrastruktur yang mendasarinya sehingga mereka harus menghabiskan banyak waktu mereproduksi masalah atau mengutak-atik file dan bucket seperti detektif. Di sinilah pemantauan aplikasi yang sebenarnya dapat membantu.
Setiap situasi membutuhkan tingkat visibilitas yang berbeda, dan insinyur data harus memiliki kemampuan untuk mencapai tingkat yang lebih dalam daripada metrik eksekusi. Jika tidak, Anda dapat menghabiskan banyak waktu untuk men-debug masalah kualitas data di Spark.
Dalam panduan ini, Anda akan belajar cara mendapatkan tingkat observabilitas data yang tinggi dan rendah untuk Spark Untuk tingkat tinggi, Anda akan menggunakan sistem internal Spark seperti Listener API dan Query Execution Listener. Untuk tingkat rendah, Anda akan belajar cara menggunakan pustaka untuk melacak metrik kualitas data.
Setelah belajar melakukan keduanya, Anda akan memiliki opsi untuk memilih mana yang paling cocok untuk masalah yang Anda coba selesaikan.
Buletin industri
Ikuti perkembangan tren industri yang paling penting—dan menarik—di bidang AI, otomatisasi, data, dan lainnya dengan buletin Think. Lihat Pernyataan Privasi IBM.
Langganan Anda akan disediakan dalam bahasa Inggris. Anda akan menemukan tautan berhenti berlangganan di setiap buletin. Anda dapat mengelola langganan atau berhenti berlangganan di sini. Lihat Pernyataan Privasi IBM kami untuk informasi lebih lanjut.
Ini adalah cara yang sangat tua dan ampuh untuk mendapatkan metrik. Sebenarnya, UI Spark menggunakan mekanisme yang sama untuk memvisualisasikan metrik. API Listener Spark memungkinkan pengembang untuk melacak peristiwa yang dipancarkan Spark selama eksekusi aplikasi. Acara tersebut biasanya mulai/akhir aplikasi, awal/akhir pekerjaan, awal/akhir tahap, dll. Anda bisa menemukan daftar lengkapnya di Spark JavaDoc. Sangat mudah untuk mengonfigurasi dan mudah menggunakan Spark Listener untuk mengambil metrik. Setelah melakukan setiap operasi, Spark akan memanggil Spark Listener dan meneruskan beberapa informasi metadata ke metodenya. Ini akan mencakup hal-hal seperti waktu eksekusi, catatan baca/ditulis, byte dibaca/ditulis dan lainnya.
Pemantauan kualitas data tingkat rendah dan sangat mendasar ini akan memeriksa jumlah dan ukuran rekaman. Bayangkan Anda memiliki beberapa pekerjaan yang berjalan setiap hari dan menjalankan beberapa transformasi/analitik pada kumpulan data yang masuk. Anda dapat menulis pendengar yang memeriksa berapa banyak catatan yang dibaca dari input dan membandingkannya dengan hasil hari sebelumnya. Ketika perbedaannya signifikan, kita dapat mengasumsikan bahwa ada sesuatu yang salah dengan sumber data.
Namun, pendekatan ini membutuhkan penulisan solusi pemantauan internal. Nilai metrik harus disimpan di suatu tempat, mekanisme peringatan harus dikonfigurasi. Ketika kode aplikasi akan berubah, semua kunci metrik juga akan berubah dan seseorang harus menanganinya dengan benar.
Namun, bahkan Spark Listener sederhana dapat memberikan beberapa insight tentang data Anda.
Berikut adalah contoh Spark Listener yang dimaksud:
public class SomeSparkListener extends SparkListener {
/**
* This very simple spark listener prints metrics collected for every stage.
*
* @param stageCompleted
*/
@Override
public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
StageInfo stageInfo = stageCompleted.stageInfo();
Iterator it = stageInfo.taskMetrics().accumulators().iterator();
while (it.hasNext()) {
AccumulatorV2 next = it.next();
String key = next.name().get();
Object value = next.value();
System.out.printf("key: %s, value: %s%n", key, value);
}
}
}
Anda dapat menambahkan Spark Listener ke aplikasi Anda dengan beberapa cara:
Tambahkan secara terprogram:
Percikan SparkSession = SparkSession.builder().getOrCreate(); spark.sparkContext().addSparkListener(new SomeSparkListener());
Atau kirim melalui opsi driver klaster VIA® spark-submit/spark:
spark-Kirim --conf "spark.extraListeners=AI.databand.SomeSparkListener"
Ini adalah mekanisme lain untuk pemantauan Spark yang disediakan di luar kotak. Alih-alih berfokus pada metrik tingkat yang sangat rendah, Query Execution Listener memungkinkan pengembang untuk berlangganan acara penyelesaian kueri. Ini menyediakan metadata tingkat tinggi tentang kueri yang dieksekusi seperti rencana logis dan fisik, dan metrik eksekusi.
Anda bisa mendapatkan metrik seperti catatan dibaca/ditulis oleh kueri, tetapi kali ini digabungkan untuk seluruh kueri, bukannya tugas/pekerjaan/tahapan tertentu.
Juga informasi yang sangat berguna dapat diekstraksi dari rencana seperti lokasi data dan skema. Anda dapat mengekstrak dan toko skema bersama dengan dimensi dataframe dan membandingkannya dengan proses sebelumnya, sehingga memicu peringatan ketika ada sesuatu yang tidak beres.
Namun, mengekstraksi data dari paket bisa rumit karena Anda dipaksa untuk menggunakan API Spark tingkat rendah.
Selain itu, semua beban operasional dengan penerapan mekanisme penyimpanan dan peringatan metrik masih ada. Apa yang akan Anda dapatkan dari Spark hanyalah metadata. Ini adalah tanggung jawab pengembang untuk memanfaatkannya.
Berikut adalah contoh Query Execution Listener sederhana yang mencetak rencana dan metrik:
public class ExampleQueryExecutionListener implements QueryExecutionListener {
/**
* Print plan and query metrics
*
* @param funcName
* @param qe
* @param durationNs
*/
@Override
public void onSuccess(String funcName, QueryExecution qe, long durationNs) {
System.out.println(qe.executedPlan().prettyJson());
Iterator it = qe.executedPlan().metrics().iterator();
while (it.hasNext()) {
Tuple2 next = it.next();
System.out.printf("Key: %s, value: %s%n", next._1(), next._2().value());
}
}
@Override
public void onFailure(String funcName, QueryExecution qe, Exception exception) {
}
}
Pemroses eksekusi kueri dapat ditambahkan baik secara terprogram atau melalui konfigurasi:
In application code: SparkSession spark = SparkSession.builder().getOrCreate(); spark.listenerManager().register(new ExampleQueryExecutionListener());
Via spark-submit:
spark-submit --conf "spark.sql.queryExecutionListeners=ai.databand.ExampleQueryExecutionListener"
Menerapkan pemantauan tingkat rendah dapat menjadi beban berat yang serius, namun, cara pemantauan “sistem” memiliki manfaat besar: tidak memperkenalkan overhead komputasi. Karena metadata dipancarkan dan direkam oleh internal Spark, metadata tidak memberikan hukuman apa pun untuk waktu eksekusi kueri.
Menggunakan Listener untuk pemantauan memungkinkan Anda menghindari menyentuh kode aplikasi apa pun. Ini dapat memiliki manfaat besar ketika Anda ingin melacak data pada aplikasi yang ada dan lama tetapi tidak memiliki anggaran untuk membuat perubahan. Cukup tulis pendengar, berikan melalui konfigurasi spark dan dapatkan gambar data Anda.
Anda dapat sangat meningkatkan kepercayaan Anda pada data yang masuk dengan memvalidasinya secara manual. Katakanlah kita mengharapkan sejumlah catatan dalam sumber data input dan jumlah ini biasanya tidak boleh lebih rendah dari X. Kita bisa menulis sesuatu yang sangat sederhana seperti:
df = spark.read("path")
if (df.count < X) {
throw new RuntimeException("Input data is missing")
}
Kemungkinannya di sini tidak terbatas. Kita dapat membandingkan jumlah, jumlah nilai bukan nol, skema yang disimpulkan, dan lain-lain.
Karena banyak pemeriksaan kualitas yang kurang lebih sepele, seperti memastikan bingkai data Anda memiliki bentuk dan isi yang tepat, komunitas mengembangkan pustaka yang mudah digunakan untuk pemeriksaan semacam itu. Salah satu perpustakaan tersebut adalah Deequ. Ini menyediakan bahasa khusus Domain (DSL) yang kaya untuk sebagian besar kasus. Coba lihat. Juga memiliki hal-hal canggih, seperti kemampuan untuk membuat profil kolom, menghitung min/maks/mean/persentil, menghitung histogram, deteksi anomali dan banyak lagi.
Pertimbangkan contoh berikut dari dokumen Deequ:
val verificationResult = VerificationSuite()
.onData(data)
.addCheck(
Check(CheckLevel.Error, "unit testing my data")
.hasSize(_ == 5) // we expect 5 rows
.isComplete("id") // should never be NULL
.isUnique("id") // should not contain duplicates
.isComplete("productName") // should never be NULL
// should only contain the values "high" and "low"
.isContainedIn("priority", Array("high", "low"))
.isNonNegative("numViews") // should not contain negative values
// at least half of the descriptions should contain a url
.containsURL("description", _ >= 0.5)
// half of the items should have less than 10 views
.hasApproxQuantile("numViews", 0.5, _ <= 10))
.run()
Anda dapat melihat kami memiliki serangkaian cek besar yang dibungkus dengan DSL yang bagus dan siap digunakan.
Lebih penting lagi, Deequ menyediakan kemampuan untuk menyimpan hasil dan secara otomatis menjalankan perbandingan dengan proses sebelumnya. Hal ini dapat dilakukan dengan memanfaatkan Metrics Repositories. Seseorang dapat menulis implementasi mereka sendiri dan mengintegrasikan Deequ dengan lancar ke dalam infrastruktur pemantauan yang ada.
Meskipun pemeriksaan kualitas aplikasi tingkat tinggi jauh lebih fleksibel daripada pendekatan tingkat rendah, mereka datang dengan kelemahan besar: penalti kinerja. Karena setiap perhitungan memancarkan operasi percikan, overhead bisa sangat signifikan dalam beberapa kasus, terutama pada kumpulan data besar. Setiap “hitungan” dan “di mana” dapat mengarah ke pemindaian penuh. Spark secara internal akan melakukan yang terbaik untuk mengoptimalkan rencana eksekusi tetapi Anda harus mempertimbangkan implikasi ini dan memastikan pembuatan profil data tidak akan membahayakan kinerja Anda.
Kami telah melakukan ulasan beberapa cara memantau kualitas data untuk aplikasi Spark. Pendekatan tingkat rendah menggunakan Spark Event Listeners API dan memberikan akses ke metrik tingkat rendah seperti catatan baca/tertulis, rencana logis/fisik dan dapat berguna untuk membangun tren dan memastikan pipeline data menghasilkan hasil yang tepat dan mendapatkan gambaran umum tentang aplikasi yang ada tanpa modifikasi kode apa pun. Pendekatan tingkat tinggi seperti memeriksa data dengan tangan atau menggunakan pustaka kualitas data jauh lebih nyaman tetapi memiliki kelemahan seperti penalti kinerja.
Seperti dalam situasi dunia nyata, selalu ada pertukaran dan skenario yang lebih baik untuk kedua pendekatan, tergantung pada jenis aplikasi Anda. Gunakan dengan bijak.
Di IBM® Databand, kami menggunakan kedua cara untuk menyediakan serangkaian opsi komprehensif untuk melacak aplikasi Spark. Sementara pada inti kami, kami menggunakan Spark Listener untuk membangun tren metrik dan garis keturunan data, kami juga menyediakan Metrics Store yang nyaman untuk Deequ serta kemampuan untuk melacak metrik individual yang dihitung secara manual.
Pelajari lebih lanjut platform pengamatan data berkelanjutan dari Databand dan bagaimana platform ini membantu mendeteksi insiden data lebih awal, menyelesaikannya lebih cepat, dan memberikan data yang lebih tepercaya kepada bisnis. Jika Anda siap untuk melihat lebih dalam, pesan demo hari ini.
Percepat identifikasi insiden hybrid dengan analitik operasional nyaris seketika.
Buka hasil dengan solusi analitik cloud yang memungkinkan Anda menganalisis data dengan mudah dan membangun model machine learning.
Temukan kemampuan baru dan dorong ketangkasan bisnis dengan layanan konsultasi cloud IBM.