溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

怎么解析JUC 下CountDownLatch,CyclicBarrier,Semaphore

發布時間:2021-12-20 09:24:39 來源:億速云 閱讀:169 作者:柒染 欄目:大數據

怎么解析JUC 下CountDownLatch,CyclicBarrier,Semaphore

目錄

  1. 引言
  2. CountDownLatch
  3. CyclicBarrier
  4. Semaphore
  5. 總結

引言

在Java并發編程中,JUC(java.util.concurrent)包提供了許多強大的工具類,用于處理多線程并發問題。其中,CountDownLatch、CyclicBarrierSemaphore是三個常用的同步工具類。本文將詳細解析這三個類的使用場景、源碼實現以及示例代碼,幫助讀者更好地理解和使用它們。

CountDownLatch

基本概念

CountDownLatch是一個同步輔助類,允許一個或多個線程等待其他線程完成操作。它通過一個計數器來實現,計數器的初始值由構造函數指定。每當一個線程完成了自己的任務后,計數器的值就會減1。當計數器的值減到0時,等待的線程就會被喚醒。

使用場景

CountDownLatch通常用于以下場景:

  • 主線程等待多個子線程完成任務:例如,主線程需要等待多個子線程完成初始化操作后才能繼續執行。
  • 多個子線程等待主線程發出信號:例如,多個子線程需要等待主線程發出開始信號后才能同時開始執行。

源碼解析

CountDownLatch的核心實現依賴于AQS(AbstractQueuedSynchronizer)。AQS是一個用于構建鎖和同步器的框架,CountDownLatch通過繼承AQS來實現同步功能。

public class CountDownLatch {
    private final Sync sync;

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    public void countDown() {
        sync.releaseShared(1);
    }

    public long getCount() {
        return sync.getCount();
    }

    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
}

示例代碼

public class CountDownLatchExample {
    public static void main(String[] args) throws InterruptedException {
        int threadCount = 5;
        CountDownLatch latch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + " is running");
                latch.countDown();
            }).start();
        }

        latch.await();
        System.out.println("All threads have finished");
    }
}

CyclicBarrier

基本概念

CyclicBarrier是一個同步輔助類,允許一組線程互相等待,直到所有線程都到達某個屏障點(barrier point)。與CountDownLatch不同,CyclicBarrier可以重復使用,即當所有線程都到達屏障點后,屏障會被重置,可以再次使用。

使用場景

CyclicBarrier通常用于以下場景:

  • 多線程任務分階段執行:例如,多個線程需要分階段執行任務,每個階段都需要等待所有線程完成當前階段的任務后才能進入下一個階段。
  • 并行計算:例如,多個線程需要同時開始執行某個計算任務,并且需要等待所有線程完成計算后才能繼續執行后續操作。

源碼解析

CyclicBarrier的核心實現依賴于ReentrantLockCondition。ReentrantLock用于實現線程間的互斥,Condition用于實現線程間的等待和喚醒。

public class CyclicBarrier {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition();
    private final int parties;
    private final Runnable barrierCommand;
    private int count;

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    public CyclicBarrier(int parties) {
        this(parties, null);
    }

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

    public int await(long timeout, TimeUnit unit)
        throws InterruptedException, BrokenBarrierException, TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException, TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            int index = --count;
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    private void nextGeneration() {
        trip.signalAll();
        count = parties;
        generation = new Generation();
    }

    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

    private static class Generation {
        boolean broken = false;
    }
}

示例代碼

public class CyclicBarrierExample {
    public static void main(String[] args) {
        int threadCount = 5;
        CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
            System.out.println("All threads have reached the barrier");
        });

        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + " is waiting at the barrier");
                try {
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " has passed the barrier");
            }).start();
        }
    }
}

Semaphore

基本概念

Semaphore是一個計數信號量,用于控制同時訪問某個資源的線程數量。它通過一個計數器來實現,計數器的初始值由構造函數指定。每當一個線程訪問資源時,計數器的值就會減1;當線程釋放資源時,計數器的值就會加1。如果計數器的值為0,則線程必須等待,直到有其他線程釋放資源。

使用場景

Semaphore通常用于以下場景:

  • 資源池管理:例如,數據庫連接池、線程池等,通過Semaphore來控制同時訪問資源的線程數量。
  • 限流:例如,限制某個接口的并發訪問量,防止系統過載。

源碼解析

Semaphore的核心實現依賴于AQS(AbstractQueuedSynchronizer)。AQS是一個用于構建鎖和同步器的框架,Semaphore通過繼承AQS來實現同步功能。

public class Semaphore implements java.io.Serializable {
    private final Sync sync;

    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }

    public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }

    public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    public void release() {
        sync.releaseShared(1);
    }

    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

    public void acquireUninterruptibly(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireShared(permits);
    }

    public boolean tryAcquire(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.nonfairTryAcquireShared(permits) >= 0;
    }

    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
    }

    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }

    public int availablePermits() {
        return sync.getPermits();
    }

    public int drainPermits() {
        return sync.drainPermits();
    }

    protected void reducePermits(int reduction) {
        if (reduction < 0) throw new IllegalArgumentException();
        sync.reducePermits(reduction);
    }

    public boolean isFair() {
        return sync instanceof FairSync;
    }

    public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    public final int getQueueLength() {
        return sync.getQueueLength();
    }

    protected Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }

    public String toString() {
        return super.toString() + "[Permits = " + sync.getPermits() + "]";
    }

    abstract static class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits);
        }

        final int getPermits() {
            return getState();
        }

        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count under zero");
                if (compareAndSetState(current, next))
                    return;
            }
        }

        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }

    static final class NonfairSync extends Sync {
        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    static final class FairSync extends Sync {
        FairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }
}

示例代碼

public class SemaphoreExample {
    public static void main(String[] args) {
        int permits = 3;
        Semaphore semaphore = new Semaphore(permits);

        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + " is running");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                }
            }).start();
        }
    }
}

總結

CountDownLatch、CyclicBarrierSemaphore是JUC包中常用的同步工具類,它們分別適用于不同的并發場景。CountDownLatch用于等待多個線程完成任務,CyclicBarrier用于多個線程互相等待,Semaphore用于控制同時訪問某個資源的線程數量。通過理解它們的實現原理和使用場景,可以更好地處理多線程并發問題。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女