本篇內容主要講解“JUC-Future與FutureTask原理是什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“JUC-Future與FutureTask原理是什么”吧!
1 Future
Future 表示一個任務的生命周期,是一個可取消的異步運算。提供了相應的方法來判斷任務狀態(完成或取消),以及獲取任務的結果和取消任務等。適合具有可取消性和執行時間較長的異步任務。
并發包中許多異步任務類都繼承自Future,其中最典型的就是 FutureTask
1.1 介紹
Future 表示異步計算的結果。它提供了檢查計算是否完成的方法,以等待計算的完成,并獲取計算的結果。計算完成后只能使用get方法來獲取結果,如有必要,計算完成前可以阻塞此方法。取消則由 cancel 方法來執行。還提供了其他方法,以確定任務是正常完成還是被取消了。一旦計算完成,就不能再取消計算。如果為了可取消性而使用 Future 但又不提供可用的結果,則可以聲明 Future 形式類型、并返回 null 作為底層任務的結果。
也就是說Future具有這樣的特性
異步執行,可用 get 方法獲取執行結果
如果計算還沒完成,get 方法是會阻塞的,如果完成了,是可以多次獲取并立即得到結果的
如果計算還沒完成,是可以取消計算的
可以查詢計算的執行狀態
2 FutureTask
FutureTask 為 Future 提供了基礎實現,如獲取任務執行結果(get)和取消任務(cancel)等。如果任務尚未完成,獲取任務執行結果時將會阻塞。一旦執行結束,任務就不能被重啟或取消(除非使用runAndReset執行計算)。
FutureTask 常用來封裝 Callable 和 Runnable,也可作為一個任務提交到線程池中執行。除了作為一個獨立的類,此類也提供創建自定義 task 類使用。FutureTask 的線程安全由CAS保證。
FutureTask 內部維護了一個由volatile修飾的int型變量—state,代表當前任務的運行狀態
NEW:新建
COMPLETING:完成
NORMAL:正常運行
EXCEPTIONAL:異常退出
CANCELLED:任務取消
INTERRUPTING:線程中斷中
INTERRUPTED:線程已中斷
在這七種狀態中,有四種任務終止狀態:NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTED。各種狀態的轉化如下:
數據結構及核心參數
//內部持有的callable任務,運行完畢后置空 private Callable<V> callable; //從get()中返回的結果或拋出的異常 private Object outcome; // non-volatile, protected by state reads/writes //運行callable的線程,在 run 時進行 CAS 操作 private volatile Thread runner; //使用Treiber棧保存等待線程 private volatile WaitNode waiters;
FutureTask 繼承了Runnale和Future,本身也作為一個線程運行,可以提交給線程池執行。維護了一個內部類WaitNode,使用簡單的Treiber棧(無鎖并發棧)實現,用于存儲等待線程。FutureTask 只有一個自定義的同步器 Sync 的屬性,所有的方法都是委派給此同步器來實現。這也是JUC里使用AQS的通用模式。
源碼解析
FutureTask 的同步器 由于Future在任務完成后,可以多次自由獲取結果,因此,用于控制同步的AQS使用共享模式。
FutureTask 底層任務的執行狀態保存在AQS的狀態里。AQS是否允許線程獲取(是否阻塞)是取決于任務是否執行完成,而不是具體的狀態值。
private final class Sync extends AbstractQueuedSynchronizer { // 定義表示任務執行狀態的常量。由于使用了位運算進行判斷,所以狀態值分別是2的冪。 // 表示任務已經準備好了,可以執行 private static final int READY = 0; // 表示任務正在執行中 private static final int RUNNING = 1; // 表示任務已執行完成 private static final int RAN = 2; // 表示任務已取消 private static final int CANCELLED = 4; // 底層的表示任務的可執行對象 private final Callable<V> callable; // 表示任務執行結果,用于get方法返回。 private V result; // 表示任務執行中的異常,用于get方法調用時拋出。 private Throwable exception; /* * 用于執行任務的線程。在 set/cancel 方法后置為空,表示結果可獲取。 * 必須是 volatile的,用于確保完成后(result和exception)的可見性。 * (如果runner不是volatile,則result和exception必須都是volatile的) */ private volatile Thread runner; /** * 已完成或已取消 時成功獲取 */ protected int tryAcquireShared( int ignore) { return innerIsDone() ? 1 : -1; } /** * 在設置最終完成狀態后讓AQS總是通知,通過設置runner線程為空。 * 這個方法并沒有更新AQS的state屬性, * 所以可見性是通過對volatile的runner的寫來保證的。 */ protected boolean tryReleaseShared( int ignore) { runner = null; return true; } // 執行任務的方法 void innerRun() { // 用于確保任務不會重復執行 if (!compareAndSetState(READY, RUNNING)) return; // 由于Future一般是異步執行,所以runner一般是線程池里的線程。 runner = Thread.currentThread(); // 設置執行線程后再次檢查,在執行前檢查是否被異步取消 // 由于前面的CAS已把狀態設置RUNNING, if (getState() == RUNNING) { // recheck after setting thread V result; // try { result = callable.call(); } catch (Throwable ex) { // 捕獲任務執行過程中拋出的所有異常 setException(ex); return; } set(result); } else { // 釋放等待的線程 releaseShared(0); // cancel } } // 設置結果 void innerSet(V v) { // 放在循環里進行是為了失敗后重試。 for (;;) { // AQS初始化時,狀態值默認是 0,對應這里也就是 READY 狀態。 int s = getState(); // 已完成任務不能設置結果 if (s == RAN) return; // 已取消 的任務不能設置結果 if (s == CANCELLED) { // releaseShared 會設置runner為空, // 這是考慮到與其他的取消請求線程 競爭中斷 runner releaseShared(0); return; } // 先設置已完成,免得多次設置 if (compareAndSetState(s, RAN)) { result = v; releaseShared(0); // 此方法會更新 runner,保證result的可見性 done(); return; } } } // 獲取異步計算的結果 V innerGet() throws InterruptedException, ExecutionException { acquireSharedInterruptibly(0);// 獲取共享,如果沒有完成則會阻塞。 // 檢查是否被取消 if (getState() == CANCELLED) throw new CancellationException(); // 異步計算過程中出現異常 if (exception != null) throw new ExecutionException(exception); return result; } // 取消執行任務 boolean innerCancel( boolean mayInterruptIfRunning) { for (;;) { int s = getState(); // 已完成或已取消的任務不能再次取消 if (ranOrCancelled(s)) return false; // 任務處于 READY 或 RUNNING if (compareAndSetState(s, CANCELLED)) break; } // 任務取消后,中斷執行線程 if (mayInterruptIfRunning) { Thread r = runner; if (r != null) r.interrupt(); } releaseShared(0); // 釋放等待的訪問結果的線程 done(); return true; } /** * 檢查任務是否處于完成或取消狀態 */ private boolean ranOrCancelled( int state) { return (state & (RAN | CANCELLED)) != 0; } // 其他方法省略 }
從 innerCancel 方法可知,取消操作只是改變了任務對象的狀態并可能會中斷執行線程。如果任務的邏輯代碼沒有響應中斷,則會一直異步執行直到完成,只是最終的執行結果不會被通過get方法返回,計算資源的開銷仍然是存在的。
總的來說,Future 是線程間協調的一種工具。
AbstractExecutorService.submit(Callable task)
FutureTask 內部實現方法都很簡單,先從線程池的submit分析。submit方法默認實現在AbstractExecutorService,幾種實現源碼如下:
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
首先調用newTaskFor方法構造FutureTask,然后調用execute把任務放進線程池中,返回FutureTask
FutureTask.run()
public void run() { //新建任務,CAS替換runner為當前線程 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result);//設置執行結果 } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s);//處理中斷邏輯 } }
運行任務,如果任務狀態為NEW狀態,則利用CAS修改為當前線程。執行完畢調用set(result)方法設置執行結果。 set(result)源碼如下
首先利用cas修改state狀態為
設置返回結果,然后使用 lazySet(UNSAFE.putOrderedInt)的方式設置state狀態為
結果設置完畢后,調用finishCompletion()喚醒等待線程
private void finishCompletion() { for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {//移除等待線程 for (;;) {//自旋遍歷等待線程 Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t);//喚醒等待線程 } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } //任務完成后調用函數,自定義擴展 done(); callable = null; // to reduce footprint }
回到run方法,如果在 run 期間被中斷,此時需要調用handlePossibleCancellationInterrupt處理中斷邏輯,確保任何中斷(例如cancel(true))只停留在當前run或runAndReset的任務中
private void handlePossibleCancellationInterrupt(int s) { //在中斷者中斷線程之前可能會延遲,所以我們只需要讓出CPU時間片自旋等待 if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); // wait out pending interrupt }
FutureTask.runAndReset()
runAndReset是 FutureTask另外一個任務執行的方法,它不會返回執行結果,而且在任務執行完之后會重置stat的狀態為NEW,使任務可以多次執行。 runAndReset的典型應用是在 ScheduledThreadPoolExecutor 中,周期性的執行任務。
FutureTask.get()
FutureTask 通過get()獲取任務執行結果。如果任務處于未完成的狀態(state <= COMPLETING),就調用awaitDone等待任務完成。任務完成后,通過report獲取執行結果或拋出執行期間的異常。
awaitDone(boolean timed, long nanos)
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) {//自旋 if (Thread.interrupted()) {//獲取并清除中斷狀態 removeWaiter(q);//移除等待WaitNode throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null;//置空等待節點的線程 return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) //CAS修改waiter queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q);//超時,移除等待節點 return state; } LockSupport.parkNanos(this, nanos);//阻塞當前線程 } else LockSupport.park(this);//阻塞當前線程 } }
awaitDone用于等待任務完成,或任務因為中斷或超時而終止。返回任務的完成狀態。
1.如果線程被中斷,首先清除中斷狀態,調用removeWaiter移除等待節點,然后拋InterruptedException。removeWaiter源碼如下:
private void removeWaiter(WaitNode node) { if (node != null) { node.thread = null;//首先置空線程 retry: for (;;) { // restart on removeWaiter race //依次遍歷查找 for (WaitNode pred = null, q = waiters, s; q != null; q = s) { s = q.next; if (q.thread != null) pred = q; else if (pred != null) { pred.next = s; if (pred.thread == null) // check for race continue retry; } else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,q, s)) //cas替換 continue retry; } break; } } }
2.如果當前為結束態(state>COMPLETING),則根據需要置空等待節點的線程,并返回 Future 狀態
3.如果當前為正在完成(COMPLETING),說明此時 Future 還不能做出超時動作,為任務讓出CPU執行時間片
4.如果state為NEW,先新建一個WaitNode,然后CAS修改當前waiters
5.如果等待超時,則調用removeWaiter移除等待節點,返回任務狀態;如果設置了超時時間但是尚未超時,則park阻塞當前線程
6.其他情況直接阻塞當前線程
FutureTask.cancel(boolean mayInterruptIfRunning)
public boolean cancel(boolean mayInterruptIfRunning) {
//如果當前Future狀態為NEW,根據參數修改Future狀態為INTERRUPTING或CANCELLED
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {//可以在運行時中斷
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();//移除并喚醒所有等待線程
}
return true;
}
說明:嘗試取消任務。如果任務已經完成或已經被取消,此操作會失敗。如果當前Future狀態為NEW,根據參數修改Future狀態為INTERRUPTING或CANCELLED。如果當前狀態不為NEW,則根據參數mayInterruptIfRunning決定是否在任務運行中也可以中斷。中斷操作完成后,調用finishCompletion移除并喚醒所有等待線程。
示例
到此,相信大家對“JUC-Future與FutureTask原理是什么”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。