Trik Meningkatkan Kinerja Spark Pipeline


Spark adalah pilihan populer untuk pekerjaan rekayasa data karena Spark menjanjikan kekuatan pemrosesan yang sangat cepat pada volume data terdistribusi yang besar. Seperti permainan antara kucing dan tikus kebutuhan yang terus meningkat untuk pemrosesan data yang lebih dalam masih dapat membanjiri Spark membuat pipeline Spark memakan waktu dan sulit untuk dikelola. Pekerjaan harian melibatkan pemrosesan data PT, dan tim kami mengelola pipeline Spark setiap malam di AWS EMR untuk tujuan itu. Setiap beberapa bulan membutuhkan penyetelan performa Spark. Dengan ratusan kenop untuk diputar itu selalu merupakan perjuangan berat untuk memeras lebih banyak dari saluran pipa Spark. Di blog ini saya ingin menyoroti 3 metode yang diabaikan untuk mengoptimalkan pipa Spark yaitu: 

1. merapikan keluaran pipa 

2. menyeimbangkan beban kerja melalui pengacakan

3. ganti join dengan fungsi window


Seperti apa pipeline Spark saya?

Sebelum saya membahas 3 metode saya ingin menjelaskan pipeline Spark saya terlebih dahulu. Pipeline Spark saya mencakup ribuan aplikasi Spark independen. Setiap aplikasi Spark memasukkan data pelanggan dalam S3, membuat rantai transformasi yang panjang dan akhirnya menulis keluaran ke S3. Aplikasi Spark ini semuanya ada di Pyspark di AWS EMR. Kami secara bersamaan menjalankan ratusan aplikasi Spark dari ribuan ini. Pada hari-hari buruk, dibutuhkan waktu hingga 24 jam untuk seluruh pekerjaan. Penting untuk ditunjukkan bahwa data satu pelanggan bisa jauh lebih besar daripada data pelanggan lain oleh karena itu aplikasi Spark independen ini sebenarnya memiliki jejak sumber daya yang berbeda.

1. Merapikan Keluaran Pipa

Strategi paling terkenal nomor 1 untuk pengoptimalan performa adalah mengurangi ukuran data dalam aplikasi Anda yaitu hanya menyerap data mentah yang diperlukan dan menjaga agar bingkai data perantara tetap kompak. Dalam 1 sprint penyetelan kinerja, mengganti salah satu sumber data mentah kami dengan sumber lain yang memiliki perincian lebih rendah kita melihat peningkatan kinerja 50% dengan mudah. Jika sumber data alternatif tidak tersedia Anda bisa menggunakan input = spark.read.parquet("fs://path/file.parquet").select(...)untuk membatasi membaca hanya ke kolom yang berguna. Membaca lebih sedikit data ke dalam memori akan mempercepat aplikasi Anda.

Harus sama jelasnya bahwa menulis lebih sedikit keluaran ke direktori tujuan juga meningkatkan kinerja dengan mudah. Karena langkah terakhir sering kali kurang diperhatikan langkah terakhir di mana Anda menulis keluaran kemungkinan besar bukan yang paling ringkas. Dalam pipeline data kami sangat liberal terhadap keluaran kami. Ada kolom dan baris yang hanya informatif terbaik yaitu kami membuat banyak data yang tidak perlu. Dalam 1 sprint penyetelan kinerja kita mengurangi ukuran keluaran kami (sebesar 40%) dan saya melihat peningkatan kinerja selama 2 jam. Jika Anda mempertimbangkan biaya penyimpanan data dan biaya proses hilir untuk menangani baris yang tidak perlu, merapikan keluaran pipeline Anda akan menyebarkan manfaat kinerja ke seluruh sistem dan perusahaan. Jangan lupa kita menang mudah di sana.

2. Seimbangkan Beban Kerja melalui Pengacakan

Jumlah inti untuk pelaksana, ukuran memori untuk pelaksana adalah hal yang paling dasar untuk manajemen memori Spark. Ada daftar panjang parameter yaitu ukuran memori overhead, nomor partisi default, dan banyak lagi. Anda dapat merujuk ke dua dokumen ini untuk mengetahui detailnya yaitu spark.org , AWS EMRUntuk menemukan konfigurasi resource yang baik untuk aplikasi Spark Anda memerlukan pengetahuan dan pengalaman teknik yang solid. 1 wawasan yang saya pelajari dari bermain dengan kenop ini adalah bahwa jaringan membunuh kinerja. Dengan mengurangi spark.sql.shuffle.partitions kita pernah melihat peningkatan kinerja selama 4 jam. Meskipun partisi tinggi meningkatkan ketahanan, partisi rendah memungkinkan cluster menghindari pengiriman data sehingga menghemat sumber daya jaringan untuk kalkulasi dalam memori.

Sebagian besar tulisan tentang penyetelan kinerja Spark menyangkut 1 aplikasi namun kami biasanya tidak terlalu peduli dengan kinerja aplikasi individu. Di pipeline Spark kami memiliki ribuan aplikasi Spark, kami ingin total run time tetap rendah, kami tidak ingin menyetel dan menetapkan konfigurasi resource yang sempurna untuk setiap aplikasi. Tanpa konfigurasi resource per aplikasi, kami membuat 3 bucket yaitu kecil, sedang, dan besar. Pelanggan dengan volume data besar mendapatkan konfigurasi memori yang besar, pelanggan dengan volume data kecil mendapatkan konfigurasi memori yang kecil. Dengan “ukuran” aplikasi yang berbeda dalam pipeline, kami mengalami masalah seperti pipeline long-tail (1 aplikasi terakhir membutuhkan waktu lebih lama), aplikasi yang lebih besar memblokir aplikasi yang lebih kecil dan masalah lainnya. Meskipun kami dapat menggunakan teknik yang rumit untuk mengatasi masalah ini, saya menemukan pengacakan sebagai cara yang lebih cepat untuk dilakukan. Dalam 1 sprint penyetelan kinerja kita hanya mengacak ribuan aplikasi dan mengamati peningkatan kinerja 4 jam. Pengacakan aplikasi tidak hanya membuat beban kerja terdistribusi secara merata (penyediaan sumber daya tingkat cluster yang lebih mudah) tetapi juga mencampurkan berbagai ukuran aplikasi secara bersama-sama (kurang idleness).

3. Ganti Gabungan dengan Fungsi Jendela

Di dalam pipeline Spark kami, kami memanipulasi data input mentah dengan membuat dan menambahkan kolom baru. Dalam proses ini kami mau tidak mau membuat bingkai data masukan lebih besar dan membuat bingkai data perantara. withColumn()adalah fungsi pyspark.sql umum yang kami gunakan untuk membuat kolom baru di sini tautan ke dokumen Spark resminya. Kami membuat tabel perantara karena kami menangani logika bisnis yang berbeda di setiap tabel perantara kemudian di langkah selanjutnya kami akan menggabungkan tabel untuk mendapatkan tabel akhir. Pola ini semuanya bagus dan familier. Namun ketika rantai manipulasi data tumbuh semakin lama penggabungan sederhana antara tabel perantara dalam proses selanjutnya dapat dengan mudah membuat kesalahan memori. Teknik yang umum seperti persist()menyimpan data perantara ke dalam cache bahkan tidak membantu.

Dalam satu sprint penyetelan kinerja kita memutuskan untuk menghindari penggabungan karena masalah memori yang konsisten. Saya malah menggunakan fungsi Window untuk membuat kolom baru yang seharusnya saya capai melalui join. Kita tidak hanya menghindari masalah memori tetapi juga saya melihat peningkatan kinerja 33%. Gabungan melibatkan 2 tabel namun fungsi Window hanya melibatkan beberapa kolom. Tidak perlu dikatakan lagi menulis fungsi Window tidak sesederhana gabungan tetapi mengganti join dengan fungsi Window tentu membantu menyederhanakan grafik komputasi di balik layar.

Spark adalah pilihan populer untuk pekerjaan rekayasa data tetapi penyetelan kinerja Spark adalah masalah terbesar untuk pekerjaan data yang serius. Saya berharap Spark akan menangani lebih banyak penyetelannya secara otomatis di masa mendatang dan selalu selangkah lebih maju dari kebutuhan pemrosesan data yang terus berkembang. Sementara itu kita berharap trik ini mudah dipahami dan bermanfaat untuk Anda gunakan.

Comments

Popular Posts