Concurrent:
1. BlockingQueue( 阻塞隊列 )
ArrayBlockingQueue( 指定容量,不可變 ),LinkeBlocingQueue (指定容量不可變,也可以不指定容量,默認 Integer.Max_value )
PriorityBlockingQueue ( 根據實現的接口自定義排序,只有在逐個拿取的時候才有序 )
SynchronousQueue (長度以 1 只能為 1 )
2. ConcurrentMap
ConcurrentHashMap
1.5 分桶加鎖,1.8 CAS+紅黑樹來保證線程安全
ConcurrentNavigableMap (針對有序列表,有 map.headMap map.subMap map.tailMap , 返回有序的在取出范圍的 map )
3. CountDownLatch 閉鎖
CountDownLatch 以一個給定的數量初始化。 countDown() 每被調用一次,這一數 量就減一。通過調用 await() 方法之一,線程可以阻塞等待這一數量到達零。
等待一定數量的線程完成。來執行其后的程序
4. CyclicBarrier 柵欄
它能夠對處理一些算法的線程實現同
步。換句話講,它就是一個所有線程必須等待的一個柵欄,直到所有線程都到達這
里,然后所有線程才可以繼續做其他事情
5. Exchanger 交換機
類表示一種兩個線程可以進行互相交換對象的會和點 ,只能兩個線程之間交換數據
6. Semaphore 信號量
l
acquire()
l
release()
計數信號量由一個指定數量的
"
許可
"
初始化。每調用一次
acquire()
,一個許可會
被調用線程取走。每調用一次
release()
,一個許可會被返還給信號量。因此,在沒
有任何
release()
調用時,最多有
N
個線程能夠通過
acquire()
方法,
N
是該信
號量初始化時的許可的指定數量。
線程池
7. ExecutorService
(executors.newFixedThreadPool ,長任務場景,只有核心線程,沒有臨時線程,容納無限多
,newCachedThreadPool (高并發短任務場景,沒有核心線程,全部都是臨時線程,處理任意多的線程)
,newSingleThreadPool
,newSchedulerThreadPool (有核心線程,有臨時線程)
)
提交線程的方法
execute() 提交線程 沒有返回值submit(Runnable) 提交線程,返Future 可以通過Future.get()得到該線程的狀態但是如果該線程未執行完成,那么該方法阻塞
submit(Callable) 同上面類似,但是線程可以帶有返回值
invokeAny(.....),隨機選擇線程執行一個
invokeAll(),自動執行所有的線程
關閉線程池: ExecutorService.shutdown();該方法不再接受線程池,等待所有線程執行完畢后,線程池結束。
ExecutorService.shutdown();立即關閉,退出任務,正在執行的線程可能會出錯。
##Callable只能用線程池提交
Callable runnable :
1. 返回值
2. 異常, runnabel沒有容錯機制,callable有容錯機制,可以將 異常拋給上層處理
3. Callable只能通過submit方法提交,runnable可以new 也可 以通過線程池提交
8. ReadWriteLock 讀寫鎖 ,可以是公平,也可以是非公平的。該鎖可以跨方法
讀鎖可以共享,寫鎖互斥
讀鎖 readLock().lock();
寫鎖 writeLock().lock();
package hgs.test; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; public class Test { public static void main(String[] args) throws InterruptedException, BrokenBarrierException { //ConcurrentMap //BlockingQueue<String> bq = new ArrayBlockingQueue<String>(4); //add(超過長度會報錯) remove(沒有元素會報錯) /* bq.add("1"); bq.add("1"); bq.add("1"); bq.add("1"); bq.remove(); bq.remove(); bq.remove(); bq.remove(); */ //offer(如果可以插入返回true 否則 false) poll(如果沒有元素返回null) //bq.offer("1"); //bq.offer("1"); //bq.offer("1"); //bq.offer("1"); //bq.offer("1"); //String flag = bq.poll(); //System.out.println(flag); //put take 阻塞式 //bq.put("1"); //bq.put("1"); //bq.put("1"); //bq.put("1"); //bq.put("1"); //bq.put("1"); //bq.take(); //offer(o,timeout,timeunit),poll(timeout,tameunit) 等待timeout時間,然后跳過 //bq.poll(10, TimeUnit.SECONDS); //bq.element();//檢查是否為空,是的話跑出異常 //bq.peek();//阻塞 /* ConcurrentNavigableMap<Integer ,String> m = new ConcurrentSkipListMap<Integer, String>() ; m.put(1, "1"); m.put(2, "2"); m.put(5, "5"); m.put(4, "4"); m.put(6, "6"); m.put(5, "5"); System.out.println("head(\"5\")"+m.headMap(4)); System.out.println(); System.out.println();*/ /*Boy b1 = new Boy("b1",24); Boy b2 = new Boy("b2",23); Boy b3 = new Boy("b3",26); Boy b4 = new Boy("b4",19); PriorityBlockingQueue< Boy> pbq = new PriorityBlockingQueue<Boy>(100); pbq.add(b1); pbq.add(b2); pbq.add(b3); pbq.add(b4); for(int i =0 ;i<4;i++) { System.out.println(pbq.take().toString()); } */ //閉鎖 /*CountDownLatch cdl = new CountDownLatch(4); for(int i = 0; i<4; i++) { new Thread(new BoyRun(cdl)).start(); } cdl.await(); System.out.println("全部到達。。。");*/ //柵欄 /*CyclicBarrier cb = new CyclicBarrier(4); for(int i = 0; i<4; i++) { new Thread(new GirlRan(cb)).start(); } cb.await(); System.out.println("all comming.");*/ //exchanger交換器 /*Exchanger<String> ex = new Exchanger<String>(); ExchangerTest e1 = new ExchangerTest(ex); ExchangerTest e2 = new ExchangerTest(ex); new Thread(e1).start(); new Thread(e2).start(); */ //6.Semaphore 信號量 /* Semaphore s = new Semaphore(5); for(int i = 0 ;i<9;i++) { new SemaphoreTest(s).start(); }*/ //原始創建線程池,executors.newCacheThreadPool 的底層調用該方法 /* ExecutorService es = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("full......"); } }); for(int i = 0 ;i<24;i++) { es.execute(new ThreadPoolTest()); } es.shutdown();*/ //可重入鎖 ReentrantLock 可重入讀寫鎖ReentrantReadWriteLock /* ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); for (int i = 0 ;i<3;i++) { new ReadLockTest( rwLock ).start(); }*/ ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); for (int i = 0 ;i<3;i++) { new WriteLockTest( rwLock ).start(); } } } class WriteLockTest extends Thread{ ReentrantReadWriteLock rwLock ; public WriteLockTest(ReentrantReadWriteLock rwLock ) { this.rwLock = rwLock; } @Override public void run() { rwLock.writeLock().lock(); System.out.println("reading......"); try { Thread.sleep((long)(Math.random()*2000)); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("done......"); rwLock.writeLock().unlock(); } } class ReadLockTest extends Thread{ ReentrantReadWriteLock rwLock ; public ReadLockTest(ReentrantReadWriteLock rwLock ) { this.rwLock = rwLock; } @Override public void run() { rwLock.readLock().lock(); System.out.println("reading......"); try { Thread.sleep((long)(Math.random()*2000)); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("done......"); rwLock.readLock().unlock(); } } class ThreadPoolTest implements Runnable{ ThreadPoolTest (){ } @Override public void run() { System.out.println(Thread.currentThread().getName()); try { Thread.sleep((long)(Math.random()*2000)); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } class SemaphoreTest extends Thread{ Semaphore s = null; public SemaphoreTest(Semaphore s) { this.s = s; } @Override public void run() { try { s.acquire(); System.out.println("aquire......"); Thread.sleep((long)(Math.random()*3000)); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("release......"); s.release(); } } class ExchangerTest implements Runnable{ Exchanger<String> ex = null; public ExchangerTest(Exchanger<String> ex) { this.ex = ex; } @Override public void run() { String my = Thread.currentThread().getName(); String exstr = null; try { exstr = ex.exchange(my); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println(my+" "+exstr); } } class BoyRun implements Runnable{ CountDownLatch cdl ; public BoyRun(CountDownLatch cdl ) { this.cdl = cdl; } @Override public void run() { System.out.println("cdl"+" +1"); cdl.countDown(); System.out.println("cdl"+" +1--"); } } class GirlRan implements Runnable { CyclicBarrier cb ; public GirlRan(CyclicBarrier cb ) { this.cb = cb; } @Override public void run() { System.out.println("cdl"+" +1"); try { Thread.sleep((long)(Math.random()*5000)); cb.await(); } catch (InterruptedException | BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("cdl"+" +1--"); } } class Boy implements Comparable<Boy>{ String name; int age; public Boy(String name ,int age) { this.name = name; this.age = age; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } public int compareTo(Boy o) { return this.age - o.age; } @Override public String toString() { return "Boy [name=" + name + ", age=" + age + "]"; } }
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。