本篇內容介紹了“并發編程之如何理解Future&FutureTask”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
前言
Java線程實現方式主要有四種:
繼承Thread類
實現Runnable接口
實現Callable接口通過FutureTask包裝器來創建Thread線程
使用ExecutorService、Callable、Future實現有返回結果的多線程。
其中前兩種方式線程執行完后都沒有返回值,后兩種是帶返回值的。
Callable 和 Runnable 接口
Runnable接口
// 實現Runnable接口的類將被Thread執行,表示一個基本的任務 public interface Runnable { // run方法就是它所有的內容,就是實際執行的任務 public abstract void run(); }
沒有返回值
run 方法沒有返回值,雖然有一些別的方法也能實現返回值得效果,比如編寫日志文件或者修改共享變量等等,但是不僅容易出錯,效率也不高。
不能拋出異常
public class RunThrowExceptionDemo { /** * 普通方法可以在方法簽名中拋出異常 * * @throws IOException */ public void normalMethod() throws IOException { throw new IOException(); } class RunnableImpl implements Runnable { /** * run 方法內無法拋出 checked Exception,除非使用 try catch 進行處理 */ @Override public void run() { try { throw new IOException(); } catch (IOException e) { e.printStackTrace(); } } } }
可以看到普通方法 normalMethod 可以在方法簽名上拋出異常,這樣上層接口就可以捕獲這個異常進行處理,但是實現 Runnable 接口的類,run 方法無法拋出 checked Exception,只能在方法內使用 try catch 進行處理,這樣上層就無法得知線程中的異常。
設計導致
其實這兩個缺陷主要原因就在于 Runnable 接口設計的 run 方法,這個方法已經規定了 run() 方法的返回類型是 void,而且這個方法沒有聲明拋出任何異常。所以,當實現并重寫這個方法時,我們既不能改返回值類型,也不能更改對于異常拋出的描述,因為在實現方法的時候,語法規定是不允許對這些內容進行修改的。
Runnable 為什么設計成這樣?
假設 run() 方法可以返回返回值,或者可以拋出異常,也無濟于事,因為我們并沒有辦法在外層捕獲并處理,這是因為調用 run() 方法的類(比如 Thread 類和線程池)是 Java 直接提供的,而不是我們編寫的。 所以就算它能有一個返回值,我們也很難把這個返回值利用到,而 Callable 接口就是為了解決這兩個問題。
Callable接口
public interface Callable<V> { //返回接口,或者拋出異常 V call() throws Exception; }
可以看到 Callable 和 Runnable 接口其實比較相似,都只有一個方法,也就是線程任務執行的方法,區別就是 call 方法有返回值,而且聲明了 throws Exception。
Callable 和 Runnable 的不同之處
方法名 :Callable 規定的執行方法是 call(),而 Runnable 規定的執行方法是 run();
返回值 :Callable 的任務執行后有返回值,而 Runnable 的任務執行后是沒有返回值的;
拋出異常 :call() 方法可拋出異常,而 run() 方法是不能拋出受檢查異常的;
與 Callable 配合的有一個 Future 接口,通過 Future 可以了解任務執行情況,或者取消任務的執行,還可獲取任務執行的結果,這些功能都是 Runnable 做不到的,Callable 的功能要比 Runnable 強大。
Future接口
Future的作用
簡單來說就是利用線程達到異步的效果,同時還可以獲取子線程的返回值。
比如當做一定運算的時候,運算過程可能比較耗時,有時會去查數據庫,或是繁重的計算,比如壓縮、加密等,在這種情況下,如果我們一直在原地等待方法返回,顯然是不明智的,整體程序的運行效率會大大降低。
我們可以把運算的過程放到子線程去執行,再通過 Future 去控制子線程執行的計算過程,最后獲取到計算結果。這樣一來就可以把整個程序的運行效率提高,是一種異步的思想。
Future的方法
Future 接口一共有5個方法,源代碼如下:
public interface Future<V> { /** * 嘗試取消任務,如果任務已經完成、已取消或其他原因無法取消,則失敗。 * 1、如果任務還沒開始執行,則該任務不應該運行 * 2、如果任務已經開始執行,由參數mayInterruptIfRunning來決定執行該任務的線程是否應該被中斷,這只是終止任務的一種嘗試。若mayInterruptIfRunning為true,則會立即中斷執行任務的線程并返回true,若mayInterruptIfRunning為false,則會返回true且不會中斷任務執行線程。 * 3、調用這個方法后,以后對isDone方法調用都返回true。 * 4、如果這個方法返回true,以后對isCancelled返回true。 */ boolean cancel(boolean mayInterruptIfRunning); /** * 判斷任務是否被取消了,如果調用了cance()則返回true */ boolean isCancelled(); /** * 如果任務完成,則返回ture * 任務完成包含正常終止、異常、取消任務。在這些情況下都返回true */ boolean isDone(); /** * 線程阻塞,直到任務完成,返回結果 * 如果任務被取消,則引發CancellationException * 如果當前線程被中斷,則引發InterruptedException * 當任務在執行的過程中出現異常,則拋出ExecutionException */ V get() throws InterruptedException, ExecutionException; /** * 線程阻塞一定時間等待任務完成,并返回任務執行結果,如果則超時則拋出TimeoutException */ V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
get方法(獲取結果)
get 方法最主要的作用就是獲取任務執行的結果,該方法在執行時的行為取決于 Callable 任務的狀態,可能會發生以下 7 種情況。
任務已經執行完,執行 get 方法可以立刻返回,獲取到任務執行的結果。
任務還沒有開始執行,比如我們往線程池中放一個任務,線程池中可能積壓了很多任務,還沒輪到我去執行的時候,就去 get 了,在這種情況下,相當于任務還沒開始,我們去調用 get 的時候,會當前的線程阻塞,直到任務完成再把結果返回回來。
任務正在執行中,但是執行過程比較長,所以我去 get 的時候,它依然在執行的過程中。這種情況調用 get 方法也會阻塞當前線程,直到任務執行完返回結果。
任務執行過程中拋出異常,我們再去調用 get 的時候,就會拋出 ExecutionException 異常,不管我們執行 call 方法時里面拋出的異常類型是什么,在執行 get 方法時所獲得的異常都是 ExecutionException。
任務被取消了,如果任務被取消,我們用 get 方法去獲取結果時則會拋出 CancellationException。
任務被中斷了,如果任務被當前線程中斷,我們用 get 方法去獲取結果時則會拋出InterruptedException。
任務超時,我們知道 get 方法有一個重載方法,那就是帶延遲參數的,調用了這個帶延遲參數的 get 方法之后,如果 call 方法在規定時間內正常順利完成了任務,那么 get 會正常返回;但是如果到達了指定時間依然沒有完成任務,get 方法則會拋出 TimeoutException,代表超時了。
參考示例:
package com.niuh.future; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class FutureDemo { public static void main(String[] args) { ExecutorService executorService = Executors.newSingleThreadExecutor(); Future<Integer> future = executorService.submit(new FutureTask()); try { Integer res = future.get(2000, TimeUnit.MILLISECONDS); System.out.println("Future線程返回值:" + res); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } static class FutureTask implements Callable<Integer> { @Override public Integer call() throws Exception { Thread.sleep(new Random().nextInt(3000)); return new Random().nextInt(10); } } }
isDone方法(判斷是否執行完畢)
isDone() 方法,該方法是用來判斷當前這個任務是否執行完畢了
package com.niuh.future; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class FutureIsDoneDemo { public static void main(String[] args) { ExecutorService executorService = Executors.newSingleThreadExecutor(); Future<Integer> future = executorService.submit(new FutureTask()); try { for (int i = 0; i < 3; i++) { Thread.sleep(1000); System.out.println("線程是否完成:" + future.isDone()); } Integer res = future.get(2000, TimeUnit.MILLISECONDS); System.out.println("Future 線程返回值:" + res); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } static class FutureTask implements Callable<Integer> { @Override public Integer call() throws Exception { Thread.sleep(2000); return new Random().nextInt(10); } } }
執行結果:
線程是否完成:false 線程是否完成:false 線程是否完成:true Future 線程返回值:9
可以看到前兩次 isDone 方法的返回結果是 false,因為線程任務還沒有執行完成,第三次 isDone 方法的返回結果是 ture。
注意:這個方法返回 true 則代表執行完成了,返回 false 則代表還沒完成。但返回 true,并不代表這個任務是成功執行的,比如說任務執行到一半拋出了異常。那么在這種情況下,對于這個 isDone 方法而言,它其實也是會返回 true 的,因為對它來說,雖然有異常發生了,但是這個任務在未來也不會再被執行,它確實已經執行完畢了。所以 isDone 方法在返回 true 的時候,不代表這個任務是成功執行的,只代表它執行完畢了。
我們將上面的示例稍作修改再來看下結果,修改 FutureTask 代碼如下:
static class FutureTask implements Callable<Integer> { @Override public Integer call() throws Exception { Thread.sleep(2000); throw new Exception("故意拋出異常"); } }
執行結果:
雖然拋出了異常,但是 isDone 方法的返回結果依然是 ture。
這段代碼說明了:
即便任務拋出異常,isDone 方法依然會返回 true。
雖然拋出的異常是 IllegalArgumentException,但是對于 get 而言,它拋出的異常依然是 ExecutionException。
雖然在任務執行到2秒的時候就拋出了異常,但是真正要等到我們執行 get 的時候,才看到了異常。
cancel方法(取消任務的執行)
如果不想執行某個任務了,則可以使用 cancel 方法,會有以下三種情況:
第一種情況最簡單,那就是當任務還沒有開始執行時,一旦調用 cancel,這個任務就會被正常取消,未來也不會被執行,那么 cancel 方法返回 true。
第二種情況也比較簡單。如果任務已經完成,或者之前已經被取消過了,那么執行 cancel 方法則代表取消失敗,返回 false。因為任務無論是已完成還是已經被取消過了,都不能再被取消了。
第三種情況比較特殊,就是這個任務正在執行,這個時候執行 cancel 方法是不會直接取消這個任務的,而是會根據我們傳入的參數做判斷。cancel 方法是必須傳入一個參數,該參數叫作 mayInterruptIfRunning,它是什么含義呢?如果傳入的參數是 true,執行任務的線程就會收到一個中斷的信號,正在執行的任務可能會有一些處理中斷的邏輯,進而停止,這個比較好理解。如果傳入的是 false 則就代表不中斷正在運行的任務,也就是說,本次 cancel 不會有任何效果,同時 cancel 方法會返回 false。
參考示例:
package com.niuh.future; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class FutureCancelDemo { static ExecutorService executorService = Executors.newSingleThreadExecutor(); public static void main(String[] args) { // 當任務還沒有開始執行 // demo1(); // 如果任務已經執行完 // demo2(); // 如果任務正在進行中 demo3(); } private static void demo1() { for (int i = 0; i < 1000; i++) { executorService.submit(new FutureTask()); } Future<String> future = executorService.submit(new FutureTask()); try { boolean cancel = future.cancel(false); System.out.println("Future 任務是否被取消:" + cancel); String res = future.get(2000, TimeUnit.MILLISECONDS); System.out.println("Future 線程返回值:" + res); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { executorService.shutdown(); } } private static void demo2() { Future<String> future = executorService.submit(new FutureTask()); try { Thread.sleep(1000); boolean cancel = future.cancel(false); System.out.println("Future 任務是否被取消:" + cancel); } catch (InterruptedException e) { e.printStackTrace(); } finally { executorService.shutdown(); } } private static void demo3() { Future<String> future = executorService.submit(new FutureInterruptTask()); try { Thread.sleep(1000); boolean cancel = future.cancel(true); System.out.println("Future 任務是否被取消:" + cancel); } catch (InterruptedException e) { e.printStackTrace(); } finally { executorService.shutdown(); } } static class FutureTask implements Callable<String> { @Override public String call() throws Exception { return "正常返回"; } } static class FutureInterruptTask implements Callable<String> { @Override public String call() throws Exception { while (!Thread.currentThread().isInterrupted()) { System.out.println("循環執行"); Thread.sleep(500); } System.out.println("線程被中斷"); return "正常返回"; } } }
這里,我們來分析下第三種情況(任務正在進行中),當我們設置 true 時,線程停止
循環執行 循環執行 Future 任務是否被取消:true
當我們設置 false 時,任務雖然也被取消成功,但是線程依然執行。
循環執行 循環執行 Future 任務是否被取消:true 循環執行 循環執行 循環執行 循環執行 ......
那么如何選擇傳入 true 還是 false 呢?
傳入 true 適用的情況是,明確知道這個任務能夠處理中斷。
傳入 false 適用于什么情況呢?如果我們明確知道這個線程不能處理中斷,那應該傳入 false。我們不知道這個任務是否支持取消(是否能響應中斷),因為在大多數情況下代碼是多人協作的,對于這個任務是否支持中斷,我們不一定有十足的把握,那么在這種情況下也應該傳入 false。如果這個任務一旦開始運行,我們就希望它完全的執行完畢。在這種情況下,也應該傳入 false。
需要注意的是,雖然示例中寫了 !Thread.currentThread().isInterrupted() 方法來判斷中斷,但是實際上并不是通過我們的代碼來進行中斷,而是 Future#cancel(true) 內部調用 t.interrupt 方法修改線程的狀態之后,Thread.sleep 會拋出 InterruptedException 異常,線程池中會執行異常的相關邏輯,并退出當前任務。 sleep 和 interrupt 會產生意想不到的效果。
比如我們將 FutureInterruptTask 代碼修改為 while(true) 形式,調用 cancel(true) 方法線程還是會被中斷。
static class FutureInterruptTask implements Callable<String> { @Override public String call() throws Exception { while (true) { System.out.println("循環執行"); Thread.sleep(500); } } }
isCancelled方法(判斷是否被取消)
isCancelled 方法,判斷是否被取消,它和 cancel 方法配合使用,比較簡單,可以參考上面的示例。
Callable 和 Future 的關系
Callable 接口相比于 Runnable 的一大優勢是可以有返回結果,返回結果就可以用 Future 類的 get 方法來獲取 。因此,Future 相當于一個存儲器,它存儲了 Callable 的 call 方法的任務結果。
除此之外,我們還可以通過 Future 的 isDone 方法來判斷任務是否已經執行完畢了,還可以通過 cancel 方法取消這個任務,或限時獲取任務的結果等,總之 Future 的功能比較豐富。
FutureTask
Future只是一個接口,不能直接用來創建對象,其實現類是FutureTask,JDK1.8修改了FutureTask的實現,JKD1.8不再依賴AQS來實現,而是通過一個volatile變量state以及CAS操作來實現。FutureTask結構如下所示:
我們來看一下 FutureTask 的代碼實現:
public class FutureTask implements RunnableFuture {...}
可以看到,它實現了一個接口,這個接口叫作 RunnableFuture。
RunnableFuture接口
我們來看一下 RunnableFuture 接口的代碼實現:
public interface RunnableFuture<V> extends Runnable, Future<V> { /** * Sets this Future to the result of its computation * unless it has been cancelled. */ void run(); }
既然 RunnableFuture 繼承了 Runnable 接口和 Future 接口,而 FutureTask 又實現了 RunnableFuture 接口,所以 FutureTask 既可以作為 Runnable 被線程執行,又可以作為 Future 得到 Callable 的返回值。
FutureTask源碼分析
成員變量
/* * 當前任務運行狀態 * NEW -> COMPLETING -> NORMAL(正常結束,返回結果) * NEW -> COMPLETING -> EXCEPTIONAL(返回異常結果) * NEW -> CANCELLED(任務被取消,無結果) * NEW -> INTERRUPTING -> INTERRUPTED(任務被打斷,無結果) */ private volatile int state; private static final int NEW = 0; // 新建 0 private static final int COMPLETING = 1; // 執行中 1 private static final int NORMAL = 2; // 正常 2 private static final int EXCEPTIONAL = 3; // 異常 3 private static final int CANCELLED = 4; // 取消 4 private static final int INTERRUPTING = 5; // 中斷中 5 private static final int INTERRUPTED = 6; // 被中斷 6 /** 將要被執行的任務 */ private Callable<V> callable; /** 存放執行結果,用于get()方法獲取結果,也可能用于get()方法拋出異常 */ private Object outcome; // non-volatile, protected by state reads/writes /** 執行任務Callable的線程; */ private volatile Thread runner; /** 棧結構的等待隊列,該節點是棧中最頂層的節點 */ private volatile WaitNode waiters;
為了后面更好的分析FutureTask的實現,這里有必要解釋下各個狀態。
NEW :表示是個新的任務或者還沒被執行完的任務。這是初始狀態。
COMPLETING :任務已經執行完成或者執行任務的時候發生異常,但是任務執行結果或者異常原因還沒有保存到outcome字段(outcome字段用來保存任務執行結果,如果發生異常,則用來保存異常原因)的時候,狀態會從NEW變更到COMPLETING。但是這個狀態會時間會比較短,屬于中間狀態。
NORMAL :任務已經執行完成并且任務執行結果已經保存到outcome字段,狀態會從COMPLETING轉換到NORMAL。這是一個最終態。
EXCEPTIONAL :任務執行發生異常并且異常原因已經保存到outcome字段中后,狀態會從COMPLETING轉換到EXCEPTIONAL。這是一個最終態。
CANCELLED :任務還沒開始執行或者已經開始執行但是還沒有執行完成的時候,用戶調用了cancel(false)方法取消任務且不中斷任務執行線程,這個時候狀態會從NEW轉化為CANCELLED狀態。這是一個最終態。
INTERRUPTING :任務還沒開始執行或者已經執行但是還沒有執行完成的時候,用戶調用了cancel(true)方法取消任務并且要中斷任務執行線程但是還沒有中斷任務執行線程之前,狀態會從NEW轉化為INTERRUPTING。這是一個中間狀態。
INTERRUPTED :調用interrupt()中斷任務執行線程之后狀態會從INTERRUPTING轉換到INTERRUPTED。這是一個最終態。
有一點需要注意的是,所有值大于COMPLETING的狀態都表示任務已經執行完成(任務正常執行完成,任務執行異?;蛘呷蝿毡蝗∠?。
構造方法
// Callable 構造方法 public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } // Runnable 構造方法 public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
Runnable的構造器,只有一個目的,就是通過Executors.callable把入參轉化為RunnableAdapter,主要是因為Callable的功能比Runnable豐富,Callable有返回值,而Runnable沒有。
/** * A callable that runs given task and returns given result */ static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
這是一個典型的適配模型,我們要把 Runnable 適配成 Callable,首先要實現 Callable 的接口,接著在 Callable 的 call 方法里面調用被適配對象(Runnable)的方法。
內部類
static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }
run方法
/** * run方法可以直接被調用 * 也可以開啟新的線程調用 */ public void run() { // 狀態不是任務創建,或者當前任務已經有線程在執行了,直接返回 if (state != NEW || !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread())) return; try { Callable<V> c = callable; // Callable 不為空,并且已經初始化完成 if (c != null && state == NEW) { V result; boolean ran; try { //調用執行 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false;//執行失敗 //通過CAS算法設置返回值(COMPLETING)和狀態值(EXCEPTIONAL) setException(ex); } //執行成功通過CAS(UNSAFE)設置返回值(COMPLETING)和狀態值(NORMAL) if (ran) //將result賦值給outcome set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() //將任務runner設置為null,避免發生并發調用run()方法 runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts //須重新讀取任務狀態,避免不可達(泄漏)的中斷 int s = state; //確保cancle(ture)操作時,運行中的任務能接收到中斷指令 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
鴻蒙官方戰略合作共建——HarmonyOS技術社區
run方法是沒有返回值的,通過給outcome屬性賦值(set(result)),get時就能從outcome屬性中拿到返回值。
FutureTask 兩種構造器,最終都轉化成了 Callable,所以在 run 方法執行的時候,只需要執行 Callable 的 call 方法即可,在執行 c.call()代碼時,如果入參是 Runnable 的話, 調用路徑為 c.call() -> RunnableAdapter.call() -> Runnable.run(),如果入參是 Callable 的話,直接調用。
setException(Throwable t)方法
//發生異常時,將返回值設置到outcome(=COMPLETING)中,并更新任務狀態(EXCEPTIONAL) protected void setException(Throwable t) { //調用UNSAFE類封裝的CAS算法,設置值 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state //喚醒因等待返回值而阻塞的線程 finishCompletion(); } }
由于篇幅有限,更多源碼解析請查看文章擴展鏈接
Future的使用
FutureTask可用于異步獲取執行結果或取消執行任務的場景。通過傳入Runnable或者Callable的任務給FutureTask,直接調用其run方法或者放入線程池執行,之后可以在外部通過FutureTask的get方法異步獲取執行結果,因此,FutureTask非常適合用于耗時的計算,主線程可以在完成自己的任務后,再去獲取結果。另外,FutureTask還可以確保即使調用了多次run方法,它都只會執行一次Runnable或者Callable任務,或者通過cancel取消FutureTask的執行等。
FutureTask執行多任務計算的使用場景
利用FutureTask和ExecutorService,可以用多線程的方式提交計算任務,主線程繼續執行其他任務,當主線程需要子線程的計算結果時,在異步獲取子線程的執行結果。
//任務正常完成,將返回值設置到outcome(=COMPLETING)中,并更新任務狀態(=NORMAL) protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
執行結果:
生成子線程計算任務: 0 生成子線程計算任務: 1 生成子線程計算任務: 2 生成子線程計算任務: 3 生成子線程計算任務: 4 生成子線程計算任務: 5 生成子線程計算任務: 6 生成子線程計算任務: 7 生成子線程計算任務: 8 生成子線程計算任務: 9 所有計算任務提交完畢, 主線程接著干其他事情! 子線程計算任務: 0 執行完成! 子線程計算任務: 1 執行完成! 子線程計算任務: 3 執行完成! 子線程計算任務: 4 執行完成! 子線程計算任務: 2 執行完成! 子線程計算任務: 5 執行完成! 子線程計算任務: 7 執行完成! 子線程計算任務: 9 執行完成! 子線程計算任務: 8 執行完成! 子線程計算任務: 6 執行完成! 多任務計算后的總結果是:990
FutureTask在高并發環境下確保任務只執行一次
在很多高并發的環境下,往往我們只需要某些任務只執行一次。這種使用情景FutureTask的特性恰能勝任。舉一個例子,假設有一個帶key的連接池,當key存在時,即直接返回key對應的對象;當key不存在時,則創建連接。對于這樣的應用場景,通常采用的方法為使用一個Map對象來存儲key和連接池對應的對應關系,典型的代碼如下面所示:
//移除所有等待線程并發出信號,調用done(),以及將任務callable清空 private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { //循環喚醒阻塞線程,直到阻塞隊列為空 for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; //一直到阻塞隊列為空,跳出循環 if (next == null) break; q.next = null; // unlink to help gc 方便gc在適當的時候回收 q = next; } break; } } done(); callable = null; // to reduce footprint }
在上面的例子中,我們通過加鎖確保高并發環境下的線程安全,也確保了connection只創建一次,然而卻犧牲了性能。改用ConcurrentHash的情況下,幾乎可以避免加鎖的操作,性能大大提高。
package com.niuh.future; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.util.concurrent.ConcurrentHashMap; /** * @description: 改用ConcurrentHash的情況下,幾乎可以避免加鎖的操作,性能大大提高。 * <p> * 但是在高并發的情況下有可能出現Connection被創建多次的現象。 * 為什么呢?因為創建Connection是一個耗時操作,假設多個線程涌入getConnection方法,都發現key對應的鍵不存在, * 于是所有涌入的線程都開始執行conn=createConnection(),只不過最終只有一個線程能將connection插入到map里。 * 但是這樣以來,其它線程創建的的connection就沒啥價值,浪費系統開銷。 */ public class FutureTaskConnection2 { private static ConcurrentHashMap<String, Connection> connectionPool = new ConcurrentHashMap<>(); public static Connection getConnection(String key) { Connection connection = connectionPool.get(key); if (connection == null) { connection = createConnection(); //根據putIfAbsent的返回值判斷是否有線程搶先插入了 Connection returnConnection = connectionPool.putIfAbsent(key, connection); if (returnConnection != null) { connection = returnConnection; } } else { return connection; } return connection; } private static Connection createConnection() { try { return DriverManager.getConnection(""); } catch (SQLException e) { e.printStackTrace(); } return null; } }
但是在高并發的情況下有可能出現Connection被創建多次的現象。 為什么呢?
因為創建Connection是一個耗時操作,假設多個線程涌入getConnection方法,都發現key對應的鍵不存在,于是所有涌入的線程都開始執行conn=createConnection(),只不過最終只有一個線程能將connection插入到map里。但是這樣以來,其它線程創建的的connection就沒啥價值,浪費系統開銷。
這時最需要解決的問題就是當key不存在時,創建Connection的動作(conn=createConnection();)能放在connectionPool.putIfAbsent()之后執行,這正是FutureTask發揮作用的時機,基于ConcurrentHashMap和FutureTask的改造代碼如下:
package com.niuh.future; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; /** * @description: FutureTask在高并發環境下確保任務只執行一次 * 這時最需要解決的問題就是當key不存在時,創建Connection的動作(conn=createConnection();) * 能放在connectionPool.putIfAbsent()之后執行,這正是FutureTask發揮作用的時機, * 基于ConcurrentHashMap和FutureTask的改造代碼如下: */ public class FutureTaskConnection3 { private static ConcurrentHashMap<String, FutureTask<Connection>> connectionPool = new ConcurrentHashMap<String, FutureTask<Connection>>(); public static Connection getConnection(String key) { FutureTask<Connection> connectionFutureTask = connectionPool.get(key); try { if (connectionFutureTask != null) { return connectionFutureTask.get(); } else { Callable<Connection> callable = new Callable<Connection>() { @Override public Connection call() throws Exception { return createConnection(); } }; FutureTask<Connection> newTask = new FutureTask<>(callable); FutureTask<Connection> returnFt = connectionPool.putIfAbsent(key, newTask); if (returnFt == null) { connectionFutureTask = newTask; newTask.run(); } return connectionFutureTask.get(); } } catch (ExecutionException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return null; } private static Connection createConnection() { try { return DriverManager.getConnection(""); } catch (SQLException e) { e.printStackTrace(); } return null; } }
FutureTask任務執行完回調
FutureTask有一個方法 void done()會在每個線程執行完成return結果時回調。 假設現在需要實現每個線程完成任務執行后主動執行后續任務。
private void handlePossibleCancellationInterrupt(int s) { // It is possible for our interrupter to stall before getting a // chance to interrupt us. Let's spin-wait patiently. //自旋等待cancle(true)結束(中斷結束) if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); // wait out pending interrupt // assert state == INTERRUPTED; // We want to clear any interrupt we may have received from // cancel(true). However, it is permissible to use interrupts // as an independent mechanism for a task to communicate with // its caller, and there is no way to clear only the // cancellation interrupt. // // Thread.interrupted(); }
執行結果:
11:01:37.134 [Thread-0] INFO com.niuh.future.FutureTaskDemo1 - 老板給我來一個月餅 11:01:37.139 [pool-1-thread-1] INFO com.niuh.future.FutureTaskDemo1 - 月餅制作中。。。。 11:01:37.139 [pool-1-thread-2] INFO com.niuh.future.FutureTaskDemo1 - 月餅制作中。。。。 11:01:37.139 [pool-1-thread-3] INFO com.niuh.future.FutureTaskDemo1 - 月餅制作中。。。。 11:01:42.151 [pool-1-thread-2] INFO com.niuh.future.FutureTaskDemo1 - 編號[804]月餅已打包好 11:01:42.151 [pool-1-thread-3] INFO com.niuh.future.FutureTaskDemo1 - 編號[88]月餅已打包好 11:01:42.151 [pool-1-thread-1] INFO com.niuh.future.FutureTaskDemo1 - 編號[166]月餅已打包好
“并發編程之如何理解Future&FutureTask”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。