溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

java線程池的實現原理源碼分析

發布時間:2022-04-28 10:18:16 來源:億速云 閱讀:166 作者:iii 欄目:開發技術

Java線程池的實現原理源碼分析

目錄

  1. 引言
  2. 線程池的基本概念
  3. Java線程池的實現
  4. 線程池的核心源碼分析
  5. 線程池的拒絕策略
  6. 線程池的監控與調優
  7. 總結

引言

在多線程編程中,線程池是一種非常重要的技術。它可以幫助我們有效地管理線程資源,避免頻繁地創建和銷毀線程,從而提高系統的性能和穩定性。Java提供了java.util.concurrent包來支持線程池的實現,其中最核心的類是ThreadPoolExecutor。本文將深入分析ThreadPoolExecutor的實現原理,并通過源碼解析其工作流程。

線程池的基本概念

2.1 什么是線程池

線程池是一種多線程處理形式,它通過預先創建一定數量的線程,并將任務提交到線程池中執行,從而避免了頻繁創建和銷毀線程的開銷。線程池中的線程可以重復使用,執行完一個任務后,線程不會被銷毀,而是繼續執行下一個任務。

2.2 線程池的優勢

  • 降低資源消耗:通過重復利用已創建的線程,減少線程創建和銷毀的開銷。
  • 提高響應速度:任務到達時,無需等待線程創建即可立即執行。
  • 提高線程的可管理性:線程池可以對線程進行統一管理,如控制線程的數量、監控線程的狀態等。

2.3 線程池的核心參數

  • corePoolSize:核心線程數,線程池中保持的最小線程數。
  • maximumPoolSize:最大線程數,線程池中允許的最大線程數。
  • keepAliveTime:線程空閑時間,超過該時間的空閑線程將被回收。
  • workQueue:任務隊列,用于存放待執行的任務。
  • threadFactory:線程工廠,用于創建新線程。
  • handler:拒絕策略,當任務無法被線程池執行時的處理策略。

Java線程池的實現

3.1 ThreadPoolExecutor

ThreadPoolExecutor是Java線程池的核心實現類,它提供了線程池的基本功能。ThreadPoolExecutor繼承自AbstractExecutorService,并實現了ExecutorService接口。

3.2 線程池的狀態

ThreadPoolExecutor使用一個AtomicInteger類型的變量ctl來表示線程池的狀態和線程數量。ctl的高3位表示線程池的狀態,低29位表示線程池中的線程數量。

線程池的狀態包括:

  • RUNNING:運行狀態,可以接受新任務并處理隊列中的任務。
  • SHUTDOWN:關閉狀態,不再接受新任務,但會處理隊列中的任務。
  • STOP:停止狀態,不再接受新任務,也不處理隊列中的任務,并中斷正在執行的任務。
  • TIDYING:整理狀態,所有任務都已終止,線程池即將終止。
  • TERMINATED:終止狀態,線程池已完全終止。

3.3 線程池的工作流程

  1. 當有任務提交到線程池時,線程池首先會判斷當前線程數是否小于核心線程數。如果小于,則創建新線程執行任務。
  2. 如果當前線程數已達到核心線程數,則將任務放入任務隊列中等待執行。
  3. 如果任務隊列已滿,且當前線程數小于最大線程數,則創建新線程執行任務。
  4. 如果當前線程數已達到最大線程數,且任務隊列已滿,則根據拒絕策略處理任務。

線程池的核心源碼分析

4.1 ThreadPoolExecutor的構造方法

ThreadPoolExecutor提供了多個構造方法,最常用的構造方法如下:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

該構造方法初始化了線程池的核心參數,包括核心線程數、最大線程數、空閑時間、任務隊列、線程工廠和拒絕策略。

4.2 execute方法

execute方法是線程池的核心方法,用于提交任務到線程池中執行。其源碼如下:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

execute方法的主要邏輯如下:

  1. 首先檢查任務是否為null,如果是則拋出NullPointerException。
  2. 獲取當前線程池的狀態和線程數量。
  3. 如果當前線程數小于核心線程數,則嘗試創建新線程執行任務。
  4. 如果當前線程數已達到核心線程數,則將任務放入任務隊列中。
  5. 如果任務隊列已滿,則嘗試創建新線程執行任務。
  6. 如果無法創建新線程,則根據拒絕策略處理任務。

4.3 addWorker方法

addWorker方法用于創建新線程并執行任務。其源碼如下:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

addWorker方法的主要邏輯如下:

  1. 首先檢查線程池的狀態,如果線程池已關閉且任務隊列為空,則返回false。
  2. 然后檢查當前線程數是否超過限制,如果超過則返回false。
  3. 使用CAS操作增加線程數量,如果成功則跳出循環。
  4. 創建Worker對象,并將其添加到workers集合中。
  5. 啟動Worker線程,如果啟動成功則返回true,否則調用addWorkerFailed方法進行清理。

4.4 runWorker方法

runWorker方法是Worker線程的執行邏輯。其源碼如下:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

runWorker方法的主要邏輯如下:

  1. 首先獲取當前線程和任務。
  2. 如果任務不為null,則執行任務。
  3. 如果任務為null,則調用getTask方法從任務隊列中獲取任務。
  4. 在執行任務之前,調用beforeExecute方法進行前置處理。
  5. 執行任務,并捕獲可能拋出的異常。
  6. 在執行任務之后,調用afterExecute方法進行后置處理。
  7. 如果任務執行過程中發生異常,則標記completedAbruptlytrue。
  8. 最后調用processWorkerExit方法處理線程退出。

4.5 getTask方法

getTask方法用于從任務隊列中獲取任務。其源碼如下:

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

getTask方法的主要邏輯如下:

  1. 首先檢查線程池的狀態,如果線程池已關閉且任務隊列為空,則減少線程數量并返回null。
  2. 檢查當前線程數是否超過最大線程數,或者線程是否允許超時回收。
  3. 如果線程允許超時回收,則調用poll方法從任務隊列中獲取任務,如果超時則返回null。
  4. 如果線程不允許超時回收,則調用take方法從任務隊列中獲取任務,如果任務隊列為空則阻塞等待。

4.6 processWorkerExit方法

processWorkerExit方法用于處理線程退出。其源碼如下:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

processWorkerExit方法的主要邏輯如下:

  1. 如果線程是異常退出,則減少線程數量。
  2. 更新已完成任務的數量,并從workers集合中移除該線程。
  3. 調用tryTerminate方法嘗試終止線程池。
  4. 如果線程池未停止,則根據當前線程數和任務隊列的情況決定是否需要創建新線程。

線程池的拒絕策略

5.1 拒絕策略的實現

ThreadPoolExecutor提供了四種內置的拒絕策略:

  • AbortPolicy:直接拋出RejectedExecutionException異常。
  • CallerRunsPolicy:由調用線程直接執行任務。
  • DiscardPolicy:直接丟棄任務,不做任何處理。
  • DiscardOldestPolicy:丟棄任務隊列中最舊的任務,然后重新嘗試執行當前任務。

5.2 自定義拒絕策略

除了使用內置的拒絕策略外,我們還可以自定義拒絕策略。自定義拒絕策略需要實現RejectedExecutionHandler接口,并重寫rejectedExecution方法。

public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // 自定義拒絕策略的邏輯
    }
}

線程池的監控與調優

6.1 線程池的監控

線程池的監控主要包括以下幾個方面:

  • 線程池的狀態:可以通過isShutdown、isTerminated等方法監控線程池的狀態。
  • 線程池的線程數量:可以通過getPoolSize、getActiveCount等方法監控線程池中的線程數量。
  • 任務隊列的大小:可以通過getQueue方法獲取任務隊列,并監控其大小。
  • 已完成任務的數量:可以通過getCompletedTaskCount方法獲取已完成任務的數量。

6.2 線程池的調優

線程池的調優主要包括以下幾個方面:

  • 核心線程數的設置:核心線程數的設置應根據系統的負載情況和任務的特性進行調整。如果任務較多且執行時間較短,可以適當增加核心線程數;如果任務較少且執行時間較長,可以適當減少核心線程數。
  • 最大線程數的設置:最大線程數的設置應根據系統的資源情況和任務的特性進行調整。如果系統的資源充足,可以適當增加最大線程數;如果系統的資源有限,應適當減少最大線程數。
  • 任務隊列的選擇:任務隊列的選擇應根據任務的特性進行調整。如果任務較多且執行時間較短,可以選擇SynchronousQueue;如果任務較少且執行時間較長,可以選擇LinkedBlockingQueue。
  • 拒絕策略的選擇:拒絕策略的選擇應根據任務的特性進行調整。如果任務的重要性較高,可以選擇CallerRunsPolicy;如果任務的重要性較低,可以選擇DiscardPolicy。

總結

本文詳細分析了Java線程池的實現原理,并通過源碼解析了ThreadPoolExecutor的工作流程。線程池是多線程編程中非常重要的技術,合理地使用線程池可以顯著提高系統的性能和穩定性。在實際開發中,我們應根據系統的負載情況和任務的特性,合理地設置線程池的參數,并進行監控和調優,以確保線程池的高效運行。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女