> Source: Pixabay
Apache Spark是一種開放源代碼的分布式計(jì)算引擎,目前是用于內(nèi)存中批處理驅(qū)動(dòng)的數(shù)據(jù)處理的最受歡迎的框架(它還支持實(shí)時(shí)數(shù)據(jù)流傳輸)。 得益于其先進(jìn)的查詢優(yōu)化器,DAG調(diào)度程序和執(zhí)行引擎,Spark能夠非常高效地處理和分析大型數(shù)據(jù)集。 但是,在沒有仔細(xì)調(diào)整的情況下運(yùn)行Spark作業(yè)仍會(huì)導(dǎo)致性能下降。
在博客文章中,我將分享一些Spark性能調(diào)優(yōu)的技巧,以幫助您解決和加快運(yùn)行緩慢的Spark作業(yè)。
(本文提到的所有功能均來自PySpark,您可以使用Spark API文檔找到與Scala / JAVA等效的功能。)
分區(qū)不均勻
當(dāng)數(shù)據(jù)集最初由Spark加載并成為彈性分布式數(shù)據(jù)集(RDD)時(shí),所有數(shù)據(jù)均勻地分布在分區(qū)之間。 但是,在用戶對其應(yīng)用某些類型的數(shù)據(jù)操作之后,這些分區(qū)可能會(huì)變得不均勻。 例如,groupByKey操作可能導(dǎo)致分區(qū)偏斜,因?yàn)橐粋€(gè)鍵可能比另一個(gè)鍵包含更多的記錄。 此外,由于Spark的DataFrameWriter允許使用partitionBy將分區(qū)數(shù)據(jù)寫入磁盤,因此磁盤上的分區(qū)也可能不均勻。
在DataFrame中重新平衡偏斜的分區(qū)將極大地提高Spark在DataFrame上的處理性能。 您可以使用getNumPartitions函數(shù)檢查DataFrame中的分區(qū)數(shù),并通過運(yùn)行簡單的Spark作業(yè)來查找每個(gè)分區(qū)中的記錄數(shù),例如:
from pyspark.sql.functions import spark_partition_id
df.withColumn("partition_id", spark_partition_id())
.groupBy("partition_id")
.count()
.show()
如果發(fā)現(xiàn)DataFrame的分區(qū)大小高度不均勻,請?jiān)趯λM(jìn)行任何分析之前,使用重新分區(qū)或合并函數(shù)對DataFrame進(jìn)行重新分區(qū)。 還建議在將數(shù)據(jù)寫回磁盤之前,先對內(nèi)存中的數(shù)據(jù)進(jìn)行分區(qū)。 RDD模塊也支持這些重新分區(qū)功能。
堅(jiān)持RDD的缺點(diǎn)
由于惰性執(zhí)行原理,除非用戶明確調(diào)用操作來收集結(jié)果,否則Spark不會(huì)對數(shù)據(jù)集執(zhí)行任何實(shí)際的轉(zhuǎn)換。 此外,如果用戶希望對中間結(jié)果應(yīng)用其他轉(zhuǎn)換,Spark將需要從頭開始重新計(jì)算所有內(nèi)容。 為了允許用戶更有效地重用日期,Spark可以使用持久性或緩存功能將數(shù)據(jù)緩存在內(nèi)存和/或磁盤中。
但是,緩存并不總是一個(gè)好主意。 Spark緩存數(shù)據(jù)集后,Catalyst優(yōu)化器優(yōu)化進(jìn)一步轉(zhuǎn)換的能力將受到限制,因?yàn)樗辉倌軌蚋纳圃磾?shù)據(jù)級(jí)別的修剪。 例如,如果將過濾器應(yīng)用于在源數(shù)據(jù)庫中建立索引的列,則Catalyst將無法利用索引來提高性能。
因此,僅當(dāng)緩存數(shù)據(jù)將在以后多次重用時(shí)才建議使用緩存數(shù)據(jù)。 迭代探索數(shù)據(jù)集或調(diào)整ML模型時(shí)。

> Source: Pixabay
基于成本的優(yōu)化器(CBO)
基于成本的優(yōu)化器(CBO)通過向Catalyst提供其他表級(jí)統(tǒng)計(jì)信息,可以加快Spark SQL作業(yè)的速度,這對于連接許多數(shù)據(jù)集的作業(yè)特別有用。 使用者可以通過將spark.sql.cbo.enabled設(shè)置為true(默認(rèn)值)來啟用CBO。
為了充分利用CBO,用戶需要保持列級(jí)和表級(jí)統(tǒng)計(jì)信息都是最新的,從而使CBO可以使用準(zhǔn)確的估算來優(yōu)化查詢計(jì)劃。 為此,在對表運(yùn)行SQL查詢之前,請使用ANALYZE TABLE命令收集統(tǒng)計(jì)信息。 記住在修改表之后再次分析表,以確保統(tǒng)計(jì)信息是最新的。
廣播Join
除了啟用CBO,在Spark中優(yōu)化連接數(shù)據(jù)集的另一種方法是使用廣播聯(lián)接。 在無序連接中,兩個(gè)表中的記錄都將通過網(wǎng)絡(luò)傳輸給執(zhí)行器,當(dāng)一個(gè)表比另一個(gè)大得多時(shí),這是次優(yōu)的。 在廣播聯(lián)接中,較小的表將被發(fā)送給執(zhí)行程序,以與較大的表聯(lián)接,從而避免了通過網(wǎng)絡(luò)發(fā)送大量數(shù)據(jù)的情況。
用戶可以通過spark.sql.autoBroadcastJoinThreshold配置控制廣播聯(lián)接,指示要廣播的表的最大大小。 此外,即使表的大小大于spark.sql.autoBroadcastJoinThreshold,也可以使用廣播提示來告訴Spark廣播表:
from pyspark.sql.functions import broadcast
broadcast(spark.table("tbl_a")).join(spark.table("tbl_b"), "key")
垃圾收集(GC)
由于所有Spark作業(yè)都占用大量內(nèi)存,因此確保有效進(jìn)行垃圾收集非常重要-我們希望產(chǎn)生較少的內(nèi)存"垃圾"以減少GC時(shí)間。 要了解您的Spark作業(yè)是否在GC中花費(fèi)過多時(shí)間,請?jiān)赟park UI中檢查"任務(wù)反序列化時(shí)間"和" GC時(shí)間"。
例如,由于Spark需要反序列化更多對象,因此使用用戶定義函數(shù)(UDF)和lambda函數(shù)將導(dǎo)致更長的GC時(shí)間。 還建議避免創(chuàng)建中間對象并將不必要的RDD緩存到JVM堆。
TL; DR:
· 使用重新分區(qū)或合并來重新平衡不均勻的分區(qū)。
· 僅當(dāng)數(shù)據(jù)將被多次重用時(shí)才保留數(shù)據(jù)。
· 使用ANALYZE TABLE命令可以維護(hù)CBO的最新統(tǒng)計(jì)信息。
· 為小表啟用廣播連接以加快連接速度。
· 通過使用較少的UDF并避免緩存大對象來優(yōu)化GC。
(本文翻譯自Xinran Waibel的文章《Apache Spark Optimization Toolkit》,參考:https://towardsdatascience.com/apache-spark-optimization-toolkit-17cf3e491992)