這篇文章主要介紹了怎么利用Java手寫阻塞隊列的相關知識,內容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇怎么利用Java手寫阻塞隊列文章都會有所收獲,下面我們一起來看看吧。
阻塞隊列的主要的需求如下:
隊列基礎的功能需要有,往隊列當中放數據,從隊列當中取數據。
所有的隊列操作都要是并發安全的。
當隊列滿了之后再往隊列當中放數據的時候,線程需要被掛起,當隊列當中的數據被取出,讓隊列當中有空間的時候線程需要被喚醒。
當隊列空了之后再往隊列當中取數據的時候,線程需要被掛起,當有線程往隊列當中加入數據的時候被掛起的線程需要被喚醒。
在我們實現的隊列當中我們使用數組去存儲數據,因此在構造函數當中需要提供數組的初始大小,設置用多大的數組。
在上面我們已經談到了阻塞隊列是并發安全的,而且我們還有將線程喚醒和阻塞的需求,因此我們可以選擇可重入鎖ReentrantLock
保證并發安全,但是我們還需要將線程喚醒和阻塞,因此我們可以選擇條件變量Condition
進行線程的喚醒和阻塞操作,在Condition
當中我們將會使用到的,主要有以下兩個函數:
signal
用于喚醒線程,當一個線程調用Condition
的signal
函數的時候就可以喚醒一個被await
函數阻塞的線程。
await
用于阻塞線程,當一個線程調用Condition
的await
函數的時候這個線程就會阻塞。
因為隊列是一端進一端出,因此隊列肯定有頭有尾。
當我們往隊列當中加入一些數據之后,隊列的情況可能如下:
在上圖的基礎之上我們在進行四次出隊操作,結果如下:
在上面的狀態下,我們繼續加入8個數據,那么布局情況如下:
我們知道上圖在加入數據的時候不僅將數組后半部分的空間使用完了,而且可以繼續使用前半部分沒有使用過的空間,也就是說在隊列內部實現了一個循環使用的過程。
為了保證數組的循環使用,我們需要用一個變量記錄隊列頭在數組當中的位置,用一個變量記錄隊列尾部在數組當中的位置,還需要有一個變量記錄隊列當中有多少個數據。
根據上面的分析我們可以知道,在我們自己實現的類當中我們需要有如下的類成員變量:
// 用于保護臨界區的鎖 private final ReentrantLock lock; // 用于喚醒取數據的時候被阻塞的線程 private final Condition notEmpty; // 用于喚醒放數據的時候被阻塞的線程 private final Condition notFull; // 用于記錄從數組當中取數據的位置 也就是隊列頭部的位置 private int takeIndex; // 用于記錄從數組當中放數據的位置 也就是隊列尾部的位置 private int putIndex; // 記錄隊列當中有多少個數據 private int count; // 用于存放具體數據的數組 private Object[] items;
我們的構造函數也很簡單,最核心的就是傳入一個數組大小的參數,并且給上面的變量進行初始化賦值。
@SuppressWarnings("unchecked") public MyArrayBlockingQueue(int size) { this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.notFull = lock.newCondition(); // 其實可以不用初始化 類會有默認初始化 默認初始化為0 takeIndex = 0; putIndex = 0; count = 0; // 數組的長度肯定不能夠小于0 if (size <= 0) throw new RuntimeException("size can not be less than 1"); items = (E[])new Object[size]; }
這是一個比較重要的函數了,在這個函數當中如果隊列沒有滿,則直接將數據放入到數組當中即可,如果數組滿了,則需要將線程掛起。
public void put(E x){ // put 函數可能多個線程調用 但是我們需要保證在給變量賦值的時候只能夠有一個線程 // 因為如果多個線程同時進行賦值的話 那么可能后一個線程的賦值操作覆蓋了前一個線程的賦值操作 // 因此這里需要上鎖 lock.lock(); try { // 如果隊列當中的數據個數等于數組的長度的話 說明數組已經滿了 // 這個時候需要將線程掛起 while (count == items.length) notFull.await(); // 將調用 await的線程掛起 // 當數組沒有滿 或者在掛起之后再次喚醒的話說明數組當中有空間了 // 這個時候需要將數組入隊 // 調用入隊函數將數據入隊 enqueue(x); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 解鎖 lock.unlock(); } } // 將數據入隊 private void enqueue(E x) { this.items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); // 喚醒一個被 take 函數阻塞的線程喚醒 }
offer函數和put函數一樣,但是與put函數不同的是,當數組當中數據填滿之后offer函數返回false
,而不是被阻塞。
public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { // 如果數組滿了 則直接返回false 而不是被阻塞 if (count == items.length) return false; else { // 如果數組沒有滿則直接入隊 并且返回 true enqueue(e); return true; } } finally { lock.unlock(); } }
這個函數和上面兩個函數作用一樣,也是往隊列當中加入數據,但當單隊列滿了之后這個函數會拋出異常。
public boolean add(E e) { if (offer(e)) return true; else throw new RuntimeException("Queue full"); }
這個函數主要是從隊列當中取出一個數據,但是當隊列為空的時候,這個函數會阻塞調用該函數的線程:
public E take() throws InterruptedException { // 這個函數也是不能夠并發的 否則可能不同的線程取出的是同一個位置的數據 // 進行加鎖操作 lock.lock(); try { // 當 count 等于0 說明隊列為空 // 需要將線程掛起等待 while (count == 0) notEmpty.await(); // 當被喚醒之后進行出隊操作 return dequeue(); }finally { lock.unlock(); } } private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; // 將對應的位置設置為 null GC就可以回收了 if (++takeIndex == items.length) takeIndex = 0; count--; // 隊列當中數據少一個了 // 因為出隊了一個數據 可以喚醒一個被 put 函數阻塞的線程 如果這個時候沒有被阻塞的線程 // 這個函數就不會起作用 也就說在這個函數調用之后被 put 函數掛起的線程也不會被喚醒 notFull.signal(); // 喚醒一個被 put 函數阻塞的線程 return x; }
因為我們在后面的測試函數當中會打印我們這個類,而打印這個類的時候會調用對象的toString
方法得到一個字符串,最后打印這個字符串。
@Override public String toString() { StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append("["); // 這里需要上鎖 因為我們在打印的時候需要打印所有的數據 // 打印所有的數據就需要對數組進行遍歷操作 而在進行遍歷 // 操作的時候是不能進行插入和刪除操作的 因為打印的是某 // 個時刻的數據 lock.lock(); try { if (count == 0) stringBuilder.append("]"); else { int cur = 0; // 對數據進行遍歷 一共遍歷 count 次 因為數組當中一共有 count // 個數據 while (cur != count) { // 從 takeIndex 位置開始進行遍歷 因為數據是從這個位置開始的 stringBuilder.append(items[(cur + takeIndex) % items.length].toString() + ", "); cur += 1; } // 刪除掉最后一次沒用的 ", " stringBuilder.delete(stringBuilder.length() - 2, stringBuilder.length()); stringBuilder.append(']'); } }finally { lock.unlock(); } return stringBuilder.toString(); }
整個我們自己完成的阻塞隊列的代碼如下:
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class MyArrayBlockingQueue<E> { // 用于保護臨界區的鎖 private final ReentrantLock lock; // 用于喚醒取數據的時候被阻塞的線程 private final Condition notEmpty; // 用于喚醒放數據的時候被阻塞的線程 private final Condition notFull; // 用于記錄從數組當中取數據的位置 也就是隊列頭部的位置 private int takeIndex; // 用于記錄從數組當中放數據的位置 也就是隊列尾部的位置 private int putIndex; // 記錄隊列當中有多少個數據 private int count; // 用于存放具體數據的數組 private Object[] items; @SuppressWarnings("unchecked") public MyArrayBlockingQueue(int size) { this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.notFull = lock.newCondition(); // 其實可以不用初始化 類會有默認初始化 默認初始化為0 takeIndex = 0; putIndex = 0; count = 0; if (size <= 0) throw new RuntimeException("size can not be less than 1"); items = (E[])new Object[size]; } public void put(E x){ lock.lock(); try { while (count == items.length) notFull.await(); enqueue(x); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } private void enqueue(E x) { this.items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); } private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; notFull.signal(); return x; } public boolean add(E e) { if (offer(e)) return true; else throw new RuntimeException("Queue full"); } public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } } public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } } public E take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); return dequeue(); }finally { lock.unlock(); } } @Override public String toString() { StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append("["); lock.lock(); try { if (count == 0) stringBuilder.append("]"); else { int cur = 0; while (cur != count) { stringBuilder.append(items[(cur + takeIndex) % items.length].toString()).append(", "); cur += 1; } stringBuilder.delete(stringBuilder.length() - 2, stringBuilder.length()); stringBuilder.append(']'); } }finally { lock.unlock(); } return stringBuilder.toString(); } }
現在對上面的代碼進行測試:
我們現在使用阻塞隊列模擬一個生產者消費者模型,設置阻塞隊列的大小為5,生產者線程會往隊列當中加入數據,數據為0-9的10個數字,消費者線程一共會消費10次。
import java.util.concurrent.TimeUnit; public class Test { public static void main(String[] args) throws InterruptedException { MyArrayBlockingQueue<Integer> queue = new MyArrayBlockingQueue<>(5); Thread thread = new Thread(() -> { for (int i = 0; i < 10; i++) { System.out.println(Thread.currentThread().getName() + " 往隊列當中加入數據:" + i); queue.put(i); } }, "生產者"); Thread thread1 = new Thread(() -> { for (int i = 0; i < 10; i++) { try { System.out.println(Thread.currentThread().getName() + " 從隊列當中取出數據:" + queue.take()); System.out.println(Thread.currentThread().getName() + " 當前隊列當中的數據:" + queue); } catch (InterruptedException e) { e.printStackTrace(); } } }, "消費者"); thread.start(); TimeUnit.SECONDS.sleep(3); thread1.start(); } }
上面代碼的輸出如下所示:
生產者 往隊列當中加入數據:0
生產者 往隊列當中加入數據:1
生產者 往隊列當中加入數據:2
生產者 往隊列當中加入數據:3
生產者 往隊列當中加入數據:4
生產者 往隊列當中加入數據:5
消費者 從隊列當中取出數據:0
生產者 往隊列當中加入數據:6
消費者 當前隊列當中的數據:[1, 2, 3, 4, 5]
消費者 從隊列當中取出數據:1
消費者 當前隊列當中的數據:[2, 3, 4, 5]
消費者 從隊列當中取出數據:2
消費者 當前隊列當中的數據:[3, 4, 5, 6]
生產者 往隊列當中加入數據:7
消費者 從隊列當中取出數據:3
消費者 當前隊列當中的數據:[4, 5, 6, 7]
消費者 從隊列當中取出數據:4
消費者 當前隊列當中的數據:[5, 6, 7]
消費者 從隊列當中取出數據:5
消費者 當前隊列當中的數據:[6, 7]
生產者 往隊列當中加入數據:8
消費者 從隊列當中取出數據:6
消費者 當前隊列當中的數據:[7, 8]
消費者 從隊列當中取出數據:7
消費者 當前隊列當中的數據:[8]
消費者 從隊列當中取出數據:8
消費者 當前隊列當中的數據:[]
生產者 往隊列當中加入數據:9
消費者 從隊列當中取出數據:9
消費者 當前隊列當中的數據:[]
從上面的輸出結果我們知道,生產者線程打印5之后被掛起了,因為如果沒有被掛起,生產者線程肯定可以一次性輸出完成,因為消費者線程阻塞了3秒。但是他沒有輸出完成說明在打印5之后,因為阻塞隊列滿了,因而生產者線程被掛起了。然后消費者開始消費,這樣阻塞隊列當中就有空間了,生產者線程就可以繼續生產了。
關于“怎么利用Java手寫阻塞隊列”這篇文章的內容就介紹到這里,感謝各位的閱讀!相信大家對“怎么利用Java手寫阻塞隊列”知識都有一定的了解,大家如果還想學習更多知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。