這篇文章將為大家詳細講解有關怎么進行spark.streaming.concurrentJobs參數解密的分析,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
最近,在spark streaming 調優時,發現個增加job并行度的參數spark.streaming.concurrentJobs
,spark 默認值為1,當增加為2時(在spark-default中配置),如遇到處理速度慢 streaming application UI 中會有兩個Active Jobs(默認值時為1),也就是在同一時刻可以執行兩個批次的streaming job,下文分析這個參數是如何影響streaming 的執行的。 ##參數引入 在spark streaming 的JobScheduler line 47,讀取了該參數:
private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1) private val jobExecutor = ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
使用concurrentJobs參數初始化jobExecutor線程池,也就是這個參數直接影響了job executor線程池中的線程數目。
job executor 線程池用來execute JobHandler線程;在jobSchedule中有個job容器jobSets:
private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]
用來保存不同的時間點生成的JobSet,而JobSet中包含多個Job; JobSet submit邏輯:
def submitJobSet(jobSet: JobSet) { if (jobSet.jobs.isEmpty) { logInfo("No jobs added for time " + jobSet.time) } else { listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo)) jobSets.put(jobSet.time, jobSet) jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job))) logInfo("Added jobs for time " + jobSet.time) } }
不難看出jobExecutor的容量決定了池子中同時可以被處理的JobHandler線程數,JobHandler是job的執行線程,因此決定了可以被同時被提交的Job數目。
可以通過集中方法為streaming job配置此參數。
spark-default中修改 全局性修改,所有的streaming job都會受到影響。
提交streaming job是 --conf 參數添加(推薦) 在提交job時,可以使用--conf 參數為該job添加個性化的配置。例如: bin/spark-submit --master yarn --conf spark.streaming.concurrentJobs=5
設置該streaming job的job executor 線程池大小為5,在資源充足的情況下可以同時執行5個batch job。
代碼設置 在代碼中通過sparkConf設置: sparkConf.set("spark.streaming.concurrentJobs", "5");
或者 System.setProperty("spark.streaming.concurrentJobs", "5");
在配置多個concurrentJob時,多個批次job被同時提交到集群中,也就需要更多的計算資源;當沒有更多的計算資源(Executor)被分配個該streaming job時,可將schedul 調整為FAIR(公平調度)來達到被提交的多個job可公平的共享計算資源。 當調整為公平調度時,job可以共享計算資源,而job的提交仍然是有時間順序的(雖然時間間隔很?。?,容易造成task在executor間分配的傾斜,拉長job的整體執行時間。 當使用fifo調度方式,先到的job優先獲得計算資源,當executor數目不足時,job會等待executor被釋放,task數目反而不易傾斜。 在實際使用時,如果executor數目足夠,建議使用FIFO模式,如在concurrentJob為默認配置時,executor分配數目為m,則當concurrentJobs配置為n時,executor建議分配為 n*m。
關于怎么進行spark.streaming.concurrentJobs參數解密的分析就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。