溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Java異步編程中如何進行FutureTask源碼分析

發布時間:2021-10-12 10:35:33 來源:億速云 閱讀:140 作者:柒染 欄目:云計算

Java異步編程中如何進行FutureTask源碼分析

引言

在Java并發編程中,FutureTask是一個非常重要的類,它實現了Future接口和Runnable接口,可以用來表示一個異步計算的結果。FutureTask可以用于包裝CallableRunnable對象,并且可以通過ExecutorService提交給線程池執行。本文將深入分析FutureTask的源碼,探討其內部實現機制,以及如何在Java異步編程中使用它。

1. FutureTask概述

1.1 Future接口

Future接口表示一個異步計算的結果。它提供了檢查計算是否完成的方法,以及獲取計算結果的方法。如果計算尚未完成,get方法將會阻塞,直到計算完成。

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

1.2 RunnableFuture接口

RunnableFuture接口繼承了RunnableFuture接口,表示一個可以運行的Future。FutureTask實現了RunnableFuture接口。

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

1.3 FutureTask類

FutureTask類實現了RunnableFuture接口,可以用來包裝CallableRunnable對象,并且可以通過ExecutorService提交給線程池執行。

public class FutureTask<V> implements RunnableFuture<V> {
    // 內部狀態
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    // 內部任務
    private Callable<V> callable;
    private Object outcome; // 結果或異常
    private volatile Thread runner;
    private volatile WaitNode waiters;

    // 構造方法
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW; // 初始狀態為NEW
    }

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW; // 初始狀態為NEW
    }

    // 其他方法...
}

2. FutureTask的狀態機

FutureTask內部使用一個狀態機來管理任務的執行狀態。狀態機的狀態包括:

  • NEW: 初始狀態,表示任務尚未開始執行。
  • COMPLETING: 任務已經執行完成,但結果尚未設置。
  • NORMAL: 任務正常完成,結果已經設置。
  • EXCEPTIONAL: 任務執行過程中拋出異常。
  • CANCELLED: 任務被取消。
  • INTERRUPTING: 任務正在被中斷。
  • INTERRUPTED: 任務已經被中斷。

狀態轉換圖如下:

NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED

3. FutureTask的核心方法

3.1 run方法

run方法是Runnable接口的實現,用于執行任務。run方法的主要邏輯如下:

  1. 檢查任務狀態,如果狀態不是NEW,則直接返回。
  2. 設置當前線程為執行線程。
  3. 調用Callablecall方法執行任務。
  4. 根據任務執行結果設置狀態和結果。
public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        runner = null;
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

3.2 set方法

set方法用于設置任務的結果,并將狀態從COMPLETING轉換為NORMAL。

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

3.3 setException方法

setException方法用于設置任務的異常結果,并將狀態從COMPLETING轉換為EXCEPTIONAL。

protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

3.4 finishCompletion方法

finishCompletion方法用于喚醒所有等待任務完成的線程。

private void finishCompletion() {
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    done();
    callable = null;        // to reduce footprint
}

3.5 get方法

get方法用于獲取任務的結果。如果任務尚未完成,get方法將會阻塞,直到任務完成。

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

3.6 awaitDone方法

awaitDone方法用于等待任務完成。如果任務尚未完成,當前線程將會被阻塞。

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}

3.7 report方法

report方法用于根據任務的狀態返回結果或拋出異常。

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

4. FutureTask的使用示例

4.1 使用FutureTask執行Callable任務

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureTaskExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Callable<Integer> task = () -> {
            Thread.sleep(1000);
            return 42;
        };

        FutureTask<Integer> futureTask = new FutureTask<>(task);
        Thread thread = new Thread(futureTask);
        thread.start();

        System.out.println("Waiting for result...");
        int result = futureTask.get();
        System.out.println("Result: " + result);
    }
}

4.2 使用FutureTask執行Runnable任務

import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureTaskExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Runnable task = () -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        FutureTask<Void> futureTask = new FutureTask<>(task, null);
        Thread thread = new Thread(futureTask);
        thread.start();

        System.out.println("Waiting for task to complete...");
        futureTask.get();
        System.out.println("Task completed.");
    }
}

5. 總結

FutureTask是Java并發編程中一個非常重要的類,它實現了Future接口和Runnable接口,可以用來表示一個異步計算的結果。通過深入分析FutureTask的源碼,我們可以更好地理解其內部實現機制,以及如何在Java異步編程中使用它。FutureTask的狀態機、核心方法(如run、set、get等)以及使用示例都為我們提供了豐富的知識,幫助我們更好地掌握Java并發編程的技巧。

在實際開發中,FutureTask可以用于包裝CallableRunnable對象,并且可以通過ExecutorService提交給線程池執行。通過合理地使用FutureTask,我們可以實現高效的異步編程,提升程序的并發性能。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女