本篇文章為大家展示了如何進行Spark Shuffle實現,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。
對于大數據計算框架而言,Shuffle階段的設計優劣是決定性能好壞的關鍵因素之一。小編將介紹目前Spark的shuffle實現,并將之與MapReduce進行簡單對比。
(1) shuffle基本概念與常見實現方式
shuffle,是一個算子,表達的是多對多的依賴關系,在類MapReduce計算框架中,是連接Map階段和Reduce階段的紐帶,即每個Reduce Task從每個Map Task產生數的據中讀取一片數據,極限情況下可能觸發M*R個數據拷貝通道(M是Map Task數目,R是Reduce Task數目)。通常shuffle分為兩部分:Map階段的數據準備和Reduce階段的數據拷貝。首先,Map階段需根據Reduce階段的Task數量決定每個Map Task輸出的數據分片數目,有多種方式存放這些數據分片:
1) 保存在內存中或者磁盤上(Spark和MapReduce都存放在磁盤上);
2) 每個分片一個文件(現在Spark采用的方式,若干年前MapReduce采用的方式),或者所有分片放到一個數據文件中,外加一個索引文件記錄每個分片在數據文件中的偏移量(現在MapReduce采用的方式)。
在Map端,不同的數據存放方式各有優缺點和適用場景。一般而言,shuffle在Map端的數據要存儲到磁盤上,以防止容錯觸發重算帶來的龐大開銷(如果保存到Reduce端內存中,一旦Reduce Task掛掉了,所有Map Task需要重算)。但數據在磁盤上存放方式有多種可選方案,在MapReduce前期設計中,采用了現在Spark的方案(目前一直在改進),每個Map Task為每個Reduce Task產生一個文件,該文件只保存特定Reduce Task需處理的數據,這樣會產生M*R個文件,如果M和R非常龐大,比如均為1000,則會產生100w個文件,產生和讀取這些文件會產生大量的隨機IO,效率非常低下。解決這個問題的一種直觀方法是減少文件數目,常用的方法有:
1) 將一個節點上所有Map產生的文件合并成一個大文件(MapReduce現在采用的方案),
2) 每個節點產生{(slot數目)*R}個文件(Spark優化后的方案)。對后面這種方案簡單解釋一下:不管是MapReduce 1.0還是Spark,每個節點的資源會被抽象成若干個slot,由于一個Task占用一個slot,因此slot數目可看成是最多同時運行的Task數目。如果一個Job的Task數目非常多,限于slot數目有限,可能需要運行若干輪。這樣,只需要由第一輪產生{(slot數目)*R}個文件,后續幾輪產生的數據追加到這些文件末尾即可。
因此,后一種方案可減少大作業產生的文件數目。
在Reduce端,各個Task會并發啟動多個線程同時從多個Map Task端拉取數據。由于Reduce階段的主要任務是對數據進行按組規約。
也就是說,需要將數據分成若干組,以便以組為單位進行處理。大家知道,分組的方式非常多,常見的有:Map/HashTable(key相同的,放到同一個value list中)和Sort(按key進行排序,key相同的一組,經排序后會挨在一起),這兩種方式各有優缺點,第一種復雜度低,效率高,但是需要將數據全部放到內存中,第二種方案復雜度高,但能夠借助磁盤(外部排序)處理龐大的數據集。Spark前期采用了第一種方案,而在最新的版本中加入了第二種方案, MapReduce則從一開始就選用了基于sort的方案。
(2) MapReduce Shuffle發展史
【階段1】:MapReduce Shuffle的發展也并不是一馬平川的,剛開始(0.10.0版本之前)采用了“每個Map Task產生R個文件”的方案,前面提到,該方案會產生大量的隨機讀寫IO,對于大數據處理而言,非常不利。
【階段2】:為了避免Map Task產生大量文件,HADOOP-331嘗試對該方案進行優化,優化方法:為每個Map Task提供一個環形buffer,一旦buffer滿了后,則將內存數據spill到磁盤上(外加一個索引文件,保存每個partition的偏移量),最終合并產生的這些spill文件,同時創建一個索引文件,保存每個partition的偏移量。
(階段2):這個階段并沒有對shuffle架構做調成,只是對shuffle的環形buffer進行了優化。在Hadoop 2.0版本之前,對MapReduce作業進行參數調優時,Map階段的buffer調優非常復雜的,涉及到多個參數,這是由于buffer被切分成兩部分使用:一部分保存索引(比如parition、key和value偏移量和長度),一部分保存實際的數據,這兩段buffer均會影響spill文件數目,因此,需要根據數據特點對多個參數進行調優,非常繁瑣。而MAPREDUCE-64則解決了該問題,該方案讓索引和數據共享一個環形緩沖區,不再將其分成兩部分獨立使用,這樣只需設置一個參數控制spill頻率。
【階段3(進行中)】:目前shuffle被當做一個子階段被嵌到Reduce階段中的。由于MapReduce模型中,Map Task和Reduce Task可以同時運行,因此一個作業前期啟動的Reduce Task將一直處于shuffle階段,直到所有Map Task運行完成,而在這個過程中,Reduce Task占用著資源,但這部分資源利用率非常低,基本上只使用了IO資源。為了提高資源利用率,一種非常好的方法是將shuffle從Reduce階段中獨立處理,變成一個獨立的階段/服務,由專門的shuffler service負責數據拷貝,目前百度已經實現了該功能(準備開源?),且收益明顯,具體參考:MAPREDUCE-2354。
(3) Spark Shuffle發展史
目前看來,Spark Shuffle的發展史與MapReduce發展史非常類似。初期Spark在Map階段采用了“每個Map Task產生R個文件”的方法,在Reduce階段采用了map分組方法,但隨Spark變得流行,用戶逐漸發現這種方案在處理大數據時存在嚴重瓶頸問題,因此嘗試對Spark進行優化和改進,相關鏈接有:External Sorting for Aggregator and CoGroupedRDDs,“Optimizing Shuffle Performance in Spark”,“Consolidating Shuffle Files in Spark”,優化動機和思路與MapReduce非常類似。
Spark在前期設計中過多依賴于內存,使得一些運行在MapReduce之上的大作業難以直接運行在Spark之上(可能遇到OOM問題)。目前Spark在處理大數據集方面尚不完善,用戶需根據作業特點選擇性的將一部分作業遷移到Spark上,而不是整體遷移。隨著Spark的完善,很多內部關鍵模塊的設計思路將變得與MapReduce升級版Tez非常類似。
上述內容就是如何進行Spark Shuffle實現,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。