在多線程編程中,線程通信是一個非常重要的概念。線程通信指的是多個線程之間通過某種機制來交換信息或協調工作。Java提供了多種方式來實現線程通信,本文將介紹幾種常見的方法。
wait()
和notify()
/notifyAll()
wait()
、notify()
和notifyAll()
是Java中用于線程通信的基本方法。這些方法必須在同步塊或同步方法中使用,因為它們依賴于對象的監視器鎖。
wait()
wait()
方法使當前線程進入等待狀態,直到其他線程調用notify()
或notifyAll()
方法喚醒它。調用wait()
后,當前線程會釋放對象的鎖。
synchronized (obj) {
while (conditionIsNotMet) {
obj.wait();
}
// 執行操作
}
notify()
和notifyAll()
notify()
方法喚醒在此對象監視器上等待的單個線程,而notifyAll()
方法喚醒所有等待的線程。
synchronized (obj) {
// 改變條件
obj.notify(); // 或者 obj.notifyAll();
}
class SharedResource {
private boolean flag = false;
public synchronized void produce() {
while (flag) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
flag = true;
System.out.println("Produced");
notify();
}
public synchronized void consume() {
while (!flag) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
flag = false;
System.out.println("Consumed");
notify();
}
}
public class WaitNotifyExample {
public static void main(String[] args) {
SharedResource resource = new SharedResource();
Thread producer = new Thread(() -> {
for (int i = 0; i < 5; i++) {
resource.produce();
}
});
Thread consumer = new Thread(() -> {
for (int i = 0; i < 5; i++) {
resource.consume();
}
});
producer.start();
consumer.start();
}
}
BlockingQueue
BlockingQueue
是Java并發包中的一個接口,它提供了線程安全的隊列操作。BlockingQueue
可以用于實現生產者-消費者模式,從而簡化線程通信。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
class Producer implements Runnable {
private BlockingQueue<Integer> queue;
public Producer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
queue.put(i);
System.out.println("Produced: " + i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Consumer implements Runnable {
private BlockingQueue<Integer> queue;
public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
int value = queue.take();
System.out.println("Consumed: " + value);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class BlockingQueueExample {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
Thread producer = new Thread(new Producer(queue));
Thread consumer = new Thread(new Consumer(queue));
producer.start();
consumer.start();
}
}
Lock
和Condition
Lock
和Condition
是Java 5引入的并發工具,提供了比synchronized
更靈活的線程通信機制。
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class SharedResource {
private boolean flag = false;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void produce() {
lock.lock();
try {
while (flag) {
condition.await();
}
flag = true;
System.out.println("Produced");
condition.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void consume() {
lock.lock();
try {
while (!flag) {
condition.await();
}
flag = false;
System.out.println("Consumed");
condition.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class LockConditionExample {
public static void main(String[] args) {
SharedResource resource = new SharedResource();
Thread producer = new Thread(() -> {
for (int i = 0; i < 5; i++) {
resource.produce();
}
});
Thread consumer = new Thread(() -> {
for (int i = 0; i < 5; i++) {
resource.consume();
}
});
producer.start();
consumer.start();
}
}
Semaphore
Semaphore
是一種計數信號量,用于控制對共享資源的訪問。它可以用于實現線程間的通信和同步。
import java.util.concurrent.Semaphore;
class SharedResource {
private Semaphore semaphore = new Semaphore(1);
public void produce() {
try {
semaphore.acquire();
System.out.println("Produced");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
public void consume() {
try {
semaphore.acquire();
System.out.println("Consumed");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
}
public class SemaphoreExample {
public static void main(String[] args) {
SharedResource resource = new SharedResource();
Thread producer = new Thread(() -> {
for (int i = 0; i < 5; i++) {
resource.produce();
}
});
Thread consumer = new Thread(() -> {
for (int i = 0; i < 5; i++) {
resource.consume();
}
});
producer.start();
consumer.start();
}
}
CountDownLatch
CountDownLatch
是一種同步工具,允許一個或多個線程等待其他線程完成操作。
import java.util.concurrent.CountDownLatch;
class Worker implements Runnable {
private CountDownLatch latch;
public Worker(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
System.out.println("Worker is working");
latch.countDown();
}
}
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
Thread worker1 = new Thread(new Worker(latch));
Thread worker2 = new Thread(new Worker(latch));
Thread worker3 = new Thread(new Worker(latch));
worker1.start();
worker2.start();
worker3.start();
latch.await();
System.out.println("All workers have finished");
}
}
CyclicBarrier
CyclicBarrier
是一種同步工具,允許一組線程互相等待,直到所有線程都到達某個屏障點。
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
class Task implements Runnable {
private CyclicBarrier barrier;
public Task(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try {
System.out.println("Task is running");
barrier.await();
System.out.println("Task has finished");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
public class CyclicBarrierExample {
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("All tasks have reached the barrier");
});
Thread task1 = new Thread(new Task(barrier));
Thread task2 = new Thread(new Task(barrier));
Thread task3 = new Thread(new Task(barrier));
task1.start();
task2.start();
task3.start();
}
}
Exchanger
Exchanger
是一種同步工具,允許兩個線程在某個點交換數據。
import java.util.concurrent.Exchanger;
class Producer implements Runnable {
private Exchanger<String> exchanger;
public Producer(Exchanger<String> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
try {
String data = "Produced Data";
System.out.println("Producer is sending: " + data);
String receivedData = exchanger.exchange(data);
System.out.println("Producer received: " + receivedData);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Consumer implements Runnable {
private Exchanger<String> exchanger;
public Consumer(Exchanger<String> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
try {
String data = "Consumed Data";
System.out.println("Consumer is sending: " + data);
String receivedData = exchanger.exchange(data);
System.out.println("Consumer received: " + receivedData);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ExchangerExample {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
Thread producer = new Thread(new Producer(exchanger));
Thread consumer = new Thread(new Consumer(exchanger));
producer.start();
consumer.start();
}
}
Phaser
Phaser
是一種靈活的同步工具,允許線程分階段地執行任務。
import java.util.concurrent.Phaser;
class Task implements Runnable {
private Phaser phaser;
public Task(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
System.out.println("Task is running");
phaser.arriveAndAwaitAdvance();
System.out.println("Task has finished");
}
}
public class PhaserExample {
public static void main(String[] args) {
Phaser phaser = new Phaser(3);
Thread task1 = new Thread(new Task(phaser));
Thread task2 = new Thread(new Task(phaser));
Thread task3 = new Thread(new Task(phaser));
task1.start();
task2.start();
task3.start();
}
}
Future
和Callable
Future
和Callable
是Java中用于異步計算的工具。Callable
可以返回一個結果,而Future
用于獲取該結果。
import java.util.concurrent.*;
class Task implements Callable<String> {
@Override
public String call() throws Exception {
Thread.sleep(1000);
return "Task Result";
}
}
public class FutureExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(new Task());
System.out.println("Waiting for result...");
String result = future.get();
System.out.println("Result: " + result);
executor.shutdown();
}
}
CompletableFuture
CompletableFuture
是Java 8引入的異步編程工具,提供了更強大的功能和更靈活的線程通信機制。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Task Result";
});
System.out.println("Waiting for result...");
String result = future.get();
System.out.println("Result: " + result);
}
}
Java提供了多種方式來實現線程通信,每種方式都有其適用的場景。wait()
和notify()
是最基本的線程通信機制,適用于簡單的同步場景。BlockingQueue
、Lock
和Condition
、Semaphore
、CountDownLatch
、CyclicBarrier
、Exchanger
、Phaser
、Future
和Callable
、CompletableFuture
等工具則提供了更高級的線程通信和同步功能,適用于更復雜的并發場景。
在實際開發中,選擇合適的線程通信機制可以大大提高程序的性能和可維護性。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。