Spark2.x中如何進行Shuffle相關參數優化,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。
一、概述
這里也結合網絡上的一些調優建議,把整個Shuffle過程常用的優化參數進行了整理。注意這里是基于Spark2.x版本的優化。
二、相關參數及優化建議
1.spark.shuffle.file.buffer
默認值:
32KB
參數說明:
該參數用于設置shuffle write task的BufferedOutputStream的buffer緩沖大小。將數據寫到磁盤文件之前,會先寫入buffer緩沖區中,待緩沖區寫滿之后,才會溢寫到磁盤。
調優建議:
如果作業可用的內存資源較為充足的話,可以適當增加這個參數的大小(比如64KB),從而減少shuffle write過程中溢寫磁盤文件的次數,也就可以減少磁盤IO次數,進而提升性能。在實踐中發現,合理調節該參數,性能會有1%~5%的提升。
2.spark.reducer.maxSizeInFlight
默認值:
48MB
參數說明:
該參數用于設置shuffle read task的buffer緩沖大小,而這個buffer緩沖決定了每次能夠拉取多少數據。
調優建議:
如果作業可用的內存資源較為充足的話,可以適當增加這個參數的大?。ū热?6MB),從而減少拉取數據的次數,也就可以減少網絡傳輸的次數,進而提升性能。在實踐中發現,合理調節該參數,性能會有1%~5%的提升。
常見問題:shuffle read task階段出現OOM
shuffle read task去拉取shuffle writer task輸出的數據,它是一邊拉取一邊進行聚合,shuffle read task有一塊聚合內存,大小為executor memory * spark.shuffle.memoryFraction。
解決辦法:
a.增加reduce 聚合的內存的比例,設置spark.shuffle.memoryFraction 0.3
b.增加executor memory的大小 --executor-memory 4G
c.減少reduce task每次拉取的數據量 設置spark.reducer.maxSizeInFlight 24MB
3.spark.shuffle.io.maxRetyies
默認值:
3次
參數說明:
shuffle read task訪問shuffle write task的重試次數,如果超過該次數,read端的task就會被kill掉。
調優建議:
調大該數值,提高重試次數,防止write端在進行full gc或其他操作read被kill影響整個程序的正常執行
4.spark.shuffle.io.retryWait
默認值:
5s
參數說明:
shuffle read task訪問shuffle write task數據的等待時間間隔
調優建議:
調大改參數,效果同參數3
5.spark.shuffle.memoryFraction
默認值:
0.2
參數說明:
該參數代表了Executor內存中,分配給shuffle read task進行聚合操作的內存比例,默認是20%。
調優建議:
如果聚合內存充足,而且很少使用持久化操作建議調高這個比例,給shuffle read的聚合操作更多內存,以避免由于內存不足導致聚合過程中頻繁讀寫磁盤。在實踐中發現,合理調節該參數可以將性能提升10%左右。
6.spark.shuffle.sort.bypassMergeThreshold
默認值:
200
參數說明:
當ShuffleManager為SortShuffleManager時,如果shuffle read task的數量小于這個閾值(默認是200),則shuffle write過程中不會進行排序操作,而是直接按照未經優化的HashShuffleManager的方式去寫數據,但是最后會將每個task產生的所有臨時磁盤文件都合并成一個文件,并會創建單獨的索引文件。
調優建議:
當你使用SortShuffleManager時,如果的確不需要排序操作,那么建議將這個參數調大一些,大于shuffle read task的數量。那么此時就會自動啟用bypass機制,map-side就不會進行排序了,減少了排序的性能開銷。但是這種方式下,依然會產生大量的磁盤文件,因此shuffle write性能有待提高
7.spark.shuffle.consolidateFiles(spark2.x不需要,只spark1.x需要)
默認值:
false
參數說明:
如果使用HashShuffleManager,該參數有效。如果設置為true,那么就會開啟consolidate機制,會大幅度合并shuffle write的輸出文件,對于shuffle read task數量特別多的情況下,這種方法可以極大地減少磁盤IO開銷,提升性能。
調優建議:
如果的確不需要SortShuffleManager的排序機制,那么除了使用bypass機制,還可以嘗試將spark.shffle.manager參數手動指定為hash,使用HashShuffleManager,同時開啟consolidate機制。在實踐中嘗試過,發現其性能比開啟了bypass機制的SortShuffleManager要高出10%~30%。
8.spark.shuffle.compress和 spark.shuffle.spill.compress
默認值
這兩個參數的默認配置都是true。
參數說明:
spark.shuffle.compress和spark.shuffle.spill.compress都是用來設置Shuffle過程中是否對Shuffle數據進行壓縮;其中前者針對最終寫入本地文件系統的輸出文件,后者針對在處理過程需要spill到外部存儲的中間數據,后者針對最終的shuffle輸出文件。
調優建議:
對于參數spark.shuffle.compress,如果下游的Task通過網絡獲取上游Shuffle Map Task的結果的網絡IO成為瓶頸,那么就需要考慮將它設置為true:通過壓縮數據來減少網絡IO。由于上游Shuffle Map Task和下游的Task現階段是不會并行處理的,即上游Shuffle Map Task處理完成,然后下游的Task才會開始執行。因此如果需要壓縮的時間消耗就是Shuffle MapTask壓縮數據的時間 + 網絡傳輸的時間 + 下游Task解壓的時間;而不需要壓縮的時間消耗僅僅是網絡傳輸的時間。因此需要評估壓縮解壓時間帶來的時間消耗和因為數據壓縮帶來的時間節省。如果網絡成為瓶頸,比如集群普遍使用的是千兆網絡,那么可能將這個選項設置為true是合理的;如果計算是CPU密集型的,那么可能將這個選項設置為false才更好。
對于參數spark.shuffle.spill.compress,如果設置為true,代表處理的中間結果在spill到本地硬盤時都會進行壓縮,在將中間結果取回進行merge的時候,要進行解壓。因此要綜合考慮CPU由于引入壓縮解壓的消耗時間和Disk IO因為壓縮帶來的節省時間的比較。在Disk IO成為瓶頸的場景下,這個被設置為true可能比較合適;如果本地硬盤是SSD,那么這個設置為false可能比較合適。
9.spark.shuffle.service.enabled
默認值
false
參數說明:
是否啟用External shuffle Service服務,Spark系統在運行含shuffle過程的應用時,Executor進程除了運行task,還要負責寫shuffle數據,給其他Executor提供shuffle數據。當Executor進程任務過重,導致GC而不能為其他Executor提供shuffle數據時,會影響任務運行。External shuffle Service是長期存在于NodeManager進程中的一個輔助服務。通過該服務來抓取shuffle數據,減少了Executor的壓力,在Executor GC的時候也不會影響其他Executor的任務運行
優化建議:
啟用外部shuffle服務,這個服務會安全地保存shuffle過程中,executor寫的磁盤文件,因此executor即使掛掉也不要緊,必須配合spark.dynamicAllocation.enabled屬性設置為true,才能生效,而且外部shuffle服務必須進行安裝和啟動,才能啟用這個屬性。
看完上述內容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注億速云行業資訊頻道,感謝您對億速云的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。