# 如何使用AQS共享鎖:Semaphore、CountDownLatch深度解析
## 一、AQS核心機制回顧
### 1.1 AQS架構設計
AbstractQueuedSynchronizer(AQS)作為Java并發包的基石,采用模板方法模式實現同步器框架。其核心組成包括:
- volatile int state:同步狀態標志
- CLH變體隊列:采用雙向鏈表實現的等待隊列
- ConditionObject:條件變量實現類
```java
// AQS簡化結構
public abstract class AbstractQueuedSynchronizer {
private volatile int state;
private transient volatile Node head;
private transient volatile Node tail;
protected final boolean compareAndSetState(int expect, int update) {
// CAS操作實現
}
// 需要子類實現的模板方法
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
}
| 特性 | 獨占模式 | 共享模式 |
|---|---|---|
| 資源獲取 | 排他性占用 | 多個線程可同時獲取 |
| 典型實現 | ReentrantLock | Semaphore/CountDownLatch |
| 喚醒策略 | 只喚醒一個后繼節點 | 傳播式喚醒后續所有共享節點 |
| 適用場景 | 互斥訪問場景 | 資源控制/多線程協同 |
Semaphore通過AQS的state表示可用許可數,其工作流程:
// Semaphore同步器實現
static final class NonfairSync extends Sync {
protected int tryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
public class ConnectionPool {
private final Semaphore semaphore;
private final BlockingQueue<Connection> pool;
public ConnectionPool(int size) {
semaphore = new Semaphore(size);
pool = new ArrayBlockingQueue<>(size);
// 初始化連接...
}
public Connection getConnection() throws InterruptedException {
semaphore.acquire();
return pool.take();
}
public void release(Connection conn) {
pool.offer(conn);
semaphore.release();
}
}
class RateLimiter {
private final Semaphore semaphore;
private final ScheduledExecutorService scheduler;
public RateLimiter(int permitsPerSecond) {
semaphore = new Semaphore(permitsPerSecond);
scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(() ->
semaphore.release(permitsPerSecond - semaphore.availablePermits()),
0, 1, TimeUnit.SECONDS);
}
public void acquire() throws InterruptedException {
semaphore.acquire();
}
}
acquire(int permits)減少CAS次數tryAcquire(long timeout, TimeUnit unit)避免死鎖CountDownLatch.Sync重寫AQS方法:
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0) return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
public class ServiceInitializer {
private final CountDownLatch latch = new CountDownLatch(3);
public void init() throws InterruptedException {
new Thread(() -> {
initDatabase();
latch.countDown();
}).start();
new Thread(() -> {
initCache();
latch.countDown();
}).start();
new Thread(() -> {
loadConfig();
latch.countDown();
}).start();
latch.await();
System.out.println("All services initialized");
}
}
public class ParallelCalculator {
public long calculate(int[] data) throws InterruptedException {
int threadCount = Runtime.getRuntime().availableProcessors();
CountDownLatch latch = new CountDownLatch(threadCount);
AtomicLong result = new AtomicLong();
int segmentSize = data.length / threadCount;
for (int i = 0; i < threadCount; i++) {
int start = i * segmentSize;
int end = (i == threadCount-1) ? data.length : start + segmentSize;
new Thread(() -> {
long sum = 0;
for (int j = start; j < end; j++) {
sum += data[j];
}
result.addAndGet(sum);
latch.countDown();
}).start();
}
latch.await();
return result.get();
}
}
| 特性 | CountDownLatch | CyclicBarrier |
|---|---|---|
| 重置能力 | 不可重置 | 可循環使用 |
| 觸發條件 | 計數減到0 | 線程數達到屏障值 |
| 等待行為 | 主線程等待工作線程 | 所有線程互相等待 |
| 典型場景 | 啟動協調/結束檢測 | 并行計算分階段同步 |
@startuml
title 共享鎖獲取流程
participant ThreadA
participant AQS
participant CLH隊列
ThreadA -> AQS: acquireShared()
alt 快速路徑成功
AQS --> ThreadA: 直接返回
else 需要排隊
ThreadA -> AQS: addWaiter(Node.SHARED)
AQS -> CLH隊列: 插入尾節點
loop 自旋檢查
ThreadA -> AQS: shouldParkAfterFailedAcquire()
AQS -> ThreadA: 是否需要park
ThreadA -> AQS: parkAndCheckInterrupt()
end
AQS --> ThreadA: 被前驅節點喚醒
end
@enduml
關鍵代碼段:
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h); // 喚醒后繼節點
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head) // 檢查頭節點是否變化
break;
}
}
Semaphore許可數計算:
CountDownLatch防死鎖:
// 建議增加超時控制
if (!latch.await(30, TimeUnit.SECONDS)) {
log.warn("Initialization timeout");
// 觸發補償機制
}
問題現象:線程在await()處永久阻塞
排查步驟:
1. 檢查countDown()調用次數是否匹配
2. 使用jstack查看線程棧:
jstack <pid> | grep -A 10 'java.util.concurrent.CountDownLatch'
countDown()未執行// 基于Redis的分布式CountDownLatch
public class RedisCountDownLatch {
private final Jedis jedis;
private final String latchKey;
public void countDown() {
jedis.decr(latchKey);
}
public void await() throws InterruptedException {
while (true) {
String val = jedis.get(latchKey);
if ("0".equals(val)) return;
Thread.sleep(100);
}
}
}
public CompletableFuture<Void> parallelTasks(List<Runnable> tasks) {
CountDownLatch latch = new CountDownLatch(tasks.size());
return CompletableFuture.runAsync(() -> {
tasks.forEach(task ->
CompletableFuture.runAsync(() -> {
try {
task.run();
} finally {
latch.countDown();
}
}));
latch.await();
});
}
| 方法簽名 | 說明 |
|---|---|
void acquire() |
阻塞獲取許可 |
boolean tryAcquire() |
非阻塞嘗試獲取 |
void release() |
釋放許可 |
int availablePermits() |
查詢當前可用許可數 |
| 方法簽名 | 說明 |
|---|---|
void await() |
阻塞直到計數歸零 |
boolean await(long timeout, TimeUnit unit) |
帶超時的等待 |
void countDown() |
計數減一 |
long getCount() |
獲取當前計數值 |
本文共包含代碼示例15個,流程圖2幅,對比表格3個,完整演示了AQS共享鎖的實現原理和實戰用法。實際應用中請根據具體場景調整參數和異常處理邏輯。 “`
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。