本篇內容介紹了“阻塞隊列之如何理解LinkedBlockingQueue源碼”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
LinkedBlockingQueue 由鏈接節點支持的可選有界隊列,是一個基于鏈表的無界隊列(理論上有界),隊列按照先進先出的順序進行排序。LinkedBlockingQueue不同于ArrayBlockingQueue,它如果不指定容量,默認為 Integer.MAX_VALUE,也就是無界隊列。所以為了避免隊列過大造成機器負載或者內存爆滿的情況出現,我們在使用的時候建議手動傳一個隊列的大小。
隊列創建
BlockingQueue blockingQueue = new LinkedBlockingQueue<>();
上面這段代碼中,blockingQueue 的容量將設置為 Integer.MAX_VALUE 。
應用場景
多用于任務隊列,單線程發布任務,任務滿了就停止等待阻塞,當任務被完成消費少了又開始負責發布任務。
我們來看一個例子:
package com.niuh.queue.linked; import org.apache.commons.lang.RandomStringUtils; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; public class TestLinkedBlockingQueue { private static LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>(); // 線程控制開關 private final CountDownLatch latch = new CountDownLatch(1); // 線程池 private final ExecutorService pool; // AtomicLong 計數 生產數量 private final AtomicLong output = new AtomicLong(0); // AtomicLong 計數 銷售數量 private final AtomicLong sales = new AtomicLong(0); // 是否停止線程 private final boolean clear; public TestLinkedBlockingQueue(boolean clear) { this.pool = Executors.newCachedThreadPool(); this.clear = clear; } public void service() throws InterruptedException { Consumer a = new Consumer(queue, sales, latch, clear); pool.submit(a); Producer w = new Producer(queue, output, latch); pool.submit(w); latch.countDown(); } public static void main(String[] args) { TestLinkedBlockingQueue t = new TestLinkedBlockingQueue(false); try { t.service(); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 消費者(銷售產品) */ class Consumer implements Runnable { private final LinkedBlockingQueue<String> queue; private final AtomicLong sales; private final CountDownLatch latch; private final boolean clear; public Consumer(LinkedBlockingQueue<String> queue, AtomicLong sales, CountDownLatch latch, boolean clear) { this.queue = queue; this.sales = sales; this.latch = latch; this.clear = clear; } public void run() { try { latch.await(); // 放閘之前老實的等待著 for (; ; ) { sale(); Thread.sleep(500); } } catch (InterruptedException e) { if (clear) { // 響應中斷請求后,如果有要求則銷售完隊列的產品后再終止線程 cleanWarehouse(); } else { System.out.println("Seller Thread will be interrupted..."); } } } public void sale() { System.out.println("==取take="); try { String item = queue.poll(50, TimeUnit.MILLISECONDS); System.out.println(item); if (item != null) { sales.incrementAndGet(); // 可以聲明long型的參數獲得返回值,作為日志的參數 } } catch (InterruptedException e) { e.printStackTrace(); } } /** * 銷售完隊列剩余的產品 */ private void cleanWarehouse() { try { while (queue.size() > 0) { sale(); } } catch (Exception ex) { System.out.println("Seller Thread will be interrupted..."); } } } /** * 生產者(生產產品) * */ class Producer implements Runnable { private LinkedBlockingQueue<String> queue; private CountDownLatch latch; private AtomicLong output; public Producer() { } public Producer(LinkedBlockingQueue<String> queue, AtomicLong output, CountDownLatch latch) { this.queue = queue; this.latch = latch; this.output = output; } public void run() { try { latch.await(); // 線程等待 for (; ; ) { work(); Thread.sleep(100); } } catch (InterruptedException e) { System.out.println("Producer thread will be interrupted..."); } } /** * 工作 */ public void work() { try { String product = RandomStringUtils.randomAscii(3); boolean success = queue.offer(product, 100, TimeUnit.MILLISECONDS); if (success) { output.incrementAndGet();// 可以聲明long型的參數獲得返回值,作為日志的參數 } } catch (InterruptedException e) { e.printStackTrace(); } } }
工作原理
LinkedBlockingQueue內部由單鏈表實現,只能從head取元素,從tail添加元素。添加元素和獲取元素都有獨立的鎖,也就是說LinkedBlockingQueue是讀寫分離的,讀寫操作可以并行執行。LinkedBlockingQueue采用可重入鎖(ReentrantLock)來保證在并發情況下的線程安全。
向無限隊列添加元素的所有操作都將永遠不會阻塞,[注意這里不是說不會加鎖保證線程安全],因此它可以增長到非常大的容量。
使用無限 BlockingQueue 設計生產者 - 消費者模型時最重要的是 消費者應該能夠像生產者向隊列添加消息一樣快地消費消息。否則,內存可能會填滿,然后就會得到一個 OutOfMemory 異常。
源碼分析
定義
LinkedBlockingQueue的類繼承關系如下:
其包含的方法定義如下:
成員屬性
/** * 節點類,用于存儲數據 */ static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } } /** 阻塞隊列的大小, 默認為Integer.MAX_VALUE */ private final int capacity; /** 當前阻塞隊列中的元素個數 */ private final AtomicInteger count = new AtomicInteger(); /** * 阻塞隊列的頭節點 */ transient Node<E> head; /** * 阻塞隊列的尾節點 */ private transient Node<E> last; /** 獲取并移除元素時使用的鎖,如take,poll,etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** notEmpty 條件對象,當隊列沒有數據時用于掛起執行刪除的線程 */ private final Condition notEmpty = takeLock.newCondition(); /** 添加元素時使用的鎖,如 put,offer,etc */ private final ReentrantLock putLock = new ReentrantLock(); /** notFull 條件對象,每當隊列數據已滿時用于掛起執行添加的線程 */ private final Condition notFull = putLock.newCondition();
從上面的屬性我們知道,每個添加到LinkedBlockingQueue隊列中的數據都將被封裝成Node節點,添加的鏈表隊列中,其中head和last分別指向隊列的頭結點和尾結點。與ArrayBlockingQueue不同的是,LinkedBlockingQueue內部分別使用了takeLock 和 putLock 對并發進行控制,也就是說,添加和刪除操作并不是互斥操作,可以同時進行,這樣也就可以大大提高吞吐量。
這里如果不指定隊列的容量大小,也就是使用默認的Integer.MAX_VALUE,如果存在添加速度大于刪除速度時候,有可能會內存溢出,這點在使用前希望慎重考慮。
另外,LinkedBlockingQueue對每一個lock鎖都提供了一個Condition用來掛起和喚醒其他線程。
構造函數
默認的構造函數和最后一個構造函數創建的隊列大小都為 Integer.MAX_VALUE,只有第二個構造函數用戶可以指定隊列的大小。第二個構造函數最后初始化了last和head節點,讓它們都指向了一個元素為null的節點。
最后一個構造函數使用了putLock來進行加鎖,但是這里并不是為了多線程的競爭而加鎖,只是為了放入的元素能立即對其他線程可見。
public LinkedBlockingQueue() { // 默認大小為Integer.MAX_VALUE this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
入隊方法
LinkedBlockingQueue提供了多種入隊操作的實現來滿足不同情況下的需求,入隊操作有如下幾種:
void put(E e);
boolean offer(E e);
boolean offer(E e, long timeout, TimeUnit unit)。
其中:
offer方法有兩個重載版本,只有一個參數的版本,如果隊列滿了就返回false,否則加入到隊列中,返回true,add方法就是調用此版本的offer方法;另一個帶時間參數的版本,如果隊列滿了則等待,可指定等待的時間,如果這期間中斷了則拋出異常,如果等待超時了則返回false,否則加入到隊列中返回true;
put方法跟帶時間參數的offer方法邏輯一樣,不過沒有等待的時間限制,會一直等待直到隊列有空余位置了,再插入到隊列中,返回true。
put(E e)
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; // 獲取鎖中斷 putLock.lockInterruptibly(); try { //判斷隊列是否已滿,如果已滿阻塞等待 while (count.get() == capacity) { notFull.await(); } // 把node放入隊列中 enqueue(node); c = count.getAndIncrement(); // 再次判斷隊列是否有可用空間,如果有喚醒下一個線程進行添加操作 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } // 如果隊列中有一條數據,喚醒消費線程進行消費 if (c == 0) signalNotEmpty(); }
小結put方法來看,它總共做了以下情況的考慮:
隊列已滿,阻塞等待。
隊列未滿,創建一個node節點放入隊列中,如果放完以后隊列還有剩余空間,繼續喚醒下一個添加線程進行添加。如果放之前隊列中沒有元素,放完以后要喚醒消費線程進行消費。
我們再看看put方法中用到的幾個其他方法,先來看看 enqueue(Node node) 方法:
private void enqueue(Node<E> node) { last = last.next = node; }
用一張圖來看看往隊列里依次放入元素A和元素B,如下:
接下來我們看看signalNotEmpty,順帶著看signalNotFull方法。
private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } } private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }
為什么要這么寫?因為signal的時候要獲取到該signal對應的Condition對象的鎖才行。
offer(E e)
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) return false; int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { // 隊列有可用空間,放入node節點,判斷放入元素后是否還有可用空間, // 如果有,喚醒下一個添加線程進行添加操作。 if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0; }
可以看到offer僅僅對put方法改動了一點點,當隊列沒有可用元素的時候,不同于put方法的阻塞等待,offer方法直接方法false。
offer(E e, long timeout, TimeUnit unit)
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { // 等待超時時間nanos,超時時間到了返回false while (count.get() == capacity) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(new Node<E>(e)); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; }
該方法只是對offer方法進行了阻塞超時處理,使用了Condition的awaitNanos來進行超時等待,這里為什么要用while循環?因為awaitNanos方法是可中斷的,為了防止在等待過程中線程被中斷,這里使用while循環進行等待過程中中斷的處理,繼續等待剩下需等待的時間。
出隊方法
入隊列的方法說完后,我們來說說出隊列的方法。LinkedBlockingQueue提供了多種出隊操作的實現來滿足不同情況下的需求,如下:
E take();
E poll();
E poll(long timeout, TimeUnit unit);
take()
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { // 隊列為空,阻塞等待 while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); // 隊列中還有元素,喚醒下一個消費線程進行消費 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } // 移除元素之前隊列是滿的,喚醒生產線程進行添加元素 if (c == capacity) signalNotFull(); return x; }
take方法看起來就是put方法的逆向操作,它總共做了以下情況的考慮:
隊列為空,阻塞等待
隊列不為空,從對首獲取并移除一個元素,如果消費后還有元素在隊列中,繼續喚醒下一個消費線程進行元素移除。如果放之前隊列是滿元素的情況,移除完后需要喚醒生產線程進行添加元素。
我們來看看dequeue方法
private E dequeue() { // 獲取到head節點 Node<E> h = head; // 獲取到head節點指向的下一個節點 Node<E> first = h.next; // head節點原來指向的節點的next指向自己,等待下次gc回收 h.next = h; // help GC // head節點指向新的節點 head = first; // 獲取到新的head節點的item值 E x = first.item; // 新head節點的item值設置為null first.item = null; return x; }
我們結合注釋和圖來看一下鏈表算法:
其實這個寫法看起來很繞,我們其實也可以這么寫:
private E dequeue() { // 獲取到head節點 Node<E> h = head; // 獲取到head節點指向的下一個節點,也就是節點A Node<E> first = h.next; // 獲取到下下個節點,也就是節點B Node<E> next = first.next; // head的next指向下下個節點,也就是圖中的B節點 h.next = next; // 得到節點A的值 E x = first.item; first.item = null; // help GC first.next = first; // help GC return x; }
poll()
public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
poll方法去除了take方法中元素為空后阻塞等待這一步驟,這里也就不詳細說了。同理,poll(long timeout, TimeUnit unit)也和offer(E e, long timeout, TimeUnit unit)一樣,利用了Condition的awaitNanos方法來進行阻塞等待直至超時。這里就不列出來說了。
獲取元素方法
public E peek() { if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { Node<E> first = head.next; if (first == null) return null; else return first.item; } finally { takeLock.unlock(); } }
加鎖后,獲取到head節點的next節點,如果為空返回null,如果不為空,返回next節點的item值。
刪除元素方法
public boolean remove(Object o) { if (o == null) return false; // 兩個lock全部上鎖 fullyLock(); try { // 從head開始遍歷元素,直到最后一個元素 for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { // 如果找到相等的元素,調用unlink方法刪除元素 if (o.equals(p.item)) { unlink(p, trail); return true; } } return false; } finally { // 兩個lock全部解鎖 fullyUnlock(); } } void fullyLock() { putLock.lock(); takeLock.lock(); } void fullyUnlock() { takeLock.unlock(); putLock.unlock(); }
因為remove方法使用兩個鎖全部上鎖,所以其他操作都需要等待它完成,而該方法需要從head節點遍歷到尾節點,所以時間復雜度為O(n)。我們來看看unlink方法。
void unlink(Node<E> p, Node<E> trail) { // p的元素置為null p.item = null; // p的前一個節點的next指向p的next,也就是把p從鏈表中去除了 trail.next = p.next; // 如果last指向p,刪除p后讓last指向trail if (last == p) last = trail; // 如果刪除之前元素是滿的,刪除之后就有空間了,喚醒生產線程放入元素 if (count.getAndDecrement() == capacity) notFull.signal(); }
總結
LinkedBlockingQueue是一個阻塞隊列,內部由兩個ReentrantLock來實現出入隊列的線程安全,由各自的Condition對象的await和signal來實現等待和喚醒功能。它和ArrayBlockingQueue的不同點在于:
隊列大小有所不同,ArrayBlockingQueue是有界的初始化必須指定大小,而LinkedBlockingQueue可以是有界的也可以是無界的(Integer.MAX_VALUE),對于后者而言,當添加速度大于移除速度時,在無界的情況下,可能會造成內存溢出等問題。
數據存儲容器不同,ArrayBlockingQueue采用的是數組作為數據存儲容器,而LinkedBlockingQueue采用的則是以Node節點作為連接對象的鏈表。
由于ArrayBlockingQueue采用的是數組的存儲容器,因此在插入或刪除元素時不會產生或銷毀任何額外的對象實例,而LinkedBlockingQueue則會生成一個額外的Node對象。這可能在長時間內需要高效并發地處理大批量數據的時,對于GC可能存在較大影響。
兩者的實現隊列添加或移除的鎖不一樣,ArrayBlockingQueue實現的隊列中的鎖是沒有分離的,即添加操作和移除操作采用的同一個ReenterLock鎖,而LinkedBlockingQueue實現的隊列中的鎖是分離的,其添加采用的是putLock,移除采用的則是takeLock,這樣能大大提高隊列的吞吐量,也意味著在高并發的情況下生產者和消費者可以并行地操作隊列中的數據,以此來提高整個隊列的并發性能。
“阻塞隊列之如何理解LinkedBlockingQueue源碼”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。