# Java多線程的阻塞隊列實現
## 一、阻塞隊列概述
### 1.1 什么是阻塞隊列
阻塞隊列(BlockingQueue)是Java并發包(java.util.concurrent)中提供的一種線程安全的隊列實現。它在普通隊列的基礎上增加了兩個附加操作:
1. 當隊列為空時,獲取元素的線程會等待隊列變為非空
2. 當隊列滿時,存儲元素的線程會等待隊列可用
這種特性使得阻塞隊列成為生產者-消費者模式的理想實現方式,無需開發者手動實現線程間的等待/通知機制。
### 1.2 阻塞隊列的核心特性
- **線程安全**:所有操作都是原子性的
- **阻塞機制**:提供put/take等阻塞方法
- **容量限制**:可以是有界隊列或無界隊列
- **公平性選項**:部分實現支持公平訪問策略
### 1.3 Java中的阻塞隊列實現類
Java并發包提供了多種阻塞隊列實現:
1. ArrayBlockingQueue:基于數組的有界阻塞隊列
2. LinkedBlockingQueue:基于鏈表的可選有界阻塞隊列
3. PriorityBlockingQueue:支持優先級排序的無界阻塞隊列
4. DelayQueue:使用優先級隊列實現的無界阻塞隊列
5. SynchronousQueue:不存儲元素的阻塞隊列
6. LinkedTransferQueue:基于鏈表的無界阻塞隊列
7. LinkedBlockingDeque:基于鏈表的雙向阻塞隊列
## 二、阻塞隊列的核心方法
### 2.1 插入操作
| 方法 | 說明 | 特殊行為 |
|------|------|----------|
| add(E e) | 添加元素到隊列 | 隊列滿時拋出IllegalStateException |
| offer(E e) | 添加元素到隊列 | 隊列滿時返回false |
| put(E e) | 添加元素到隊列 | 隊列滿時阻塞等待 |
| offer(E e, long timeout, TimeUnit unit) | 添加元素到隊列 | 隊列滿時等待指定時間 |
### 2.2 移除操作
| 方法 | 說明 | 特殊行為 |
|------|------|----------|
| remove() | 移除并返回隊列頭元素 | 隊列空時拋出NoSuchElementException |
| poll() | 移除并返回隊列頭元素 | 隊列空時返回null |
| take() | 移除并返回隊列頭元素 | 隊列空時阻塞等待 |
| poll(long timeout, TimeUnit unit) | 移除并返回隊列頭元素 | 隊列空時等待指定時間 |
### 2.3 檢查操作
| 方法 | 說明 | 特殊行為 |
|------|------|----------|
| element() | 返回隊列頭元素 | 隊列空時拋出NoSuchElementException |
| peek() | 返回隊列頭元素 | 隊列空時返回null |
## 三、阻塞隊列的實現原理
### 3.1 鎖與條件變量
阻塞隊列的核心實現依賴于ReentrantLock和Condition:
```java
// 以ArrayBlockingQueue為例
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
public ArrayBlockingQueue(int capacity, boolean fair) {
// 省略其他初始化代碼
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
以put()方法為例:
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await(); // 隊列滿時等待
enqueue(e); // 實際入隊操作
} finally {
lock.unlock();
}
}
以enqueue()方法為例:
private void enqueue(E e) {
final Object[] items = this.items;
items[putIndex] = e;
if (++putIndex == items.length) putIndex = 0;
count++;
notEmpty.signal(); // 喚醒等待的消費者線程
}
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
final Object[] items; // 存儲元素的數組
int takeIndex; // 下一個要取出的元素索引
int putIndex; // 下一個要放入的元素索引
int count; // 當前元素數量
// 鎖和條件變量
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
// 迭代器
transient Itrs itrs;
}
入隊操作:
public boolean offer(E e) {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
出隊操作:
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
// 節點類
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
private final int capacity; // 容量限制
private final AtomicInteger count = new AtomicInteger(); // 當前元素數量
// 頭節點和尾節點
transient Node<E> head;
private transient Node<E> last;
// 分離的鎖
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
}
入隊操作:
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
出隊操作:
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
基于堆結構的優先級阻塞隊列:
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private transient Object[] queue;
private transient int size;
private transient Comparator<? super E> comparator;
private final ReentrantLock lock;
private final Condition notEmpty;
// 擴容時使用的自旋鎖
private transient volatile int allocationSpinLock;
}
特點: - 無界隊列(自動擴容) - 元素必須實現Comparable或提供Comparator - 出隊順序由優先級決定
用于實現延遲任務的隊列:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private final Condition available = lock.newCondition();
private Thread leader;
}
特點: - 元素必須實現Delayed接口 - 只有到期元素才能被取出 - 應用場景:緩存過期、定時任務調度
不存儲元素的阻塞隊列:
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
abstract static class Transferer<E> {
abstract E transfer(E e, boolean timed, long nanos);
}
// 兩種不同的傳輸策略
static final class TransferStack<E> extends Transferer<E> { /*...*/ }
static final class TransferQueue<E> extends Transferer<E> { /*...*/ }
}
特點: - 每個插入操作必須等待一個移除操作 - 吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue - 適合傳遞性場景
經典實現方式:
// 生產者
class Producer implements Runnable {
private final BlockingQueue<String> queue;
public Producer(BlockingQueue<String> queue) {
this.queue = queue;
}
public void run() {
try {
while (true) {
String item = produceItem();
queue.put(item);
Thread.sleep(100);
}
} catch (InterruptedException ex) {
// 處理中斷
}
}
private String produceItem() {
// 生產邏輯
}
}
// 消費者
class Consumer implements Runnable {
private final BlockingQueue<String> queue;
public Consumer(BlockingQueue<String> queue) {
this.queue = queue;
}
public void run() {
try {
while (true) {
String item = queue.take();
consumeItem(item);
}
} catch (InterruptedException ex) {
// 處理中斷
}
}
private void consumeItem(String item) {
// 消費邏輯
}
}
Java線程池使用阻塞隊列作為工作隊列:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
// 實現代碼
}
實現簡單的消息隊列:
public class SimpleMessageQueue {
private final BlockingQueue<Message> queue;
public SimpleMessageQueue(int capacity) {
this.queue = new LinkedBlockingQueue<>(capacity);
}
public void send(Message msg) throws InterruptedException {
queue.put(msg);
}
public Message receive() throws InterruptedException {
return queue.take();
}
}
場景: 生產者等待隊列空間,消費者等待生產者釋放鎖
解決方案: - 使用雙鎖設計的LinkedBlockingQueue - 設置合理的超時時間 - 避免在持有鎖時調用外部方法
場景: 無界隊列持續增長導致OOM
解決方案: - 使用有界隊列 - 實現自定義的拒絕策略 - 監控隊列大小
場景: 單一鎖成為系統瓶頸
解決方案: - 使用分離鎖的實現(如LinkedBlockingQueue) - 考慮無鎖隊列(如ConcurrentLinkedQueue) - 分區處理(多個隊列)
public class SimpleBlockingQueue<E> {
private final E[] items;
private int putIndex, takeIndex, count;
private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public SimpleBlockingQueue(int capacity) {
this.items = (E[]) new Object[capacity];
}
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
items[putIndex] = e;
if (++putIndex == items.length) putIndex = 0;
count++;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
E e = items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
notFull.signal();
return e;
} finally {
lock.unlock();
}
}
}
public class CASBlockingQueue<E> {
private static class Node<E> {
volatile E item;
volatile Node<E> next;
Node(E item) {
this.item = item;
}
}
private volatile Node<E> head;
private volatile Node<E> tail;
private final AtomicInteger count = new AtomicInteger(0);
private final int capacity;
public CASBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
head = tail = new Node<>(null);
}
public boolean offer(E e) {
Objects.requireNonNull(e);
Node<E> newNode = new Node<>(e);
for (;;) {
Node<E> currentTail = tail;
Node<E> tailNext = currentTail.next;
if (currentTail == tail) {
if (tailNext != null) {
// 幫助推進尾節點
compareAndSetTail(tail, tailNext);
} else {
if (count.get() < capacity) {
if (compareAndSetNext(currentTail, null, newNode)) {
compareAndSetTail(tail, newNode);
count.incrementAndGet();
return true;
}
} else {
return false;
}
}
}
}
}
// 省略其他方法和CAS操作實現
}
場景 | 推薦隊列 | 理由 |
---|---|---|
固定大小線程池 | ArrayBlockingQueue | 簡單高效 |
高并發生產者消費者 | LinkedBlockingQueue | 吞吐量高 |
任務優先級處理 | PriorityBlockingQueue | 支持優先級 |
延遲任務調度 | DelayQueue | 內置延遲支持 |
直接傳遞任務 | SynchronousQueue | 零容量設計 |
本文共約9350字,詳細介紹了Java多線程中阻塞隊列的實現原理、各種實現類的特點、應用場景以及性能優化建議。 “`
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。