這篇文章主要介紹“JUC的CyclicBarrier怎么實現”,在日常操作中,相信很多人在JUC的CyclicBarrier怎么實現問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”JUC的CyclicBarrier怎么實現”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
前面介紹的 CountDownLatch 同步器是基于 AQS 實現的,而本文要介紹的 CyclicBarrier 則沒有直接繼承 AQS 的 AbstractQueuedSynchronizer 抽象類,而是基于 ReentrantLock 鎖進行實現。首先來看一下 CyclicBarrier 的字段定義,如下:
public class CyclicBarrier { /** 支撐 CyclicBarrier 的重入鎖 */ private final ReentrantLock lock = new ReentrantLock(); /** 條件隊列,已經到達屏障的線程會在條件隊列中等待其它線程 */ private final Condition trip = lock.newCondition(); /** 參與的線程數 */ private final int parties; /** 當所有線程都到達屏障時的回調函數 */ private final Runnable barrierCommand; /** 當前年代對象 */ private Generation generation = new Generation(); /** 當前剩余未完成的線程數 */ private int count; // ... 省略方法定義 }
上述各個字段的含義如代碼注釋,這里我們進一步解釋一下 generation 字段,該字段為 Generation 類型,用于表示當前 CyclicBarrier 同步器的年代信息。Generation 內部類定義如下:
private static class Generation { boolean broken = false; }
當新建一個 CyclicBarrier 對象時會初始化 CyclicBarrier#generation
字段。此外,當所有參與的線程都到達屏障后(也稱 tripped),或者 CyclicBarrier 被重置(即調用 CyclicBarrier#reset
方法)時,會新建一個 Generation 對象賦值給 CyclicBarrier#generation
字段,表示年代的更替。
Generation 定義的 Generation#broken
屬性用于標識當前屏障是否被打破。當 CyclicBarrier 被重置,或者參與到該屏障的某個線程被中斷、等待超時,亦或是執行回調函數發生異常,都會導致屏障被打破。破損的屏障(即 broken=true
)會導致當前參與等待的線程,以及已經處于等待狀態的線程拋出 BrokenBarrierException 異常,并退出當前屏障進程。
因為 CyclicBarrier 的復用性,導致在程序運行期間可能并存多個年代信息,但是任何時刻只有一個年代對象是活躍的,剩余的年代對象對應的 CyclicBarrier 要么是已經用完的(tripped),要么就是已經破損的。
介紹完了字段定義,下面來分析一下 CyclicBarrier 的方法實現,首先來看一下構造方法。CyclicBarrier 定義了兩個構造方法,實現如下:
public CyclicBarrier(int parties) { this(parties, null); } public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) { throw new IllegalArgumentException(); } this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
其中 parties 參數用于指定當前參與的線程數,參數 barrierAction 用于指定當所有參與的線程都到達屏障時的回調邏輯。你可能有些疑問,既然設置了 parties 字段,為什么還要設置一個 count 字段呢?
考慮 CyclicBarrier 是可重用的,所以需要有一個字段記錄參與線程的數目,即 parties 字段,而 count 字段初始值等于 parties 字段值,但是在運行期間其值是會隨著參與線程逐一到達屏障而遞減的,所以 count 值始終記錄的是當前未到達屏障的線程數。當 CyclicBarrier 被重置時,我們需要依據 parties 字段值來重置 count 字段值。
繼續來看一下 CyclicBarrier 除構造方法以外的剩余方法實現,主要分析一下 CyclicBarrier#await
方法和 CyclicBarrier#reset
方法。首先來看一下 CyclicBarrier#reset
方法,當我們希望復用 CyclicBarrier 對象時可以調用該方法,用于重置 count 值、年代信息,并喚醒所有位于條件隊列中等待的線程。方法實現如下:
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { this.breakBarrier(); // break the current generation this.nextGeneration(); // start a new generation } finally { lock.unlock(); } } private void breakBarrier() { // 標識當前屏障被打破 generation.broken = true; // 重置 count 字段值 count = parties; // 喚醒所有等待的線程 trip.signalAll(); } private void nextGeneration() { // 喚醒所有等待的線程 trip.signalAll(); // 重置 count 值 count = parties; generation = new Generation(); }
再來看一下 CyclicBarrier#await
方法,該方法用于阻塞當前線程,以在屏障處等待其它線程到達,CyclicBarrier 還為該方法定義了超時等待版本。當一個線程因調用 CyclicBarrier#await
方法進入等待狀態時,該線程將會在滿足以下條件之一時退出等待狀態:
所有參與的線程都已經到達了屏障。
當前線程被中斷,或者其它處于等待狀態的線程被中斷。
如果啟用了超時機制,并且某個參與的線程等待超時。
CyclicBarrier 被重置。
方法 CyclicBarrier#await
的普通版本和超時版本在實現上都是直接委托給 CyclicBarrier#dowait
方法執行,所以下面主要來分析一下該方法,實現如下:
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; // 加鎖 lock.lock(); try { // 獲取當前年代信息 final Generation g = generation; // 當前屏障被打破,拋出異常 if (g.broken) { throw new BrokenBarrierException(); } // 當前線程被中斷,打破屏障,并喚醒所有等待的線程 if (Thread.interrupted()) { this.breakBarrier(); throw new InterruptedException(); } int index = --count; // 如果 count 值為 0,說明所有的線程都已經到達屏障 if (index == 0) { // tripped boolean ranAction = false; try { // 如果設置了回調,則執行 final Runnable command = barrierCommand; if (command != null) { command.run(); } ranAction = true; // 喚醒所有等待的線程,并重置屏障 this.nextGeneration(); return 0; } finally { // 如果執行回調異常 if (!ranAction) { this.breakBarrier(); } } } // count 值不為 0,說明存在還未到達屏障的線程,則進入條件隊列等待 // loop until tripped, broken, interrupted, or timed out for (; ; ) { try { if (!timed) { // 進入條件隊列等待 trip.await(); } else if (nanos > 0L) { // 進入條件隊列超時等待 nanos = trip.awaitNanos(nanos); } } catch (InterruptedException ie) { // 當前線程被中斷,響應中斷 if (g == generation && !g.broken) { this.breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not been interrupted, // so this interrupt is deemed to "belong" to subsequent execution. Thread.currentThread().interrupt(); } } // 屏障被打破 if (g.broken) { throw new BrokenBarrierException(); } // 當前 CyclicBarrier 已經被重置 if (g != generation) { return index; } // 等待超時 if (timed && nanos <= 0L) { this.breakBarrier(); throw new TimeoutException(); } } } finally { // 釋放鎖 lock.unlock(); } }
由上述實現我們可以總結線程在調用 CyclicBarrier#await
方法時的整體執行流程。如果當前線程不是最后一個到達屏障的線程(遞減 count 值之后仍然大于 0),則調用 Condition#await
方法(或超時版本)將當前線程添加到條件隊列中等待。如果當前線程是最后一個到達屏障的線程(遞減 count 值之后為 0),則在線程到達屏障后執行:
如果指定了回調邏輯,則執行該回調,如果期間發生任何異常,則打破屏障、重置 count 值,并喚醒條件隊列中所有等待的線程;
否則,繼續調用 CyclicBarrier#nextGeneration
方法喚醒條件隊列中所有等待的線程,并重置 count 值和年代信息。
在上述過程中如果當前線程或處于等待狀態的線程被中斷、屏障被打破、年代信息發生變化,或者等待超時(如果允許的話),則線程將會從 Condition#await
方法中退出,即當前屏障失效。
到此,關于“JUC的CyclicBarrier怎么實現”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。