線程池
“線程池”,顧名思義就是一個線程緩存,線程是稀缺資源,如果被無限制的創建,不僅會消耗系統資源,還會降低系統的穩定性,因此Java中提供線程池對線程進行統一分配、調優和監控。
線程池介紹
在web開發中,服務器需要接受并處理請求,所以會為一個請求來分配一個線程來進行處理。如果每次請求都新創建一個線程的話實現起來非常簡便,但是存在一個問題:
如果并發的請求數量非常多,但每個線程執行的時間很短,這樣就會頻繁的創建和銷毀線程,如此一來會大大降低系統的效率??赡艹霈F服務器在為每個請求創建新線程和銷毀線程上花費的時間和消耗的系統資源要比處理實際的用戶請求的時間和資源更多。
那么有沒有一種辦法使執行完一個任務,并不被銷毀,而是可以繼續執行其他的任務呢?
這就是線程池的目的了。線程池為線程生命周期的開銷和資源不足問題提供了解決方案。通過對多個任務重用線程,線程創建的開銷被分攤到了多個任務上。
什么時候使用線程池?
單個任務處理時間比較短
需要處理的任務數量很大
線程池優勢
重用存在的線程,減少線程創建,消亡的開銷,提高性能
提高響應速度。當任務到達時,任務可以不需要的等到線程創建就能立即執行。
提高線程的可管理性。線程是稀缺資源,如果無限制的創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一的分配,調優和監控。
線程的實現方式
Runnable,Thread,Callable //?實現Runnable接口的類將被Thread執行,表示一個基本的任務 public?interface?Runnable?{ //?run方法就是它所有的內容,就是實際執行的任務 public?abstract?void?run(); } //Callable同樣是任務,與Runnable接口的區別在于它接收泛型,同時它執行任務后帶有返回內容 public?interface?Callable<V>?{ //?相對于run方法的帶有返回值的call方法 V?call()?throws?Exception; }
Executor框架
Executor接口是線程池框架中最基礎的部分,定義了一個用于執行Runnable的execute方法。
下圖為它的繼承與實現
從圖中可以看出Executor下有一個重要子接口ExecutorService,其中定義了線程池的具體行為
1,execute(Runnable command):履行Ruannable類型的任務,
2,submit(task):可用來提交Callable或Runnable任務,并返回代表此任務的Future對象
3,shutdown():在完成已提交的任務后封閉辦事,不再接管新任務,
4,shutdownNow():停止所有正在履行的任務并封閉辦事。
5,isTerminated():測試是否所有任務都履行完畢了。
6,isShutdown():測試是否該ExecutorService已被關閉。
線程池重點屬性
private?final?AtomicInteger?ctl?=?new?AtomicInteger(ctlOf(RUNNING,?0)); private?static?final?int?COUNT_BITS?=?Integer.SIZE?-?3; private?static?final?int?CAPACITY?=?(1?<<?COUNT_BITS)?-?1;
ctl 是對線程池的運行狀態和線程池中有效線程的數量進行控制的一個字段, 它包含兩部分的信息: 線程池的運行狀態 (runState) 和線程池內有效線程的數量 (workerCount),這里可以看到,使用了Integer類型來保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位減1(29個1),這個常量表示workerCount的上限值,大約是5億。
ctl相關方法
private?static?int?runStateOf(int?c)?{?return?c?&?~CAPACITY;?} private?static?int?workerCountOf(int?c)?{?return?c?&?CAPACITY;?} private?static?int?ctlOf(int?rs,?int?wc)?{?return?rs?|?wc;?}
runStateOf:獲取運行狀態;
workerCountOf:獲取活動線程數;
ctlOf:獲取運行狀態和活動線程數的值。
線程池存在5種狀態
RUNNING?=?-1?<<?COUNT_BITS;?//高3位為111 SHUTDOWN?=?0?<<?COUNT_BITS;?//高3位為000 STOP?=?1?<<?COUNT_BITS;?//高3位為001 TIDYING?=?2?<<?COUNT_BITS;?//高3位為010 TERMINATED?=?3?<<?COUNT_BITS;?//高3位為011
1、RUNNING
(1) 狀態說明:線程池處在RUNNING狀態時,能夠接收新任務,以及對已添加的任務進行處理。
(02) 狀態切換:線程池的初始化狀態是RUNNING。換句話說,線程池被一旦被創建,就處于RUNNING狀態,并且線程池中的任務數為0!
2、 SHUTDOWN
(1) 狀態說明:線程池處在SHUTDOWN狀態時,不接收新任務,但能處理已添加的任務。
(2) 狀態切換:調用線程池的shutdown()接口時,線程池由RUNNING -> SHUTDOWN。
3、STOP
(1) 狀態說明:線程池處在STOP狀態時,不接收新任務,不處理已添加的任務,并且會中斷正在處理的任務。
(2) 狀態切換:調用線程池的shutdownNow()接口時,線程池由(RUNNING or SHUTDOWN ) -> STOP。
4、TIDYING
(1) 狀態說明:當所有的任務已終止,ctl記錄的”任務數量”為0,線程池會變為TIDYING狀態。當線程池變為TIDYING狀態時,會執行鉤子函數terminated()。terminated()在ThreadPoolExecutor類中是空的,若用戶想在線程池變為TIDYING時,進行相應的處理;可以通過重載terminated()函數來實現。
(2) 狀態切換:當線程池在SHUTDOWN狀態下,阻塞隊列為空并且線程池中執行的任務也為空時,就會由 SHUTDOWN -> TIDYING。 當線程池在STOP狀態下,線程池中執行的任務為空時,就會由STOP -> TIDYING。
5、 TERMINATED
(1) 狀態說明:線程池徹底終止,就變成TERMINATED狀態。
(2) 狀態切換:線程池處在TIDYING狀態時,執行完terminated()之后,就會由 TIDYING -> TERMINATED。
進入TERMINATED的條件如下:
線程池不是RUNNING狀態;
線程池狀態不是TIDYING狀態或TERMINATED狀態;
如果線程池狀態是SHUTDOWN并且workerQueue為空;
workerCount為0;
設置TIDYING狀態成功。
線程池的具體實現
ThreadPoolExecutor 默認線程池
ScheduledThreadPoolExecutor 定時線程池
ThreadPoolExecutor線程池的創建
public?ThreadPoolExecutor(int?corePoolSize, ??????????????????????????int?maximumPoolSize, ??????????????????????????long?keepAliveTime, ??????????????????????????TimeUnit?unit, ??????????????????????????BlockingQueue<Runnable>?workQueue, ??????????????????????????ThreadFactory?threadFactory, ??????????????????????????RejectedExecutionHandler?handler)
任務提交
1、public?void?execute()?//提交任務無返回值 2、public?Future<?>?submit()?//任務執行完成后有返回值
參數解釋
corePoolSize
線程池中的核心線程數,當提交一個任務時,線程池創建一個新線程執行任務,直到當前線程數等于corePoolSize;如果當前線程數為corePoolSize,繼續提交的任務被保存到阻塞隊列中,等待被執行;如果執行了線程池的prestartAllCoreThreads()方法,線程池會提前創建并啟動所有核心線程。
maximumPoolSize
線程池中允許的最大線程數。如果當前阻塞隊列滿了,且繼續提交任務,則創建新的線程執行任務,前提是當前線程數小于maximumPoolSize;
keepAliveTime
線程池維護線程所允許的空閑時間。當線程池中的線程數量大于corePoolSize的時候,如果這時沒有新的任務提交,核心線程外的線程不會立即銷毀,而是會等待,直到等待的時間超過了keepAliveTime;
unit
keepAliveTime的單位;
workQueue
用來保存等待被執行的任務的阻塞隊列,且任務必須實現Runable接口,在JDK中提供了如下阻塞隊列:
1、ArrayBlockingQueue:基于數組結構的有界阻塞隊列,按FIFO排序任務;
2、LinkedBlockingQuene:基于鏈表結構的阻塞隊列,按FIFO排序任務,吞吐量通常要高于ArrayBlockingQuene;
3、SynchronousQuene:一個不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處于阻塞狀態,吞吐量通常要高于LinkedBlockingQuene;
4、priorityBlockingQuene:具有優先級的***阻塞隊列;
threadFactory
它是ThreadFactory類型的變量,用來創建新線程。默認使用Executors.defaultThreadFactory() 來創建線程。使用默認的ThreadFactory來創建線程時,會使新創建的線程具有相同的NORM_PRIORITY優先級并且是非守護線程,同時也設置了線程的名稱。歡迎大家關注我的公種浩【程序員追風】,整理了2019年多家公司java面試題資料100多頁pdf文檔,文章都會在里面更新,整理的資料也會放在里面。
handler
線程池的飽和策略,當阻塞隊列滿了,且沒有空閑的工作線程,如果繼續提交任務,必須采取一種策略處理該任務,線程池提供了4種策略:
1、AbortPolicy:直接拋出異常,默認策略;
2、CallerRunsPolicy:用調用者所在的線程來執行任務;
3、DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,并執行當前任務;
4、DiscardPolicy:直接丟棄任務;
上面的4種策略都是ThreadPoolExecutor的內部類。
當然也可以根據應用場景實現RejectedExecutionHandler接口,自定義飽和策略,如記錄日志或持久化存儲不能處理的任務。
線程池監控
public?long?getTaskCount()?//線程池已執行與未執行的任務總數 public?long?getCompletedTaskCount()?//已完成的任務數 public?int?getPoolSize()?//線程池當前的線程數 public?int?getActiveCount()?//線程池中正在執行任務的線程數量
線程池原理
源碼分析
execute方法
public?void?execute(Runnable?command)?{ ????if?(command?==?null) ????????throw?new?NullPointerException(); /* ?*?clt記錄著runState和workerCount ?*/ ????int?c?=?ctl.get(); /* ?*?workerCountOf方法取出低29位的值,表示當前活動的線程數; ?*?如果當前活動線程數小于corePoolSize,則新建一個線程放入線程池中; ?*?并把任務添加到該線程中。 ?*/ ????if?(workerCountOf(c)?<?corePoolSize)?{ ????????/* ?????????*?addWorker中的第二個參數表示限制添加線程的數量是根據corePoolSize來判斷還是maximumPoolSize來判斷; ?????????*?如果為true,根據corePoolSize來判斷; ?????????*?如果為false,則根據maximumPoolSize來判斷 ?????????*/ ????????if?(addWorker(command,?true)) ????????????return; /* ?*?如果添加失敗,則重新獲取ctl值 ?*/ ????????c?=?ctl.get(); ????} /* ?*?如果當前線程池是運行狀態并且任務添加到隊列成功 ?*/ ????if?(isRunning(c)?&&?workQueue.offer(command))?{ ????????//?重新獲取ctl值 ????????int?recheck?=?ctl.get(); ?????????//?再次判斷線程池的運行狀態,如果不是運行狀態,由于之前已經把command添加到workQueue中了, ????????//?這時需要移除該command ????????//?執行過后通過handler使用拒絕策略對該任務進行處理,整個方法返回 ????????if?(!?isRunning(recheck)?&&?remove(command)) ????????????reject(command); ????????/* ?????????*?獲取線程池中的有效線程數,如果數量是0,則執行addWorker方法 ?????????*?這里傳入的參數表示: ?????????*?1.?第一個參數為null,表示在線程池中創建一個線程,但不去啟動; ?????????*?2.?第二個參數為false,將線程池的有限線程數量的上限設置為maximumPoolSize,添加線程時根據maximumPoolSize來判斷; ?????????*?如果判斷workerCount大于0,則直接返回,在workQueue中新增的command會在將來的某個時刻被執行。 ?????????*/ ????????else?if?(workerCountOf(recheck)?==?0) ????????????addWorker(null,?false); ????} /* ?*?如果執行到這里,有兩種情況: ?*?1.?線程池已經不是RUNNING狀態; ?*?2.?線程池是RUNNING狀態,但workerCount?>=?corePoolSize并且workQueue已滿。 ?*?這時,再次調用addWorker方法,但第二個參數傳入為false,將線程池的有限線程數量的上限設置為maximumPoolSize; ?*?如果失敗則拒絕該任務 ?*/ ????else?if?(!addWorker(command,?false)) ????????reject(command); }
簡單來說,在執行execute()方法時如果狀態一直是RUNNING時,的執行過程如下:
如果workerCount < corePoolSize,則創建并啟動一個線程來執行新提交的任務;
如果workerCount >= corePoolSize,且線程池內的阻塞隊列未滿,則將任務添加到該阻塞隊列中;
如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且線程池內的阻塞隊列已滿,則創建并啟動一個線程來執行新提交的任務;
如果workerCount >= maximumPoolSize,并且線程池內的阻塞隊列已滿, 則根據拒絕策略來處理該任務, 默認的處理方式是直接拋異常。
這里要注意一下addWorker(null, false);,也就是創建一個線程,但并沒有傳入任務,因為任務已經被添加到workQueue中了,所以worker在執行的時候,會直接從workQueue中獲取任務。所以,在workerCountOf(recheck) == 0時執行addWorker(null, false);也是為了保證線程池在RUNNING狀態下必須要有一個線程來執行任務。
execute方法執行流程如下:
addWorker方法
addWorker方法的主要工作是在線程池中創建一個新的線程并執行,firstTask參數 用于指定新增的線程執行的第一個任務,core參數為true表示在新增線程時會判斷當前活動線程數是否少于corePoolSize,false表示新增線程前需要判斷當前活動線程數是否少于maximumPoolSize,代碼如下:
private?boolean?addWorker(Runnable?firstTask,?boolean?core)?{ ????retry: ????for?(;;)?{ ????????int?c?=?ctl.get(); ????//?獲取運行狀態 ????????int?rs?=?runStateOf(c); ????/* ?????*?這個if判斷 ?????*?如果rs?>=?SHUTDOWN,則表示此時不再接收新任務; ?????*?接著判斷以下3個條件,只要有1個不滿足,則返回false: ?????*?1.?rs?==?SHUTDOWN,這時表示關閉狀態,不再接受新提交的任務,但卻可以繼續處理阻塞隊列中已保存的任務 ?????*?2.?firsTask為空 ?????*?3.?阻塞隊列不為空 ?????*? ?????*?首先考慮rs?==?SHUTDOWN的情況 ?????*?這種情況下不會接受新提交的任務,所以在firstTask不為空的時候會返回false; ?????*?然后,如果firstTask為空,并且workQueue也為空,則返回false, ?????*?因為隊列中已經沒有任務了,不需要再添加線程了 ?????*/ ?????//?Check?if?queue?empty?only?if?necessary. ????????if?(rs?>=?SHUTDOWN?&& ????????????????!?(rs?==?SHUTDOWN?&& ????????????????????????firstTask?==?null?&& ????????????????????????!?workQueue.isEmpty())) ????????????return?false; ????????for?(;;)?{ ????????????//?獲取線程數 ????????????int?wc?=?workerCountOf(c); ????????????//?如果wc超過CAPACITY,也就是ctl的低29位的最大值(二進制是29個1),返回false; ????????????//?這里的core是addWorker方法的第二個參數,如果為true表示根據corePoolSize來比較, ????????????//?如果為false則根據maximumPoolSize來比較。 ????????????//? ????????????if?(wc?>=?CAPACITY?|| ????????????????????wc?>=?(core???corePoolSize?:?maximumPoolSize)) ????????????????return?false; ????????????//?嘗試增加workerCount,如果成功,則跳出第一個for循環 ????????????if?(compareAndIncrementWorkerCount(c)) ????????????????break?retry; ????????????//?如果增加workerCount失敗,則重新獲取ctl的值 ????????????c?=?ctl.get();??//?Re-read?ctl ????????????//?如果當前的運行狀態不等于rs,說明狀態已被改變,返回第一個for循環繼續執行 ????????????if?(runStateOf(c)?!=?rs) ????????????????continue?retry; ????????????//?else?CAS?failed?due?to?workerCount?change;?retry?inner?loop ????????} ????} ????boolean?workerStarted?=?false; ????boolean?workerAdded?=?false; ????Worker?w?=?null; ????try?{ ?????//?根據firstTask來創建Worker對象 ????????w?=?new?Worker(firstTask); ?????//?每一個Worker對象都會創建一個線程 ????????final?Thread?t?=?w.thread; ????????if?(t?!=?null)?{ ????????????final?ReentrantLock?mainLock?=?this.mainLock; ????????????mainLock.lock(); ????????????try?{ ????????????????int?rs?=?runStateOf(ctl.get()); ????????????????//?rs?<?SHUTDOWN表示是RUNNING狀態; ????????????????//?如果rs是RUNNING狀態或者rs是SHUTDOWN狀態并且firstTask為null,向線程池中添加線程。 ????????????????//?因為在SHUTDOWN時不會在添加新的任務,但還是會執行workQueue中的任務 ????????????????if?(rs?<?SHUTDOWN?|| ????????????????????????(rs?==?SHUTDOWN?&&?firstTask?==?null))?{ ????????????????????if?(t.isAlive())?//?precheck?that?t?is?startable ????????????????????????throw?new?IllegalThreadStateException(); ????????????????????//?workers是一個HashSet ????????????????????workers.add(w); ????????????????????int?s?=?workers.size(); ????????????????????//?largestPoolSize記錄著線程池中出現過的最大線程數量 ????????????????????if?(s?>?largestPoolSize) ????????????????????????largestPoolSize?=?s; ????????????????????workerAdded?=?true; ????????????????} ????????????}?finally?{ ????????????????mainLock.unlock(); ????????????} ????????????if?(workerAdded)?{ ????????????????//?啟動線程 ????????????????t.start(); ????????????????workerStarted?=?true; ????????????} ????????} ????}?finally?{ ????????if?(!?workerStarted) ????????????addWorkerFailed(w); ????} ????return?workerStarted; }
Worker類
線程池中的每一個線程被封裝成一個Worker對象,ThreadPool維護的其實就是一組Worker對象,請參見JDK源碼。
Worker類繼承了AQS,并實現了Runnable接口,注意其中的firstTask和thread屬性:firstTask用它來保存傳入的任務;thread是在調用構造方法時通過ThreadFactory來創建的線程,是用來處理任務的線程。
在調用構造方法時,需要把任務傳入,這里通過getThreadFactory().newThread(this);來新建一個線程,newThread方法傳入的參數是this,因為Worker本身繼承了Runnable接口,也就是一個線程,所以一個Worker對象在啟動的時候會調用Worker類中的run方法。
Worker繼承了AQS,使用AQS來實現獨占鎖的功能。為什么不使用ReentrantLock來實現呢?可以看到tryAcquire方法,它是不允許重入的,而ReentrantLock是允許重入的:
lock方法一旦獲取了獨占鎖,表示當前線程正在執行任務中;
如果正在執行任務,則不應該中斷線程;
如果該線程現在不是獨占鎖的狀態,也就是空閑的狀態,說明它沒有在處理任務,這時可以對該線程進行中斷;
線程池在執行shutdown方法或tryTerminate方法時會調用interruptIdleWorkers方法來中斷空閑的線程,interruptIdleWorkers方法會使用tryLock方法來判斷線程池中的線程是否是空閑狀態;
之所以設置為不可重入,是因為我們不希望任務在調用像setCorePoolSize這樣的線程池控制方法時重新獲取鎖。如果使用ReentrantLock,它是可重入的,這樣如果在任務中調用了如setCorePoolSize這類線程池控制的方法,會中斷正在運行的線程。
所以,Worker繼承自AQS,用于判斷線程是否空閑以及是否可以被中斷。
此外,在構造方法中執行了setState(-1);,把state變量設置為-1,為什么這么做呢?是因為AQS中默認的state是0,如果剛創建了一個Worker對象,還沒有執行任務時,這時就不應該被中斷,看一下tryAquire方法:
protected?boolean?tryAcquire(int?unused)?{ ????//cas修改state,不可重入 ????if?(compareAndSetState(0,?1))?{? ????????setExclusiveOwnerThread(Thread.currentThread()); ????????return?true; ????} ????return?false; }
tryAcquire方法是根據state是否是0來判斷的,所以,setState(-1);將state設置為-1是為了禁止在執行任務前對線程進行中斷。
正因為如此,在runWorker方法中會先調用Worker對象的unlock方法將state設置為0。
runWorker方法
在Worker類中的run方法調用了runWorker方法來執行任務,runWorker方法的代碼如下:
final?void?runWorker(Worker?w)?{ ????Thread?wt?=?Thread.currentThread(); ????//?獲取第一個任務 ????Runnable?task?=?w.firstTask; ????w.firstTask?=?null; ????//?允許中斷 ????w.unlock();?//?allow?interrupts ????//?是否因為異常退出循環 ????boolean?completedAbruptly?=?true; ????try?{ ????????//?如果task為空,則通過getTask來獲取任務 ????????while?(task?!=?null?||?(task?=?getTask())?!=?null)?{ ????????????w.lock(); ????????????if?((runStateAtLeast(ctl.get(),?STOP)?|| ????????????????????(Thread.interrupted()?&& ????????????????????????????runStateAtLeast(ctl.get(),?STOP)))?&& ????????????????????!wt.isInterrupted()) ????????????????wt.interrupt(); ????????????try?{ ????????????????beforeExecute(wt,?task); ????????????????Throwable?thrown?=?null; ????????????????try?{ ????????????????????task.run(); ????????????????}?catch?(RuntimeException?x)?{ ????????????????????thrown?=?x;?throw?x; ????????????????}?catch?(Error?x)?{ ????????????????????thrown?=?x;?throw?x; ????????????????}?catch?(Throwable?x)?{ ????????????????????thrown?=?x;?throw?new?Error(x); ????????????????}?finally?{ ????????????????????afterExecute(task,?thrown); ????????????????} ????????????}?finally?{ ????????????????task?=?null; ????????????????w.completedTasks++; ????????????????w.unlock(); ????????????} ????????} ????????completedAbruptly?=?false; ????}?finally?{ ????????processWorkerExit(w,?completedAbruptly); ????} }
這里說明一下第一個if判斷,目的是:
如果線程池正在停止,那么要保證當前線程是中斷狀態;
如果不是的話,則要保證當前線程不是中斷狀態;
這里要考慮在執行該if語句期間可能也執行了shutdownNow方法,shutdownNow方法會把狀態設置為STOP,回顧一下STOP狀態:
不能接受新任務,也不處理隊列中的任務,會中斷正在處理任務的線程。在線程池處于 RUNNING 或 SHUTDOWN 狀態時,調用 shutdownNow() 方法會使線程池進入到該狀態。
STOP狀態要中斷線程池中的所有線程,而這里使用Thread.interrupted()來判斷是否中斷是為了確保在RUNNING或者SHUTDOWN狀態時線程是非中斷狀態的,因為Thread.interrupted()方法會復位中斷的狀態。
總結一下runWorker方法的執行過程:
while循環不斷地通過getTask()方法獲取任務;
getTask()方法從阻塞隊列中取任務;
如果線程池正在停止,那么要保證當前線程是中斷狀態,否則要保證當前線程不是中斷狀態;
調用task.run()執行任務;
如果task為null則跳出循環,執行processWorkerExit()方法;
runWorker方法執行完畢,也代表著Worker中的run方法執行完畢,銷毀線程。
這里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor類中是空的,留給子類來實現。
completedAbruptly變量來表示在執行任務過程中是否出現了異常,在processWorkerExit方法中會對該變量的值進行判斷。
getTask方法
getTask方法用來從阻塞隊列中取任務,代碼如下:
private?Runnable?getTask()?{ ????//?timeOut變量的值表示上次從阻塞隊列中取任務時是否超時 ????boolean?timedOut?=?false;?//?Did?the?last?poll()?time?out? ????for?(;;)?{ ????????int?c?=?ctl.get(); ????????int?rs?=?runStateOf(c); ????????//?Check?if?queue?empty?only?if?necessary. ????/* ?????*?如果線程池狀態rs?>=?SHUTDOWN,也就是非RUNNING狀態,再進行以下判斷: ?????*?1.?rs?>=?STOP,線程池是否正在stop; ?????*?2.?阻塞隊列是否為空。 ?????*?如果以上條件滿足,則將workerCount減1并返回null。 ?????*?因為如果當前線程池狀態的值是SHUTDOWN或以上時,不允許再向阻塞隊列中添加任務。 ?????*/ ????????if?(rs?>=?SHUTDOWN?&&?(rs?>=?STOP?||?workQueue.isEmpty()))?{ ????????????decrementWorkerCount(); ????????????return?null; ????????} ????????int?wc?=?workerCountOf(c); ????????//?Are?workers?subject?to?culling? ????????//?timed變量用于判斷是否需要進行超時控制。 ????????//?allowCoreThreadTimeOut默認是false,也就是核心線程不允許進行超時; ????????//?wc?>?corePoolSize,表示當前線程池中的線程數量大于核心線程數量; ????????//?對于超過核心線程數量的這些線程,需要進行超時控制 ????????boolean?timed?=?allowCoreThreadTimeOut?||?wc?>?corePoolSize; ????/* ?????*?wc?>?maximumPoolSize的情況是因為可能在此方法執行階段同時執行了setMaximumPoolSize方法; ?????*?timed?&&?timedOut?如果為true,表示當前操作需要進行超時控制,并且上次從阻塞隊列中獲取任務發生了超時 ?????*?接下來判斷,如果有效線程數量大于1,或者阻塞隊列是空的,那么嘗試將workerCount減1; ?????*?如果減1失敗,則返回重試。 ?????*?如果wc?==?1時,也就說明當前線程是線程池中唯一的一個線程了。 ?????*/ ????????if?((wc?>?maximumPoolSize?||?(timed?&&?timedOut)) ????????????????&&?(wc?>?1?||?workQueue.isEmpty()))?{ ????????????if?(compareAndDecrementWorkerCount(c)) ????????????????return?null; ????????????continue; ????????} ????????try?{ ????????/* ?????????*?根據timed來判斷,如果為true,則通過阻塞隊列的poll方法進行超時控制,如果在keepAliveTime時間內沒有獲取到任務,則返回null; ?????????*?否則通過take方法,如果這時隊列為空,則take方法會阻塞直到隊列不為空。 ?????????* ?????????*/ ????????????Runnable?r?=?timed?? ????????????????????workQueue.poll(keepAliveTime,?TimeUnit.NANOSECONDS)?: ????????????????????workQueue.take(); ????????????if?(r?!=?null) ????????????????return?r; ????????????//?如果?r?==?null,說明已經超時,timedOut設置為true ????????????timedOut?=?true; ????????}?catch?(InterruptedException?retry)?{ ????????????//?如果獲取任務時當前線程發生了中斷,則設置timedOut為false并返回循環重試 ????????????timedOut?=?false; ????????} ????} }
這里重要的地方是第二個if判斷,目的是控制線程池的有效線程數量。由上文中的分析可以知道,在執行execute方法時,如果當前線程池的線程數量超過了corePoolSize且小于maximumPoolSize,并且workQueue已滿時,則可以增加工作線程,但這時如果超時沒有獲取到任務,也就是timedOut為true的情況,說明workQueue已經為空了,也就說明了當前線程池中不需要那么多線程來執行任務了,可以把多于corePoolSize數量的線程銷毀掉,保持線程數量在corePoolSize即可。
什么時候會銷毀?當然是runWorker方法執行完之后,也就是Worker中的run方法執行完,由JVM自動回收。
getTask方法返回null時,在runWorker方法中會跳出while循環,然后會執行processWorkerExit方法。
processWorkerExit方法
private?void?processWorkerExit(Worker?w,?boolean?completedAbruptly)?{ ????//?如果completedAbruptly值為true,則說明線程執行時出現了異常,需要將workerCount減1; ????//?如果線程執行時沒有出現異常,說明在getTask()方法中已經已經對workerCount進行了減1操作,這里就不必再減了。?? ????if?(completedAbruptly)?//?If?abrupt,?then?workerCount?wasn't?adjusted ????????decrementWorkerCount(); ????final?ReentrantLock?mainLock?=?this.mainLock; ????mainLock.lock(); ????try?{ ????????//統計完成的任務數 ????????completedTaskCount?+=?w.completedTasks; ????????//?從workers中移除,也就表示著從線程池中移除了一個工作線程 ????????workers.remove(w); ????}?finally?{ ????????mainLock.unlock(); ????} ????//?根據線程池狀態進行判斷是否結束線程池 ????tryTerminate(); ????int?c?=?ctl.get(); /* ?*?當線程池是RUNNING或SHUTDOWN狀態時,如果worker是異常結束,那么會直接addWorker; ?*?如果allowCoreThreadTimeOut=true,并且等待隊列有任務,至少保留一個worker; ?*?如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。 ?*/ ????if?(runStateLessThan(c,?STOP))?{ ????????if?(!completedAbruptly)?{ ????????????int?min?=?allowCoreThreadTimeOut???0?:?corePoolSize; ????????????if?(min?==?0?&&?!?workQueue.isEmpty()) ????????????????min?=?1; ????????????if?(workerCountOf(c)?>=?min) ????????????????return;?//?replacement?not?needed ????????} ????????addWorker(null,?false); ????} }
至此,processWorkerExit執行完之后,工作線程被銷毀,以上就是整個工作線程的生命周期,從execute方法開始,Worker使用ThreadFactory創建新的工作線程,runWorker通過getTask獲取任務,然后執行任務,如果getTask返回null,進入processWorkerExit方法,整個線程結束,如圖所示:
總結
分析了線程的創建,任務的提交,狀態的轉換以及線程池的關閉;
這里通過execute方法來展開線程池的工作流程,execute方法通過corePoolSize,maximumPoolSize以及阻塞隊列的大小來判斷決定傳入的任務應該被立即執行,還是應該添加到阻塞隊列中,還是應該拒絕任務。
介紹了線程池關閉時的過程,也分析了shutdown方法與getTask方法存在競態條件;
在獲取任務時,要通過線程池的狀態來判斷應該結束工作線程還是阻塞線程等待新的任務,也解釋了為什么關閉線程池時要中斷工作線程以及為什么每一個worker都需要lock。
最后
歡迎大家一起交流,喜歡文章記得點個贊喲,感謝支持!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。