本篇內容介紹了“JUC的PriorityBlockingQueue如何使用”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
PriorityBlockingQueue 底層依賴于數組作為存儲結構,最大容量上限是 Integer.MAX_VALUE - 8,所以幾乎可以將其視為無界的。同 PriorityQueue 一樣,PriorityBlockingQueue 同樣引入了堆數據結構來編排隊列元素的優先級,默認使用最小堆結構。
此外,由 Blocking 字樣我們可以推斷出 PriorityBlockingQueue 是一個阻塞隊列。PriorityBlockingQueue 實現自 BlockingQueue 接口,并基于 ReentrantLock 鎖保證線程安全。不過需要注意的一點是,PriorityBlockingQueue 的阻塞僅針對出隊列操作而言,當隊列為空時出隊列的線程會阻塞等待其它線程往隊列中添加新的元素。對于入隊列操作來說,因為 PriorityBlockingQueue 定義為無界,所以執行入隊列的線程會立即得到響應,如果隊列底層數組已滿則該線程會嘗試對底層數組進行擴容,當底層數據達到容量上限而無法繼續擴容時會拋出 OOM 異常。
下面先來了解一下 PriorityBlockingQueue 的字段定義,如下:
public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable {
/** 隊列默認初始容量 */
private static final int DEFAULT_INITIAL_CAPACITY = 11;
/**
* 隊列容量上限
*
* Some VMs reserve some header words in an array.
* Attempts to allocate larger arrays may result in OutOfMemoryError: Requested array size exceeds VM limit
*/
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
/**
* 存儲隊列元素的數組,按照最小堆組織
*
* Priority queue represented as a balanced binary heap:
* the two children of queue[n] are queue[2*n+1] and queue[2*(n+1)].
* The priority queue is ordered by comparator, or by the elements' natural ordering,
* if comparator is null: For each node n in the heap and each descendant d of n, n <= d.
* The element with the lowest value is in queue[0], assuming the queue is nonempty.
*/
private transient Object[] queue;
/** 隊列中元素個數 */
private transient int size;
/** 隊列元素比較器,如果為 null 則使用元素自帶的比較器 */
private transient Comparator<? super E> comparator;
/** 保證隊列操作線程安全的可重入獨占鎖 */
private final ReentrantLock lock;
/** 記錄因為隊列為空而阻塞的線程 */
private final Condition notEmpty;
/**
* 擴容標記位,保證同一時間只有一個線程在擴容隊列,狀態為 0 或 1:
* - 0: 表示當前沒有在執行擴容操作
* - 1: 表示當前正在執行擴容操作
*/
private transient volatile int allocationSpinLock;
/** 輔助支持序列化和反序列化 */
private PriorityQueue<E> q;
// ... 省略方法實現
}PriorityBlockingQueue 默認初始時的底層數組大小設置為 11,并在元素已滿時觸發擴容操作,字段 PriorityBlockingQueue#allocationSpinLock 用于控制同一時間只有一個線程在執行擴容。當某個線程檢測到當前底層數組已滿時會基于 CAS 操作嘗試將該字段值由 0 改為 1,然后開始執行擴容,并在完成之后重置該標記字段。
字段 PriorityBlockingQueue#comparator 用于指定元素比較器以判定隊列元素的優先級,如果該字段為 null,則 PriorityBlockingQueue 會基于元素自帶的比較器排列優先級。對于基本類型而言則參考元素的自然順序,對于自定義對象來說,需要保證這些對象實現了 java.lang.Comparable 接口,否則會拋出 ClassCastException 異常。
PriorityBlockingQueue 實現自 BlockingQueue 接口,下面針對核心方法的實現逐一進行分析。
針對添加元素的操作,PriorityBlockingQueue 實現了 PriorityBlockingQueue#offer、PriorityBlockingQueue#add 和 PriorityBlockingQueue#put 方法,不過后兩者都是直接調用了 PriorityBlockingQueue#offer 方法。
此外,該方法的超時版本 PriorityBlockingQueue#offer(E, long, TimeUnit) 也是直接委托給 PriorityBlockingQueue#offer 方法執行,并沒有真正實現超時等待機制,這主要是因為 PriorityBlockingQueue 是無界的,所有的添加操作都能夠被立即響應,而不會阻塞。
下面展開分析一下 PriorityBlockingQueue#offer 方法的實現,如下:
public boolean offer(E e) {
// 待添加元素不能為 null
if (e == null) {
throw new NullPointerException();
}
final ReentrantLock lock = this.lock;
// 加鎖
lock.lock();
int n, cap;
Object[] array;
// 如果隊列中的元素個數大于等于隊列的容量,則執行擴容操作
while ((n = size) >= (cap = (array = queue).length)) {
this.tryGrow(array, cap); // 擴容
}
try {
// 將待添加元素插入到堆的合適位置(最小堆)
Comparator<? super E> cmp = comparator;
if (cmp == null) {
siftUpComparable(n, e, array);
} else {
// 自定義比較器
siftUpUsingComparator(n, e, array, cmp);
}
// 結點計數加 1
size = n + 1;
// 喚醒一個之前因為隊列為空而阻塞的線程
notEmpty.signal();
} finally {
// 釋放鎖
lock.unlock();
}
return true;
}PriorityBlockingQueue 同樣不允許往其中添加 null 元素,如果待添加的元素值合法則執行:
加鎖,保證同一時間只有一個線程在操作隊列;
判斷隊列是否已滿,如果是則執行擴容操作;
將元素基于最小堆數據結構的約束插入到底層數據的合適位置;
隊列結點計數加 1;
因為當前隊列至少包含一個元素,所以嘗試喚醒一個之前因為隊列為空而阻塞的線程;
釋放鎖并返回。
繼續來看一下上述步驟中的擴容過程,實現位于 PriorityBlockingQueue#tryGrow 方法中,如下:
private void tryGrow(Object[] array, int oldCap) {
// 擴容之前,先釋放鎖,避免擴容期間阻塞其它線程的出隊列、入隊列操作
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
if (allocationSpinLock == 0 &&
// 基于 CAS 操作將擴容標記位由 0 改為 1
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {
try {
// 如果當前隊列長度小于 64,則擴容為 2(n + 1),否則擴容為 (1 + 1/2)n
int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1)); // grow faster if small
// 避免隊列容量超過允許上限
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE) {
throw new OutOfMemoryError();
}
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array) {
newArray = new Object[newCap];
}
} finally {
// 重置擴容標記
allocationSpinLock = 0;
}
}
// 當前線程擴容失敗,則讓渡其它線程獲取鎖
if (newArray == null) {
Thread.yield();
}
// 加鎖
lock.lock();
// 替換底層存儲為擴容后的數組,并復制元素
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}在開始執行擴容之前,當前線程會釋放持有的鎖,以避免在擴容期間阻塞其它線程的出隊列操作,然后基于 CAS 操作修改擴容標記位 PriorityBlockingQueue#allocationSpinLock,保證同一時間只有一個線程在執行擴容。一開始數組較?。ㄩL度小于 64)時,線程將對底層數組成倍擴容(即 2(n + 1)),然后再按照 50% 的比例進行擴容(即 (1 + 1/2) * n),如果底層數組已經到達容量上限,則會拋出 OOM 異常。
線程在完成擴容操作之后會重置擴容標記,如果有線程在競爭 CAS 時失敗則會嘗試讓渡其它線程獲取鎖。這里主要是讓渡給成功完成擴容操作的線程,因為此時擴容操作還未真正完成,該線程需要嘗試獲取鎖以繼續用擴容后的數組替換當前底層數組。
繼續回到 PriorityBlockingQueue#offer 方法,如果擴容操作完成或者本次入隊列操作無需觸發擴容,則接下去線程會將待添加的元素按照最小堆的約束插入到底層數據的合適位置。此時需要區分兩種情況,如果在構造 PriorityBlockingQueue 對象時指定了比較器 Comparator,則會調用 PriorityBlockingQueue#siftUpUsingComparator 方法基于該比較器執行最小堆插入操作,否則調用 PriorityBlockingQueue#siftUpComparable 方法按照元素的自然順序將當前元素插入到最小堆中。
基于數組實現的堆結構,在操作上是比較簡單的,讀者可以自行參考源碼,本文不對最小堆 siftUp* 和 siftDown* 操作展開分析。
前面幾篇介紹的隊列都滿足 FIFO 的特性,在執行出隊列時返回的都是在隊列中存活時間最長的元素。對于 PriorityBlockingQueue 而言,結點的順序則按照優先級進行編排,所以這里獲取元素的操作返回的是隊列中優先級最高的結點。
針對獲取元素的操作,PriorityBlockingQueue 實現了 PriorityBlockingQueue#poll、PriorityBlockingQueue#peek 和 PriorityBlockingQueue#take 方法。其中 PriorityBlockingQueue#peek 方法僅獲取最小堆堆頂結點元素值,而不移除該結點,實現上比較簡單。方法 PriorityBlockingQueue#take 相對于 PriorityBlockingQueue#poll 的區別在于,當隊列為空時該方法會無限期阻塞,直到有其它線程往隊列中插入新的元素,或者該線程被中斷。實現層面,二者大同小異,所以下面以 PriorityBlockingQueue#poll 方法為例展開分析從 PriorityBlockingQueue 中獲取元素操作的具體實現。
PriorityBlockingQueue 針對 PriorityBlockingQueue#poll 方法定義了兩個版本,區別在于當隊列為空時是立即返回還是阻塞等待一段時間,而在實現思路上是一致的。這里以不帶超時參數的版本為例展開分析,實現如下:
public E poll() {
final ReentrantLock lock = this.lock;
// 加鎖
lock.lock();
try {
// 出隊列,獲取最小堆堆頂元素值,并移除堆頂結點,調整最小堆
return this.dequeue();
} finally {
// 釋放鎖
lock.unlock();
}
}
private E dequeue() {
int n = size - 1;
if (n < 0) {
// 當前隊列為空,直接返回 null
return null;
} else {
Object[] array = queue;
// 獲取堆頂元素值
E result = (E) array[0];
// 調整堆的結構,以便再次滿足最小堆定義
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null) {
siftDownComparable(0, x, array, n);
} else {
// 自定義比較器
siftDownUsingComparator(0, x, array, n, cmp);
}
// 隊列結點計數減 1
size = n;
return result;
}
}對于優先級隊列而言,出隊列操作獲取到的是隊列中優先級最高的元素,因為底層依賴于最小堆實現,所以只需要移除最小堆堆頂結點,并返回結點元素即可。但是因為這樣破壞了堆的結構,所以需要調用 shiftDown* 方法從上往下進行調整,以再次滿足最小堆結構的約束。
針對移除元素的操作,PriorityBlockingQueue 實現了 PriorityBlockingQueue#remove 方法,并提供了有參和無參的版本,其中無參版本實際上是委托給 PriorityBlockingQueue#poll 方法執行的。下面來分析一下有參版本的實現,如下:
public boolean remove(Object o) {
final ReentrantLock lock = this.lock;
// 加鎖
lock.lock();
try {
// 獲取待刪除元素的數組下標
int i = this.indexOf(o);
if (i == -1) {
// 不存在
return false;
}
// 移除元素
this.removeAt(i);
return true;
} finally {
// 釋放鎖
lock.unlock();
}
}
private void removeAt(int i) {
Object[] array = queue;
int n = size - 1;
// 當前移除的是最后一個元素
if (n == i) { // removed last element
array[i] = null;
}
// 當前移除的是中間元素
else {
// 將數組最后一個位置置為 null,并調整堆的結構以滿足最小堆定義
E moved = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
// 自上而下調整堆結構以滿足最小堆定義
if (cmp == null) {
siftDownComparable(i, moved, array, n);
} else {
siftDownUsingComparator(i, moved, array, n, cmp);
}
// 自下而上調整堆結構以滿足最小堆定義
if (array[i] == moved) {
if (cmp == null) {
siftUpComparable(i, moved, array);
} else {
siftUpUsingComparator(i, moved, array, cmp);
}
}
}
// 隊列結點計數減 1
size = n;
}如果待刪除的元素是優先級最低的元素,則只需要將底層數組末尾結點置為 null 即可,否則,對于其它優先級的元素來說,在執行刪除之后需要調整堆結構以滿足最小堆定義。
方法 PriorityBlockingQueue#contains 接收一個參數,用于判斷隊列中是否包含值等于參數的結點。
方法 PriorityBlockingQueue#size 用于返回當前隊列中包含的結點個數,因為 PriorityBlockingQueue 已經定義了 PriorityBlockingQueue#size 字段,用于對隊列中的結點進行計數,所以該方法只需要返回字段值即可。
“JUC的PriorityBlockingQueue如何使用”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。