溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何使用AQS共享鎖,Semaphore、CountDownLatch

發布時間:2021-10-23 17:28:13 來源:億速云 閱讀:205 作者:iii 欄目:編程語言
# 如何使用AQS共享鎖:Semaphore、CountDownLatch深度解析

## 一、AQS核心機制回顧

### 1.1 AQS架構設計
AbstractQueuedSynchronizer(AQS)作為Java并發包的基石,采用模板方法模式實現同步器框架。其核心組成包括:
- volatile int state:同步狀態標志
- CLH變體隊列:采用雙向鏈表實現的等待隊列
- ConditionObject:條件變量實現類

```java
// AQS簡化結構
public abstract class AbstractQueuedSynchronizer {
    private volatile int state;
    private transient volatile Node head;
    private transient volatile Node tail;
    
    protected final boolean compareAndSetState(int expect, int update) {
        // CAS操作實現
    }
    
    // 需要子類實現的模板方法
    protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
    protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
}

1.2 共享模式與獨占模式區別

特性 獨占模式 共享模式
資源獲取 排他性占用 多個線程可同時獲取
典型實現 ReentrantLock Semaphore/CountDownLatch
喚醒策略 只喚醒一個后繼節點 傳播式喚醒后續所有共享節點
適用場景 互斥訪問場景 資源控制/多線程協同

二、Semaphore原理與實戰

2.1 信號量核心機制

Semaphore通過AQS的state表示可用許可數,其工作流程:

  1. acquire():state>0時CAS減1,不足時入隊
  2. release():CAS增加state并喚醒后繼節點
// Semaphore同步器實現
static final class NonfairSync extends Sync {
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 || 
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

2.2 高級應用場景

場景1:數據庫連接池

public class ConnectionPool {
    private final Semaphore semaphore;
    private final BlockingQueue<Connection> pool;

    public ConnectionPool(int size) {
        semaphore = new Semaphore(size);
        pool = new ArrayBlockingQueue<>(size);
        // 初始化連接...
    }

    public Connection getConnection() throws InterruptedException {
        semaphore.acquire();
        return pool.take();
    }

    public void release(Connection conn) {
        pool.offer(conn);
        semaphore.release();
    }
}

場景2:限流控制器

class RateLimiter {
    private final Semaphore semaphore;
    private final ScheduledExecutorService scheduler;

    public RateLimiter(int permitsPerSecond) {
        semaphore = new Semaphore(permitsPerSecond);
        scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(() -> 
            semaphore.release(permitsPerSecond - semaphore.availablePermits()),
            0, 1, TimeUnit.SECONDS);
    }

    public void acquire() throws InterruptedException {
        semaphore.acquire();
    }
}

2.3 性能優化建議

  1. 公平性選擇:非公平模式吞吐量更高(減少線程切換)
  2. 批量操作:使用acquire(int permits)減少CAS次數
  3. 超時控制tryAcquire(long timeout, TimeUnit unit)避免死鎖

三、CountDownLatch深度解析

3.1 實現原理揭秘

CountDownLatch.Sync重寫AQS方法:

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

protected boolean tryReleaseShared(int releases) {
    for (;;) {
        int c = getState();
        if (c == 0) return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

3.2 典型應用模式

模式1:多線程初始化協調

public class ServiceInitializer {
    private final CountDownLatch latch = new CountDownLatch(3);

    public void init() throws InterruptedException {
        new Thread(() -> {
            initDatabase();
            latch.countDown();
        }).start();
        
        new Thread(() -> {
            initCache();
            latch.countDown();
        }).start();
        
        new Thread(() -> {
            loadConfig();
            latch.countDown();
        }).start();
        
        latch.await();
        System.out.println("All services initialized");
    }
}

模式2:并行計算聚合

public class ParallelCalculator {
    public long calculate(int[] data) throws InterruptedException {
        int threadCount = Runtime.getRuntime().availableProcessors();
        CountDownLatch latch = new CountDownLatch(threadCount);
        AtomicLong result = new AtomicLong();
        
        int segmentSize = data.length / threadCount;
        for (int i = 0; i < threadCount; i++) {
            int start = i * segmentSize;
            int end = (i == threadCount-1) ? data.length : start + segmentSize;
            
            new Thread(() -> {
                long sum = 0;
                for (int j = start; j < end; j++) {
                    sum += data[j];
                }
                result.addAndGet(sum);
                latch.countDown();
            }).start();
        }
        
        latch.await();
        return result.get();
    }
}

3.3 與CyclicBarrier對比

特性 CountDownLatch CyclicBarrier
重置能力 不可重置 可循環使用
觸發條件 計數減到0 線程數達到屏障值
等待行為 主線程等待工作線程 所有線程互相等待
典型場景 啟動協調/結束檢測 并行計算分階段同步

四、AQS共享鎖底層實現

4.1 共享鎖獲取流程

@startuml
title 共享鎖獲取流程

participant ThreadA
participant AQS
participant CLH隊列

ThreadA -> AQS: acquireShared()
alt 快速路徑成功
    AQS --> ThreadA: 直接返回
else 需要排隊
    ThreadA -> AQS: addWaiter(Node.SHARED)
    AQS -> CLH隊列: 插入尾節點
    loop 自旋檢查
        ThreadA -> AQS: shouldParkAfterFailedAcquire()
        AQS -> ThreadA: 是否需要park
        ThreadA -> AQS: parkAndCheckInterrupt()
    end
    AQS --> ThreadA: 被前驅節點喚醒
end
@enduml

4.2 共享鎖釋放傳播

關鍵代碼段:

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                unparkSuccessor(h); // 喚醒后繼節點
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        if (h == head) // 檢查頭節點是否變化
            break;
    }
}

五、生產環境實踐指南

5.1 參數調優經驗

  1. Semaphore許可數計算

    • CPU密集型:許可數=CPU核心數+1
    • IO密集型:許可數=CPU核心數 * (1 + 平均等待時間/平均計算時間)
  2. CountDownLatch防死鎖

// 建議增加超時控制
if (!latch.await(30, TimeUnit.SECONDS)) {
    log.warn("Initialization timeout");
    // 觸發補償機制
}

5.2 常見問題排查

問題現象:線程在await()處永久阻塞

排查步驟: 1. 檢查countDown()調用次數是否匹配 2. 使用jstack查看線程棧:

jstack <pid> | grep -A 10 'java.util.concurrent.CountDownLatch'
  1. 檢查是否有異常導致countDown()未執行

六、擴展應用場景

6.1 分布式環境適配

// 基于Redis的分布式CountDownLatch
public class RedisCountDownLatch {
    private final Jedis jedis;
    private final String latchKey;
    
    public void countDown() {
        jedis.decr(latchKey);
    }
    
    public void await() throws InterruptedException {
        while (true) {
            String val = jedis.get(latchKey);
            if ("0".equals(val)) return;
            Thread.sleep(100);
        }
    }
}

6.2 與CompletableFuture結合

public CompletableFuture<Void> parallelTasks(List<Runnable> tasks) {
    CountDownLatch latch = new CountDownLatch(tasks.size());
    return CompletableFuture.runAsync(() -> {
        tasks.forEach(task -> 
            CompletableFuture.runAsync(() -> {
                try {
                    task.run();
                } finally {
                    latch.countDown();
                }
            }));
        latch.await();
    });
}

附錄:關鍵API速查表

Semaphore核心方法

方法簽名 說明
void acquire() 阻塞獲取許可
boolean tryAcquire() 非阻塞嘗試獲取
void release() 釋放許可
int availablePermits() 查詢當前可用許可數

CountDownLatch核心方法

方法簽名 說明
void await() 阻塞直到計數歸零
boolean await(long timeout, TimeUnit unit) 帶超時的等待
void countDown() 計數減一
long getCount() 獲取當前計數值

本文共包含代碼示例15個,流程圖2幅,對比表格3個,完整演示了AQS共享鎖的實現原理和實戰用法。實際應用中請根據具體場景調整參數和異常處理邏輯。 “`

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女