這篇文章主要介紹“java并發包的介紹以及線程池的創建和使用”,在日常操作中,相信很多人在java并發包的介紹以及線程池的創建和使用問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”java并發包的介紹以及線程池的創建和使用”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
1.java并發包介紹
JDK5.0(JDK1.5更名后)以后的版本引入高級并發特性,大多數的特性在java.util.concurrent包中,是專門用于多線程編程的,充分利用了現代多處理器和多核心系統的功能以編寫大規模并發應用程序。主要包括原子量、并發集合、同步器、可重入鎖,并對線程池的構造提供了強力的支持
2.線程池
java.util.concurrent.Executors提供了一個 java.util.concurrent.Executor接口的實現用于創建線程池
多線程技術主要解決處理器單元內多個線程執行的問題,它可以顯著減少處理器單元的閑置時間,增加處理器單元的吞吐能力。
假設服務器完成一項任務所需時間為:T1 創建線程時間,T2 在線程中執行任務的時間,T3 銷毀線程時間。如果T1 + T3 遠大于 T2,則可以采用線程池,以提高服務器性能,減少創建和銷毀線程所需消耗的時間。
一個線程池由以下四個基本部分組成:
線程池管理器(ThreadPool):用于創建并管理線程池,包括 創建線程池,銷毀線程池,添加新任務;
工作線程(PoolWorker):線程池中線程,在沒有任務時處于等待狀態,可以循環的執行任務;
任務接口(Task):每個任務必須實現的接口,以供工作線程調度任務的執行,它主要規定了任務的入口,任務執行完后的收尾工作,任務的執行狀態等;
任務隊列(taskQueue):用于存放沒有處理的任務。提供一種緩沖機制。
線程池技術正是關心如何縮短或調整T1,T3時間從而提高服務器程序性能的技術。它把T1,T3分別安排在服務器程序的啟動和結束的時間段或者一些空閑的時間段,這樣在服務器程序處理客戶請求時,免去了線程創建和銷毀的開銷。
線程池不僅調整T1,T3產生的時間段,而且它還顯著減少了創建線程的數目,看一個例子:
假設一個服務器一天要處理100000個請求,并且每個請求需要一個單獨的線程完成。在線程池中,線程數一般是固定的,
一般線程池大小是遠小于100000。所以利用線程池的服務器程序不會為了創建100000而在處理請求時浪費時間,從而提高效率。
線程池的五種創建方式
Single Thread Executor:只有一個線程的線程池,因此所提交的任務是順序執行,Executors.newSingleThreadExecutor();
Cached Thread Pool:線程池里有很多線程需同時進行,舊的可用線程將被新的任務觸發從而重新執行,如果線程超過60秒內沒有執行,那么將被終止并從池中刪除Executors.newCachedThreadPool();
Fixed Thread Pool:擁有固定線程數的線程池,如果沒有任務執行,那么線程會一直等待,Executors.newFixedThreadPool(10);在構造函數中的參數10是線程池的大小,你可以隨意設置,也可以和cpu的數量保持一致,獲取cpu的數量int cpuNums = Runtime.getRuntime().availableProcessors();
Scheduled Thread Pool:用來調度即將執行的任務的線程池Executors.newScheduledThreadPool();
Sing Thread Scheduled Pool:只有一個線程,用來調度任務在指定時間執行Executors.newSingleThreadScheduledExecutor();
3.線程池的使用
以下用Fixed Thread Pool作為示范,提供一個使用參考
LogNumVo
package com.ithzk.threadpool;
/**
* 用作返回 執行的數量的
* @author hzk
* @date 2018/3/29
*/
public class LogNumVo {
private static final long serialVersionUID = -5541722936350755569L;
private Integer dataNum;
private Integer successNum;
private Integer waitNum;
public Integer getDataNum() {
return dataNum;
}
public void setDataNum(Integer dataNum) {
this.dataNum = dataNum;
}
public Integer getSuccessNum() {
return successNum;
}
public void setSuccessNum(Integer successNum) {
this.successNum = successNum;
}
public Integer getWaitNum() {
return waitNum;
}
public void setWaitNum(Integer waitNum) {
this.waitNum = waitNum;
}
}DealObject
package com.ithzk.threadpool;
/**
* @author hzk
* @date 2018/3/29
*/
public class DealObject {
private Integer identifyId;
private String data;
public DealObject(Integer identifyId, String data) {
this.identifyId = identifyId;
this.data = data;
}
public DealObject() {
}
public Integer getIdentifyId() {
return identifyId;
}
public void setIdentifyId(Integer identifyId) {
this.identifyId = identifyId;
}
public String getData() {
return data;
}
public void setData(String data) {
this.data = data;
}
}AbstractCalculateThread
package com.ithzk.threadpool;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
/**
* @author hzk
* @date 2018/3/29
*/
public class AbstractCalculateThread<T> implements Callable<String> {
protected Collection<T> insertList;
protected CountDownLatch countd;
protected String threadCode;
protected String batchNumber;
public Collection<T> getInsertList() {
return insertList;
}
public void setInsertList(Collection<T> insertList) {
this.insertList = insertList;
}
public CountDownLatch getCountd() {
return countd;
}
public void setCountd(CountDownLatch countd) {
this.countd = countd;
}
public String getThreadCode() {
return threadCode;
}
public void setThreadCode(String threadCode) {
this.threadCode = threadCode;
}
public String getBatchNumber() {
return batchNumber;
}
public void setBatchNumber(String batchNumber) {
this.batchNumber = batchNumber;
}
public AbstractCalculateThread() {
super();
}
public AbstractCalculateThread(Collection<T> insertList, CountDownLatch countd, String threadCode,String batchNumber) {
super();
this.insertList = insertList;
this.countd = countd;
this.threadCode = threadCode;
this.batchNumber = batchNumber;
}
public String call() throws Exception {
return null;
}
}CalculateDealThread
package com.ithzk.threadpool;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
/**
* @author hzk
* @date 2018/3/29
*/
public class CalculateDealThread extends AbstractCalculateThread<DealObject> {
private ExecutorPool executorPool = SpringContextUtil.getBean(ExecutorPool.class);
@Override
public String call() throws Exception {
try {
System.out.println("========開始跑線程【"+threadCode+"】");
return executorPool.syncBatchDealObject(insertList,batchNumber);
} catch (Exception e) {
e.printStackTrace();
System.out.println("========開始跑線程【"+threadCode+"】:"+e.getMessage());
}finally {
countd.countDown();
}
return null;
}
public CalculateDealThread() {
super();
}
public CalculateDealThread(Collection<DealObject> insertList, CountDownLatch countd, String threadCode,String batchNumber) {
super(insertList, countd, threadCode, batchNumber);
}
}ExecutorPool
package com.ithzk.threadpool;
import java.util.*;
import java.util.concurrent.*;
/**
* @author hzk
* @date 2018/3/29
*/
public class ExecutorPool {
/**
* 模擬需要處理數據的大小
*/
private static final int ARRAY_COUNT = 50000;
/**
* 開啟多線程處理的條件
*/
private static final int MULTI_THREAD_STARTCOUNT = 10000;
/**
* 批量處理的大小
*/
private static final int BATCH_DEAL_SIZE = 100;
/**
* 每次開啟線程數量
*/
public static final int THREAD_POOL_NUM=10;
public static void main(String[] args){
testExecutorPool();
}
public static void testExecutorPool(){
ArrayList<DealObject> dealObjects = new ArrayList<DealObject>();
for (int i = 0;i<ARRAY_COUNT;i++){
DealObject dealObject = new DealObject(i,"data_"+i);
dealObjects.add(dealObject);
System.out.println("Data add success current:"+i);
}
int size = dealObjects.size();
int successNum = 0;
int waitNum = 0;
System.out.println("需要處理的數據數據量為:"+size);
// 判斷數據是否大于10000 如果大于則開啟線程池 跑數據
if (size > MULTI_THREAD_STARTCOUNT) {
try {
System.out.println("===================dataNum > 1000 | Multiple Thread Run=======================");
// 每次新增處理的條數
int batchInsertSize = BATCH_DEAL_SIZE;
// 定義保存的線程池
ExecutorService executorInsert = Executors.newFixedThreadPool(THREAD_POOL_NUM);
// 定義保存過程中返回的線程執行返回參數
List<Future<String>> futureListIsert = new ArrayList<Future<String>>();
// 線程 修改list
List<Map<Integer, DealObject>> listDealObjects = new ArrayList<Map<Integer, DealObject>>();
List<Map<Integer, DealObject>> listLiveSyncLogInsert = pointDateClassify(dealObjects, batchInsertSize, listDealObjects);
if (null != listLiveSyncLogInsert && !listDealObjects.isEmpty()) {
System.out.println("===================切割后的大小:"+listLiveSyncLogInsert.size()+"=======================");
//配合使用CountDownLatch為了保證在執行完所有子程序之后再執行主程序
CountDownLatch countd = new CountDownLatch(listLiveSyncLogInsert.size());
for (int j = 0; j < listLiveSyncLogInsert.size(); j++) {
Map<Integer, DealObject> insert = listLiveSyncLogInsert.get(j);
Future<String> future = executorInsert.submit(new CalculateDealThread(insert.values(), countd,"executor_pool_test_thread", null));
futureListIsert.add(future);
}
}
// 等待線程執行完成
executorInsert.shutdown();
for (Future<String> future : futureListIsert) {
String json = future.get();
if (null != json && !"".equals(json)) {
將返回的json格式數據轉換為實體類 進行業務記錄
LogNumVo logNumVo = JSON.toJavaObject(JSON.parseObject(json),LogNumVo.class);
successNum += logNumVo.getSuccessNum();
waitNum += logNumVo.getWaitNum();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
/**
* 拆分線程數
* 假設集合中有50000個元素 則按照100個一組切分 可切分為500組
* 即每個線程一次處理一組(100個元素)
*
* @author
* @param lPostUploadIntegralList
* @param batchInsertSize
* @param listPostUploadIsert
*/
@SuppressWarnings("all")
public static List<Map<Integer, DealObject>> pointDateClassify(List<DealObject> lPostUploadIntegralList,int batchInsertSize, List<Map<Integer, DealObject>> listJSONObjectUpdate) {
List<Map<Integer, DealObject>> listLiveSyncLogInsert = new Vector<Map<Integer, DealObject>>();
// 新增數據list
List<DealObject> integralListInsert = lPostUploadIntegralList;
System.out.println("============integralListInsert.size()=====:" + integralListInsert.size());
// 拆分數據(拆成多個List)
int inserti = 0;
if (integralListInsert != null && integralListInsert.size() > 0) {
ConcurrentHashMap<Integer, DealObject> integralListIns = null;
for (int l = 0; l < integralListInsert.size(); l++) {
if (integralListIns == null) {
integralListIns = new ConcurrentHashMap<Integer, DealObject>();
}
integralListIns.put(integralListInsert.get(l).getIdentifyId(), integralListInsert.get(l));
inserti++;
if ((inserti % batchInsertSize) == 0) {
listLiveSyncLogInsert.add(integralListIns);
integralListIns = null;
} else {
// 最后100條或不足100條數據
if ((l + 1) == integralListInsert.size()) {
listLiveSyncLogInsert.add(integralListIns);
}
}
}
}
System.out.println("=============listPostUploadInsert.size()====:" + listLiveSyncLogInsert.size());
return listLiveSyncLogInsert;
}
/**
* 多線程保存數據至數據庫
*/
public String syncBatchDealObject(Collection<DealObject> insertList,String batchNumber) {
int successNum = 0, waitNum = 0;
Date currentDate = new Date(System.currentTimeMillis());
for (DealObject dealObject : insertList) {
try {
int icount = syncDealObject(dealObject,currentDate);
if(icount > 0){
successNum ++;
}else {
waitNum ++;
}
} catch (Exception e) {
e.printStackTrace();
++waitNum;
}
}
LogNumVo logNum = new LogNumVo();
logNum.setDataNum(0);
logNum.setSuccessNum(successNum);
logNum.setWaitNum(waitNum);
// 將記錄實體類轉為json格式反饋給線程池
return JSON.toJSONString(logNum);
}
/**
* 處理數據業務
* @param dealObject
* @param currentDate
* @return
*/
private int syncDealObject(DealObject dealObject,Date currentDate){
int successNum = 0;
//業務處理邏輯
if(null != dealObject.getData()){
successNum++;
}
return successNum;
}
}4.BlockingQueue
BlockingQueue也是java.util.concurrent下的主要用來控制線程同步的工具。主要的方法是:put、take一對阻塞存??;add、poll一對非阻塞存取。
插入:
add(anObject)
把anObject加到BlockingQueue里,如果BlockingQueue可以容納,則返回true,否則拋出異常
offer(anObject)
把anObject加到BlockingQueue里,如果BlockingQueue可以容納,則返回true,否則返回false.
put(anObject)
把anObject加到BlockingQueue里,如果BlockQueue沒有空間,則調用此方法的線程被阻塞直到BlockingQueue里面有空間再繼續.
讀?。?/strong>
poll(time)
取走BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數規定的時間,取不到時返回null
take()
取走BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態直到Blocking有新的對象被加入為止
其他:
int remainingCapacity()
返回理想情況下(沒有內存和資源約束)此隊列可接受并且不會被阻塞的附加元素數量。
該數量總是等于此隊列的初始容量,小于隊列的當前 size(返回隊列剩余的容量)。
注意,不能總是通過檢查 remainingcapacity 來斷定試圖插入一個元素是否成功,因為可能是另一個線程將插入或移除某個元
素。
boolean remove(Object o)
從隊列移除元素,如果存在,即移除一個或者更多,隊列改變了返回true
public boolean contains(Object o)
查看隊列是否存在這個元素,存在返回true
int drainTo(Collection<? super E> c)
傳入的集合中的元素,如果在隊列中存在,那么將隊列中的元素移動到集合中
int drainTo(Collection<? super E> c, int maxElements)
和上面方法的區別在于,制定了移動的數量
以下是一個BlockQueue的基本使用參考:
Producer
package com.ithzk.BlockingQueueTest;
import java.util.concurrent.BlockingQueue;
/**
* @author hzk
* @date 2018/3/31
*/
public class Producer implements Runnable{
BlockingQueue<String> blockingQueue;
public Producer(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
try {
String threadIdentify = "A Producer,生產線程"+Thread.currentThread().getName();
blockingQueue.put(threadIdentify);
System.out.println("Produce success! Thread:"+Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}Consumer
package com.ithzk.BlockingQueueTest;
import java.util.concurrent.BlockingQueue;
/**
* @author hzk
* @date 2018/3/31
*/
public class Consumer implements Runnable{
BlockingQueue<String> blockingQueue;
public Consumer(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
try {
String consumer = Thread.currentThread().getName();
System.out.println("Current Consumer Thread:"+consumer);
//如果隊列為空會阻塞當前線程
String take = blockingQueue.take();
System.out.println(consumer + " consumer get a product:"+take);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}BlockTest
package com.ithzk.BlockingQueueTest;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* @author hzk
* @date 2018/3/31
*/
public class BlockTest {
public static void main(String[] args) throws InterruptedException {
// 不設置的話,LinkedBlockingQueue默認大小為Integer.MAX_VALUE
// BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>();
// BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(2);
BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>(2);
Consumer consumer = new Consumer(blockingQueue);
Producer producer = new Producer(blockingQueue);
for (int i = 0; i < 3; i++) {
new Thread(producer, "Producer" + (i + 1)).start();
}
for (int i = 0; i < 5; i++) {
new Thread(consumer, "Consumer" + (i + 1)).start();
}
Thread.sleep(5000);
new Thread(producer, "Producer" + (5)).start();
}
}BlockingQueue有四個具體的實現類,常用的兩種實現類為:
ArrayBlockingQueue:一個由數組支持的有界阻塞隊列,規定大小的BlockingQueue,其構造函數必須帶一個int參數來指明其大小.其所含的對象是以FIFO(先入先出)順序排序的。
LinkedBlockingQueue:大小不定的BlockingQueue,若其構造函數帶一個規定大小的參數,生成的BlockingQueue有大小限制。
若不帶大小參數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定.其所含的對象是以FIFO(先入先出)順序排序的。
LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的話,默認最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在隊列滿的時候會阻塞直到有隊列成員被消費,take方法在隊列空的時候會阻塞,直到有隊列成員被放進來。
LinkedBlockingQueue和ArrayBlockingQueue區別
LinkedBlockingQueue和ArrayBlockingQueue比較起來,它們背后所用的數據結構不一樣,導致LinkedBlockingQueue的數據吞吐量要大于ArrayBlockingQueue,但在線程數量很大時其性能的可預見性低于ArrayBlockingQueue.
到此,關于“java并發包的介紹以及線程池的創建和使用”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。