這篇文章主要介紹“BlockingQueue接口及ArrayBlockingQueue實現類的方法”的相關知識,小編通過實際案例向大家展示操作過程,操作方法簡單快捷,實用性強,希望這篇“BlockingQueue接口及ArrayBlockingQueue實現類的方法”文章能幫助大家解決問題。
隊列是一種 FIFO
(先進先出)的數據結構,本文要講的 BlockingQueue
也是一種隊列,而且強調了線程安全的特性。
BlockingQueue
全稱:java.util.concurrent.BlockingQueue
。它是是一個線程安全的隊列接口,多個線程能夠以并發的方式從隊列中插入數據,取出數據的同時不會出現線程安全的問題。
BlockingQueue
通常用于消費者線程向隊列存入數據,消費者線程從隊列中取出數據,具體如下
生產者線程不停的向隊列中插入數據,直到隊列滿了,生產者線程被阻塞
消費者線程不停的從隊列中取出數據,直到隊列為空,消費者線程被阻塞
(推薦教程:Java教程)
BlockingQueue
提供 4 種不同類型的方法用于插入數,取出數據以及檢查數據,具體如下
操作失敗,拋出異常
無論成功/失敗,立即返回 true/false
如果隊列為空/滿,阻塞當前線程
如果隊列為空/滿,阻塞當前線程并有超時機制插入add(o)
offer(o)
put(o)
offer(o, timeout, timeunit)
取出remove(o)
poll()
take()
poll(timeout, timeunit)
檢查element()
peek()
BlockingQueue
只是一個接口,在實際開發中有如下的類實現了該接口。
ArrayBlockingQueue
DelayQueue
LinkedBlockingQueue
PriorityBlockingQueue
SynchronousQueue
這里以 BlockingQueue
接口的具體實現類 ArrayBlockingQueue
舉例。通過 ArrayBlockingQueue
實現一個消費者和生產者多線程模型。
核心內容如下:
以 ArrayBlockingQueue
作為生產者和消費者的數據容器
通過 ExecutorService
啟動 3 個線程,2 兩個生產者,1 個消費者
指定數據總量
ArrayBlockingQueueProducer
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; /** * 生產者線程向容器存入指定總量的 任務 * */ public class ArrayBlockingQueueProducer implements Runnable { private static final Logger logger = LoggerFactory.getLogger(ArrayBlockingQueueProducer.class); // 容器 private ArrayBlockingQueue<String> queue; // 生產指定的數量 private AtomicInteger numberOfElementsToProduce; public ArrayBlockingQueueProducer(ArrayBlockingQueue<String> queue, AtomicInteger numberOfElementsToProduce) { this.queue = queue; this.numberOfElementsToProduce = numberOfElementsToProduce; } @Override public void run() { try { while (numberOfElementsToProduce.get() > 0) { try { // 向隊列中存入任務 String task = String.format("task_%s", numberOfElementsToProduce.getAndUpdate(x -> x-1)); queue.put(task); logger.info("thread {}, produce task {}", Thread.currentThread().getName(), task); // 任務為0,生產者線程退出 if (numberOfElementsToProduce.get() == 0) { break; } } catch (Exception e) { e.printStackTrace(); } } } catch (Exception e) { logger.error(this.getClass().getName().concat(". has error"), e); } } }
ArrayBlockingQueueConsumer
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; /** * 消費者線程向容器 消費 指定總量的任務 * */ public class ArrayBlockingQueueConsumer implements Runnable { private static final Logger logger = LoggerFactory.getLogger(ArrayBlockingQueueConsumer.class); private ArrayBlockingQueue<String> queue; private AtomicInteger numberOfElementsToProduce; public ArrayBlockingQueueConsumer(ArrayBlockingQueue<String> queue, AtomicInteger numberOfElementsToProduce) { this.queue = queue; this.numberOfElementsToProduce = numberOfElementsToProduce; } @Override public void run() { try { while (!queue.isEmpty() || numberOfElementsToProduce.get() >= 0) { // 從隊列中獲取任務,并執行任務 String task = queue.take(); logger.info("thread {} consume task {}", Thread.currentThread().getName(),task); // 隊列中數據為空,消費者線程退出 if (queue.isEmpty()) { break; } } } catch (Exception e) { logger.error(this.getClass().getName().concat(". has error"), e); } } }
測試TestBlockingQueue
import com.ckjava.synchronizeds.appCache.WaitUtils; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; /** * 1. 以 ArrayBlockingQueue 作為生產者和消費者的數據容器 <br> * 2. 通過 ExecutorService 啟動 3 個線程,2 兩個生產者,1 個消費者 <br> * 3. 指定數據總量 */ public class TestBlockingQueue { public static void main(String[] args) { ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(10); /*BlockingQueue delayQueue = new DelayQueue(); BlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>(10); BlockingQueue<String> priorityBlockingQueue = new PriorityBlockingQueue<>(10); BlockingQueue<String> synchronousQueue = new SynchronousQueue<>();*/ ExecutorService executorService = Executors.newFixedThreadPool(3); // 最多生產 5 個數據 AtomicInteger numberOfElementsToProduce = new AtomicInteger(5); // 2 個生產者線程 executorService.submit(new ArrayBlockingQueueProducer(arrayBlockingQueue, numberOfElementsToProduce)); executorService.submit(new ArrayBlockingQueueProducer(arrayBlockingQueue, numberOfElementsToProduce)); // 1 個消費者線程 executorService.submit(new ArrayBlockingQueueConsumer(arrayBlockingQueue, numberOfElementsToProduce)); executorService.shutdown(); WaitUtils.waitUntil(() -> executorService.isTerminated(), 1000L); } }
輸出如下:
13:54:17.884 [pool-1-thread-3] INFO c.c.b.ArrayBlockingQueueConsumer - thread pool-1-thread-3 consume task task_5 13:54:17.884 [pool-1-thread-1] INFO c.c.b.ArrayBlockingQueueProducer - thread pool-1-thread-1, produce task task_5 13:54:17.884 [pool-1-thread-2] INFO c.c.b.ArrayBlockingQueueProducer - thread pool-1-thread-2, produce task task_4 13:54:17.887 [pool-1-thread-3] INFO c.c.b.ArrayBlockingQueueConsumer - thread pool-1-thread-3 consume task task_4 13:54:17.887 [pool-1-thread-2] INFO c.c.b.ArrayBlockingQueueProducer - thread pool-1-thread-2, produce task task_2 13:54:17.887 [pool-1-thread-1] INFO c.c.b.ArrayBlockingQueueProducer - thread pool-1-thread-1, produce task task_3 13:54:17.887 [pool-1-thread-3] INFO c.c.b.ArrayBlockingQueueConsumer - thread pool-1-thread-3 consume task task_3 13:54:17.887 [pool-1-thread-2] INFO c.c.b.ArrayBlockingQueueProducer - thread pool-1-thread-2, produce task task_1 13:54:17.887 [pool-1-thread-3] INFO c.c.b.ArrayBlockingQueueConsumer - thread pool-1-thread-3 consume task task_2 13:54:17.887 [pool-1-thread-3] INFO c.c.b.ArrayBlockingQueueConsumer - thread pool-1-thread-3 consume task task_1
關于“BlockingQueue接口及ArrayBlockingQueue實現類的方法”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識,可以關注億速云行業資訊頻道,小編每天都會為大家更新不同的知識點。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。