溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Java并發編程基本知識

發布時間:2020-07-23 01:44:05 來源:網絡 閱讀:257 作者:klloiy 欄目:編程語言

并發基礎

并發編程的原則

  1. 原子性

原子性是指在一個操作中就是cpu不可以在中途暫停然后再調度,既不被中斷操作,即一個操作或者多個操作 要么全部執行并且執行的過程不會被任何因素打斷,要么就都不執行。

  1. 可見性

對于可見性,Java提供了volatile關鍵字來保證可見性。當一個共享變量被volatile修飾時,它會保證修改的值會立即被更新到主存,當有其他線程需要讀取時,它會去內存中讀取新值。而普通的共享變量不能保證可見性,因為普通共享變量被修改之后,什么時候被寫入主存是不確定的,當其他線程去讀取時,此時內存中可能還是原來的舊值,因此無法保證可見性。另外,通過synchronized和Lock也能夠保證可見性,synchronized和Lock能保證同一時刻只有一個線程獲取鎖然后執行同步代碼,并且在釋放鎖之前會將對變量的修改刷新到主存當中。

  1. 有序性

在Java內存模型中,允許編譯器和處理器對指令進行重新排序,但是重新排序過程不會影響到單線程程序的執行,卻會影響到多線程并發執行的正確性。

Runnable和Thread
這里只說一下實現Runnable接口和繼承Thread類的區別:以賣10張票的任務為例,如果繼承Thread類的話,啟動三個線程就相當于開了三個窗口,每個窗口都有賣10張票的任務,各賣各的;如果實現Runnable接口的話,啟動三個線程相當開了三個窗口賣票,這三個窗口一共賣10張票。

synchronized關鍵字

1.?synchronized對象鎖

synchronized(this)和synchronized方法都是鎖當前對象,synchronized(obj)鎖臨界對象。使用synchronized的話最好是鎖臨界對象。如果想要使得任意多個線程任意多個用戶訪問的時候都不出任何問題,可以考慮一下用鎖當前對象的方法,因為鎖當前對象量級較重,所以一般不用。

如下面Sync類中的兩個方法test_01和test_02()鎖的都是程序創建的Sync對象,細粒度控制推薦用test_02()。

public synchronized void test_01() {
    System.out.println("鎖當前對象");
}
public void test_02() {
    synchronized (this) {
        System.out.println("鎖當前對象");
    }
}

下面這個方法鎖的是Sync對象中的object對象(即臨界對象)

public void test_03() {
    synchronized (object) {
        System.out.println("鎖臨界對象");
    }
}

2.?synchronized使用在靜態方法中鎖定當前類

靜態同步方法鎖的是當前類型的類對象,如在Sync類中的static test_04()方法上加了同步鎖synchronized,那么此時synchronized鎖的是Sync.class。

// 下面兩個方法都是靜態同步方法

public static synchronized void test_04() {
    System.out.println("鎖Sync.class");
}
public static void test_05() {
    synchronized (Sync.class)     {
        System.out.println("鎖Sync.class類");
    }
}

3.?synchronized作用于靜態和非靜態方法的區別

synchronized作用與非靜態方法,相當于鎖定單個對象,不同對象之間沒有競爭關系;而作用于靜態方法時,鎖加載類上,即鎖定class,這時相當于所有對象競爭同一把鎖。

  1. 同步代碼塊中拋出異常,鎖被釋放

如下例子,線程1會在i=5的時候拋出異常,此時線程1鎖被釋放,線程2開始調用方法。

public class Test {
    static class Test02 implements Runnable {
        private int i = 0;
        @Override
        public synchronized void run() {
            while (true) {
                System.out.println(Thread.currentThread().getName() + "_" + i++);
                if (i == 5) { // 當i==5時拋出異常,鎖被釋放
                    i = 1 / 0;
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
                }catch (InterruptedException ignored) { }
            }
        }
    }
public static void main(String[] args) {
    Test02 test02 = new Test02();
    new Thread(test02, "LQ").start();
    new Thread(test02, "WH").start();
}

}

  1. 實例分析

在下面代碼中,object被LQ鎖定,WH阻塞。

public class Test {
static Object object = new Object();
void m() {
System.out.println(Thread.currentThread().getName() + " start...");
synchronized (object){
while (true) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception ignored) {}
System.out.println(Thread.currentThread().getName() + "-" + object.hashCode());
}
}
}

static class Test01 implements Runnable {
    @Override
    public void run() {
        new Test().m();
    }
}

static class Test02 implements Runnable {
    @Override
    public void run() {
        new Test().m();
    }
}

public static void main(String[] args) {
    Test01 test01 = new Test01();
    Thread thread = new Thread(test01, "LQ");
    thread.start();
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (Exception ignored) {}
    Test02 test02 = new Test02();
    thread = new Thread(test02, "WH");
    thread.start();
}

}
在WH線程中新創建了一個Object,WH正常運行。

public class Test {
    static Object object = new Object();
    void m() {
        System.out.println(Thread.currentThread().getName() + " start...");
        synchronized (object) {
            while (true) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (Exception ignored){}
                System.out.println(Thread.currentThread().getName() + "-" + object.hashCode());
            }
        }
    }
    static class Test01 implements Runnable {
        @Override
        public void run() {
            new Test().m();
        }
    }
    static class Test02 implements Runnable {
        @Override
        public void run() {
            object = new Object();
            new Test().m();
        }
    }

    public static void main(String[] args) {
        Test01 test01 = new Test01();
        Thread thread = new Thread(test01, "LQ");
        thread.start();
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (Exception ignored) {}
        Test02 test02 = new Test02();
        thread = new Thread(test02, "WH");
        thread.start();
    }
}

上面代碼中,WH線程啟動后會一只處于等待狀態,因為object被LQ線程鎖著,但如果在WH線程中重新new Object()并賦值給object,這樣的話WH線程就能夠正常運行了,原因是:同步鎖鎖定的是對內存中的對象,所以LQ鎖定的是第一次new的對象而WH鎖定的是第二次new的對象,如下圖。

?

對于常量:String a = “aaa” 和String b = “aaa”是同一個對象,因此,假如A方法鎖定了a,B方法鎖定了b,啟動LQ線程調用A方法,然后啟動WH線程調用B方法,這樣的話WH線程會等到LQ線程結束后才執行。因此,在定義同步代碼塊時,不要使用常量作為鎖的目標對象。

volatile關鍵字
計算機中有CPU、內存和緩存,當CPU運行的時候,默認找緩存中的數據。當CPU有中斷的時候,根據操作系統對CPU的管理特性,可能會清空緩存,重新將內存中的數據讀到緩存中,也可能不清空緩存,仍舊使用緩存中的數據進行后續的計算。如果CPU不中斷的話,默認CPU只會找緩存數據。volatile這個關鍵字不是改變緩存數據特性的,而是直接改變內存中的數據特性,當對一個對象加了volatile關鍵字修飾的時候,相當于通知了底層OS操作系統,告訴CPU每次進行計算的時候最好去看一下內存數據是否發生了變更,這就是內存的可見性。volatile關鍵字就是為了保證內存的可見性。

如下代碼會發生死鎖現象。

public class Volatile01 {
    private static boolean b = true;
    private void m() {
        System.out.println("start...");
        while (b) {}
        System.out.println("end...");
    }
    static class Volatile_01 implements Runnable {
        @Override
        public void run() {
            new Volatile01().m();
        }
    }
    public static void main(String[] args) {
        Volatile_01 = new Volatile_01();
        new Thread(volatile_01).start();
        try {
            TimeUnit.SECONDS.sleep(1);
        }catch (InterruptedException ignored) {}
        b = false;
    }
}

當將上述代碼塊中的共享變量b用volatile修飾時(保證了可見性),就能夠跳出循環了。

public class Volatile01 {
    private static volatile boolean b = true;
    private void m() {
        System.out.println("start...");
        while (b){}
        System.out.println("end...");
    }
    static class Volatile_01 implements Runnable {
        @Override
        public void run(){
            new Volatile01().m();
        }
    }
    public static void main(String[] args) {
        Volatile_01 = new Volatile_01();
        new Thread(volatile_01).start();
        try{
            TimeUnit.SECONDS.sleep(1);
        }catch (InterruptedException ignored){}
        b = false;
    }
}

join()方法
將多個線程連在一起,阻塞線程,直到調用join的線程執行完成。

如下程序打印的結果時100000,如果不用join()的話打印的結果將遠遠小于100000。用join()可以用來等待一組線程執行完畢后再進行后續邏輯處理,以保證數據的正確。

public class Test {
    private static volatile int count = 0;

    private void m() {
        for (int i = 0; i < 10000; i++) {
            count++;
        }
    }

    static class Test02 implements Runnable {
        @Override
        public synchronized void run() {
            new Test().m();
        }
    }

    public static void main(String[] args) {
        Test02 test02 = new Test02();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            threads.add(new Thread(test02));
        }
        for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println(count);
    }
}

上述代碼中用了synchronized關鍵字來實現原子性,也可以不用synchronized而用AtomicInteger對象,因為AtomicInteger是一個原子性操作對象,代碼如下。

public class Test{
    private static AtomicInteger count = new AtomicInteger();
    private void m(){
        for (int i = 0; i < 10000; i++){
            count.incrementAndGet();
        }
    }
    static class Test02 implements Runnable{
        @Override
        public void run(){
            new Test().m();
        }
    }
    public static void main(String[] args){
        Test02 test02 = new Test02();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 10; i++){
            threads.add(new Thread(test02));
        }
        for (Thread thread : threads){
            thread.start();
            try{
                thread.join();
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }
        System.out.println(count);
    }
}

CountDownLatch對象
CountDownLatch相當于一個門閂,在創建門閂對象的時候可以指定鎖的個數,若某個方法調用了門閂的await()方法,那么該方法執行到await()的時候會被阻塞等待門閂釋放,當門閂上沒有鎖也就是門閂開放的時候繼續執行。減門閂上鎖的方法時countDown()。

如下例,當在m1中調用了await(),在m2中調用了countDown(),因此根據m2的邏輯當m2執行完了之后門閂上的鎖數量就為0了,此時m1方法可以繼續執行了。

public class Test {
    private CountDownLatch countDownLatch = new CountDownLatch(5);

    private void m1() {
        try {
            countDownLatch.await(); // 等待門閂開放
        } catch (Exception ignored) {
        }
        System.out.println("method m1.");
    }

    private void m2() {
        while (countDownLatch.getCount() != 0) {
            countDownLatch.countDown(); // 減門閂上的鎖
            System.out.println("method m2");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException ignored) {
            }
        }
    }
public static void main(String[] args) {
    Test count01 = new Test();
    new Thread(count01::m2).start();
    new Thread(count01::m1).start();
}

}

門閂可以和鎖混合使用,或替代鎖的功能,再門閂開放之前等待,當門閂完全開放之后執行,可避免鎖的效率低下問題。

wait()、notify()和notifyAll()
wait():在對象上調用wait(), 會使當前線程進入等待狀態, 直至另一個線程對這個對象調用了notify() 或notifyAll() 方法喚醒線程。

notify():喚醒對象正在等待的一個線程。

notifyAll():當調用對象的notifyAll()方法時,所有waiting狀態的線程都會被喚醒。

(生產者消費者)自定義同步容器,容器上限為10,可以在多線程中應用,并保證數據線程安全。

public class DeviceSingleton<E> {
    private DeviceSingleton() {
    }

    private final int max = 10;
    private int count = 0;
    private static final DeviceSingleton DEVICE_SINGLETON = new DeviceSingleton();

    public static DeviceSingleton getInstance() {
        return DEVICE_SINGLETON;
    }

    private final List<E> devices = new ArrayList<>();

    /**
     * 添加
     */
    public synchronized void add(E data) {
        // 當容器滿了之后進入等待狀態
        while (devices.size() == max) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("add: " + data);
        ThreadUtils.sleep(1000);
        devices.add(data);
        count++;
        this.notify();
    }

    /**
     * 獲取
     */
    public synchronized E get() {
        E data = null;
        while (devices.size() == 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        ThreadUtils.sleep(1000);
        data = devices.remove(0);
        count--;
        this.notifyAll();
        return data;
    }

    /**
     * 獲取長度
     */
    public synchronized int size() {
        return count;
    }

    @Data
    static class Device {
        private int id;
        private String name;

        public Device(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    static class ThreadUtils {
        public static void sleep(int millis) {
            try {
                Thread.sleep(millis);
            } catch (Exception ignore) {}
        }
    }

}
public class Test {
    public static void main(String[] args) throws InterruptedException {
        DeviceSingleton deviceSingleton = DeviceSingleton.getInstance();
        for (int i = 0; i < 10; i++) {
            new Thread(() ->
            {
                for (int j = 0; j < 5; j++) {
                    System.out.println(deviceSingleton.get());
                }
            }, "consumer-" + i).start();
        }
        Thread.sleep(2000);

        for (int i = 0; i < 2; i++) {
            new Thread(() ->
            {
                for (int j = 0; j < 25; j++) {
                    deviceSingleton.add(new DeviceSingleton.Device(j, "device " + j));
                }
            }, "producer").start();
        }
    }

}

ReentrantLock鎖

  1. 重入鎖

為盡量避免使用synchronized和同步方法出現的一種多線程鎖機制,建議使用的同步方式,效率比synchronized高。使用重入鎖時,需要手動釋放鎖(lock.unlock())。示例如下:

public class ReentrantLockTest {
    private final Lock lock = new ReentrantLock();

    private void m1() {
        lock.lock(); // 加鎖
        for (int i = 0; i < 10; i++) {
            System.out.println("method m1() " + i);
            ThreadUtils.sleep(1000);
        }
        lock.unlock(); // 解鎖
    }

    private void m2() {
        lock.lock(); // 加鎖
        System.out.println("method m2()");
        lock.unlock(); // 解鎖
    }

    public static void main(String[] args) {
        ReentrantLockTest reentrantLockTest = new ReentrantLockTest();
        new Thread(reentrantLockTest::m1).start();
        new Thread(reentrantLockTest::m2).start();
    }

}
  1. 嘗試鎖 lock.tryLock()

如果沒有獲取到鎖標記則返回false,當前線程等待,如果獲取到了鎖標記,則返回true,當前線程被鎖定執行。示例如下:

public class ReentrantLockTest {
    private Lock lock = new ReentrantLock();

    private void m1() {
        lock.lock(); // 加鎖
        for (int i = 0; i < 10; i++) {
            ThreadUtils.sleep(1000);
            System.out.println("method m1() " + i);
        }
        lock.unlock(); // 解鎖
    }
private void m2() {
        boolean isLocked = false;
        try {
                /*
                嘗試鎖,如果有鎖,則無法獲取鎖標記,返回false,否則返回true
                如果無法獲取到鎖標記,則說明別的線程正在使用鎖,該線程等待
                如果獲取到了鎖標記,則該線程的代碼塊被鎖定
                下面是獲取鎖標記的無參方法,當執行到該語句的時候立刻獲取鎖標記
                也可以用有參的,即當執行到該語句多長時間之內獲取鎖標記,如果超時,不等待,直接返回。如isLocked = lock.tryLock(5, TimeUnit.SECONDS);表示5秒之內獲取鎖標記(5秒之內任何時間獲取到鎖標記都會繼續執行),如果超時則直接返回。
                 */
                isLocked = lock.tryLock();
                System.out.println(isLocked ? "m2() synchronized" : "m2() unsynchronized");
        } catch (Exception e) {
                e.printStackTrace();
        } finally {
                // 嘗試鎖在解除鎖標記的時候一定要判斷是否獲取到鎖標記
                if (isLocked) {
                        lock.unlock();
                }
        }
}
public static void main(String[] args) {
        ReentrantLockTest reentrantLockTest = new ReentrantLockTest();
        new Thread(reentrantLockTest::m1).start();
        new Thread(reentrantLockTest::m2).start();
}

}
  1. 可中斷鎖lock.lockInterruptibly()

非可中斷鎖當客戶端調用interrupt方法時,只是簡單的去設置interrupted中斷狀態,并沒有進一步拋出異常,而可中斷鎖在監測到中斷請求時會拋出InterruptedException ,進而中斷線程執行。示例如下:

public class ReentrantLockTest {
    private Lock lock = new ReentrantLock();

    private void m1() {
        lock.lock(); // 加鎖
        for (int i = 0; i < 5; i++) {
            ThreadUtils.sleep(1000);
            System.out.println("method m1() " + i);
        }
        lock.unlock(); // 解鎖
    }

    private void m2() {
        try {
            /*
            可打斷鎖,阻塞等待鎖,可以被其他的線程打斷阻塞狀態
             */
            lock.lockInterruptibly(); // 可嘗試打斷
            System.out.println("method m2()");
        } catch (InterruptedException e) {
            System.out.println("鎖被打斷");
        } finally {
            try {
                lock.unlock();
            } catch (Exception ignored) {
            }
        }
    }
public static void main(String[] args) {
    ReentrantLockTest reentrantLockTest = new ReentrantLockTest();
    Thread thread1 = new Thread(reentrantLockTest::m1);
    thread1.start();
    ThreadUtils.sleep(1000);
    Thread thread2 = new Thread(reentrantLockTest::m2);
    thread2.start();
    ThreadUtils.sleep(1000);
    thread2.interrupt(); // 打斷線程休眠
}

}
注意:用ReentrantLock打斷鎖,如果要打斷的話是用線程打斷,跟喚醒不同,notifyAll喚醒是用對象區喚醒。(打斷thread.interruped(); 喚醒object.notifyAll())。

線程打斷有什么用呢?

我們在用Windows的時候經常會遇到軟件鎖死的問題,這時候我們往往會通過打開任務管理器來結束進程,這種結束進程可以認為是打斷鎖的阻塞狀態(即非正常結束)。

  1. 公平鎖

先到先得。若沒有特殊情況,不建議使用公平鎖,如果使用公平鎖的話,一般來說并發量<=10,如果并發量較大,而不可避免的有訪問先后順序的話,建議采用別的方法。

public class ReentrantLockTest {
    static class TestReentrantLock extends Thread {
        // 在創建ReentrantLock對象的時候傳參為true就代表創建公平鎖
        private ReentrantLock lock = new ReentrantLock(true);

        public void run() {
            for (int i = 0; i < 5; i++) {
                lock.lock();
                try {
                    System.out.println(Thread.currentThread().getName() + " get lock.");
                    ThreadUtils.sleep(1000);
                } finally {
                    lock.unlock();
                }
            }
        }
    }
public static void main(String[] args) {
    TestReentrantLock lock = new TestReentrantLock();
    lock.start();
    new Thread(lock).start();
    new Thread(lock).start();
}

}

  1. Condition

為Lock增加條件,當條件滿足時做一些事情,如加鎖或解鎖、等待或喚醒等。下面示例就是使用Condition實現的生產者消費者。

public class DeviceContainer<T> {
private DeviceContainer() {
}

private static final DeviceContainer DEVICE_CONTAINER = new DeviceContainer<>();

public static DeviceContainer getInstance() {
    return DEVICE_CONTAINER;
}

private final List<T> list = new LinkedList<>();

private final int max = 10;
private int count = 0;
private Lock lock = new ReentrantLock();
private Condition producer = lock.newCondition();
private Condition consumer = lock.newCondition();

public void add(T t) {
    lock.lock();
    try {
        while (this.size() == max) {
            System.out.println(Thread.currentThread().getName() + " 等待");
            // 當數據長度為max的時候,生產者進入等待隊列,釋放鎖標記
            // 借助條件進入的等待隊列
            producer.await();
        }
        System.out.println(Thread.currentThread().getName() + " 添加");
        list.add(t);
        count++;
        // 借助條件喚醒所有的消費者
        consumer.signalAll();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        lock.unlock();
    }
}

public T get() {
    T t = null;
    lock.lock();
    try {
        while (this.size() == 0) {
            System.out.println(Thread.currentThread().getName() + " 等待");
            // 借助條件使消費者進入等待隊列
            consumer.await();
        }
        System.out.println(Thread.currentThread().getName() + " 獲取");
        t = list.remove(0);
        count--;
        // 借助條件喚醒所有生產者
        producer.signalAll();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        lock.unlock();
    }
    return t;
}

private int size() {
    return count;
}

}

public class Test {
public static void main(String[] args) throws InterruptedException {
DeviceContainer<Device> deviceSingleton = DeviceContainer.getInstance();
for (int i = 0; i < 10; i++) {
new Thread(() ->
{
for (int j = 0; j < 5; j++) {
System.out.println(deviceSingleton.get());
}
}, "consumer-" + i).start();
}
ThreadUtils.sleep(1000);
for (int i = 0; i < 2; i++) {
new Thread(() ->
{
for (int j = 0; j < 25; j++) {
deviceSingleton.add(new Device(j, "device " + j));
}
}, "producer-" + i).start();
}

}

}

Java中的同步容器

  1. Map/Set

ConcurrentHashMap/ConcurrentHashSet:底層哈希實現的Map/Set,效率高,使用底層技術實現的線程安全,量級較synchronized輕。key和value不能為null(不同于HashMap和HashSet)

ConcurrentSkipListMap/ConcurrentSkipListSet:底層跳表實現的Map/Set,有序,線程安全,效率較ConcurrentHashMap/ConcurrentHashSet低。

CopyOnWriteArraySet:底層數組,線程安全,增加和刪除效率低,查詢效率高。

  1. List

CopyOnWriteArrayList:底層數組,線程安全,增加和刪除效率低,查詢效率高。

  1. Queue

ConcurrentLinkedQueue/ ConcurrentLinkedDeue:基礎鏈表同步隊列,非阻塞,ConcurrentLinkedQueue底層單向鏈表,ConcurrentLinkedDeue底層雙向鏈表,均***。

ArrayBlockingQueue/LinkedBlockingQueue:阻塞隊列,隊列容量不足自動阻塞,隊列容量為0自動阻塞。ArrayBlockingQueue底層使用數組,有界;LinkedBlockingQueue底層使用鏈表,默認***。ArrayBlockingQueue根據調用API的不同,有不同的特性。當容量不足的時候有阻塞能力。add方法在容量不足的時候會拋出異常;put方法在容量不足時阻塞等待;offer默認不阻塞,當容量不足的時候返回false,否則返回true;三參offer可設定阻塞時長,若在阻塞時長內有容量空閑,則添加并返回true,如果阻塞時長范圍內無容量空閑,放棄新增數據并返回false。LinkedBlockingQueue的add方法在容量不足的時候會拋出異常;offer方法在容量不足時返回false,否則返回true;三參offer可設定阻塞時長,若在阻塞時長內有容量空閑,則添加并返回true,如果阻塞時長范圍內無容量空閑,放棄新增數據并返回false。

PriorityQueue:有限集隊列,底層數組,***。

PriorityBlockingQueue:優先級阻塞隊列,底層數組,***。

LinkedTransferQueue:轉移隊列,使用transfer方法實現數據的即時處理。隊列使用add保存數據,不做阻塞等待。transfer是TransferQueue的特有方法,轉移隊列必須要有消費者(take()方法的調用者)。如果沒有任何線程消費數據,則transfer方法阻塞。一般用于處理即時消息。

SynchronousQueue:阻塞的同步隊列,有界。是一個容量為0的隊列,是一個特殊的TransferQuque。必須先有消費線程等待才能使用的隊列。add方法無阻塞,若沒有消費線程阻塞等待數據,則拋出異常。put方法有阻塞,若沒有消費線程阻塞等待數據,則put方法阻塞。

DelayQueue:延時阻塞隊列,***。類似輪詢機制,一般用來做定時任務。業務場景舉例:具有過期時間的緩存,訂單過期自動取消等。

?

線程池
線程池是一個進程級的資源,默認的生命周期和JVM一致,即從開啟線程池開始,到JVM關閉為止,是線程池的默認生命周期。如果顯式調用shutdown方法,那么線程池執行所有的任務后自動關閉。

Executor接口
線程池頂級接口。Executor中只有一個方法execute,是用來處理任務的一個服務方法。調用者提供Runnable接口的實現,線程池通過執行線程執行這個Runnable。

public class Executor01 {
public static void main(String[] args) {
new Executor_01().execute(() ->
System.out.println(Thread.currentThread().getName() + " test executor.")
);
}
static class Executor_01 implements Executor {@Override
br/>@Override
new Thread(command).start();
}
}
}
ExecutorService
Executor的子接口,與Executor不同的是,它還提供了一個返回值為Future的服務方法submit。

Executors工具類
Executor的工具類,為線程池提供工具方法,可快速創建線程池,所有的線程池類型都實現了這個接口,實現了這個接口就代表有提供線程池的能力。常用方法有:void execute(),Future submit(Callable),Future submit(Runnable),void shutdown,boolean isShutdown(),boolean isTerminated()。

public class Test {
public static void main(String[] args) throws InterruptedException {
// 創建一個長度為5的線程池對象
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < 6; i++) {
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + " executor.");
ThreadUtils.sleep(1000);
});
}
System.out.println(executorService);

    // 優雅關閉
    executorService.shutdown();
    // 是否已經結束,相當于判斷是否回收了資源,因為線程睡眠,此時還未回收,因此為false
    System.out.println(executorService.isTerminated());
    // 是否已經關閉,即是否調用過shutdown方法
    System.out.println(executorService.isShutdown());
    System.out.println(executorService);

    ThreadUtils.sleep(1000);

    // 因為上面睡了5秒,任務都已經執行完了,資源也被回收了,因此為true
    System.out.println(executorService.isTerminated());
    System.out.println(executorService.isShutdown());
    System.out.println(executorService);
}

}
Future
未來結果,代表線程執行結束后的結果。通過get方法獲取線程執行結果。

常用方法:get()、get(long, TimeUnit)和isDown()。

get():阻塞等待線程執行結束并得到返回結果;

get(long, TimeUnit):阻塞固定時長,等待線程結束后的結果,如果在阻塞時長范圍內線程未執行結束,拋出異常。

isDown():判斷線程是否結束即判斷call方法是否已完成,要特別注意,這里的isDown與ExecutorService中的isShutdown不同,isShutdown是用來判斷線程是否關閉的。

public class ExecutorServiceTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
testExecutorService();
}

private static void testExecutorService() throws ExecutionException, InterruptedException {
    ExecutorService service = Executors.newFixedThreadPool(1);
    Future<String> future = service.submit(() -> {
        ThreadUtils.sleep(1000);
        return Thread.currentThread().getName() + " submit.";
    });

    // 查看任務是否完成即線程是否結束即call方法是否執行結束,
    // 要注意的是,這里判斷是否結束,跟ExecutorService中的isShutDowm不同, isShutdowm是判斷線程是否結束,而shutdown表示關閉線程
    System.out.println(future.isDone());
    // 獲取call方法的返回值
    System.out.println(future.get()); // false

    System.out.println(future.isDone());
    System.out.println(future.get()); // true

    // 關閉線程池
    service.shutdown();
}

}
Callable接口
可執行接口。類似Runnable接口,也是可以啟動線程的接口。

接口方法:call(),相當于Runnable中的run方法,區別在于call方法有返回值。

Callable和Runnable的選擇:當需要返回值或需要拋出異常時,使用Callable,其他情況任意選。

ThreadPoolExecutor創建線程池
通過new ThreadPoolExecutor來創建,下圖是ThreadPoolExecutor的三個構造方法:

參數說明:

corePoolSize? 核心線程數

maximumPoolSize? 最大線程數

keepAliveTime? 線程最大空閑時間

unitTimeUnit? 時間單位

workQueueBlockingQueue<Runnable>? 線程等待隊列

threadFactoryThreadFactory? 線程創建工廠

handlerRejectedExecutionHandler? 拒絕策略
?

核心線程數和最大線程數:

當提交一個新任務到線程池時首先判斷核心線程數corePoolSize是否已達上限,若未達到corePoolSize上限,創建一個工作線程來執行任務;否則,再判斷線程池工作隊列workQueueBlockingQueue是否已滿,若沒滿,則將新提交的任務存儲在工作隊列里;否則,線程池將判斷最大線程數是否已達上限,若未達到maximumPoolSize上限,則創建一個新的工作線程來執行任務,滿了,則交給飽和策略來處理這個任務。如果線程池中的線程數量大于核心線程數 corePoolSize 時,線程空閑時間超過線程最大空閑時間keepAliveTime,則線程將被終止,直至線程池中的線程數目不大于corePoolSize。

自定義線程池

public class ExecutorThreadPoolTest {
public static void main(String[] args) {
testExecutorThreadPool();
}

private static void testExecutorThreadPool() {
    // 創建線程池,核心線程數為2,最大線程數為4,最大空閑時間為10
    ThreadPoolExecutor executor = new ThreadPoolExecutor(2,
            4,
            10,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(2),
            new MyTreadFactory(),
            new MyIgnorePolicy());
    // 啟動所有核心線程,使其出與等待狀態
    executor.prestartAllCoreThreads();

    // 創建并執行任務
    for (int i = 1; i <= 10; i++) {
        MyTask task = new MyTask(String.valueOf(i));
        executor.execute(task);
    }
}

static class MyTreadFactory implements ThreadFactory {

    private final AtomicInteger mThreadNum = new AtomicInteger(1);

    @Override
    public Thread newThread(Runnable runnable) {
        Thread t = new Thread(runnable, "線程【" + mThreadNum.getAndIncrement() + "】");
        System.out.println(t.getName() + " 已創建");
        return t;
    }
}

public static class MyIgnorePolicy implements RejectedExecutionHandler {

    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
        doLog(runnable, executor);
    }

    private void doLog(Runnable runnable, ThreadPoolExecutor executor) {
        System.err.println(runnable.toString() + " 被拒絕");
    }
}

@Data
static class MyTask implements Runnable {
    private String name;

    public MyTask(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        System.out.println(this.toString() + " 正在運行");
        ThreadUtils.sleep(1000);
    }

    @Override
    public String toString() {
        return "線程【" + name + "】";
    }
}

}
FixedThreadPool線程池
固定容量的線程池,可由Executors來創建,活動狀態和線程池容量是有上限的,需要手動銷毀線程池。構造方法如下:

由此可見,該線程池核心線程數和最大線程數均為構造參數值nThreads,線程最大空閑時間為0,任務隊列采用LinkedBlockingQueue,默認容量上限是Integer.MAX_VALUE。

public class Test {
public static void main(String[] args) {
new Test().test();
}

public void test() {
    // 創建容量為10的FixedThreadPool線程池
    ExecutorService service = Executors.newFixedThreadPool(10);
    for (int i = 0; i < 100; i++) {
        service.execute(()-> System.out.println(Thread.currentThread().getName()));
    }
    // 銷毀線程池
    service.shutdown();
}

}
CachedThreadPool線程池
緩存線程池,通過Executors來創建,默認最大容量為Integer.MAX_VALUE,自動擴容,執行完后自動銷毀(這一點與FixedThreadPool不同,FixedThreadPool的銷毀需要手動調用shutdown方法)。構造方法如下:

由構造方法可見,核心線程數為0,最大線程數為Integer.MAX_VALUE,最大空閑時間為60秒,任務隊列使用SynchronousQueue。

public class Test {
public static void main(String[] args) {
new Test().test();
}

public void test() {
    // 創建緩存線程池
    ExecutorService service = Executors.newCachedThreadPool();
    System.out.println(service);
    for (int i = 0; i < 5; i++) {
        service.execute(() -> {
            ThreadUtils.sleep(1000);
            System.out.println(Thread.currentThread().getName() + " executor.");
        });
    }
    System.out.println(service);
    ThreadUtils.sleep(65);
    System.out.println(service);
}

}
ScheduledThreadPool線程池
計劃任務線程池,可以根據任務自動執行計劃的線程池,由Executors創建,需要手動銷毀。計劃任務時選用,如需要定時整理數據、服務器定期清除無效文件等。構造方法如下:

核心線程數為構造參數大小,最大線程數為Integer.MAX_VALUE,最大空閑時間0,任務隊列使用DelayedWorkQuquq。

常用方法有:scheduledAtFixedRate、schedule、execute等。

public class Test {
public static void main(String[] args) {
new Test().test();
}

public void test() {
    // 創建計劃任務線程池
    ScheduledExecutorService service = Executors.newScheduledThreadPool(3);
    System.out.println(service);
    // 定期任務,線程池啟動500毫秒后第一次執行任務,以后每300毫秒執行一次
    service.scheduleAtFixedRate(() -> {
        ThreadUtils.sleep(1000);
        System.out.println(Thread.currentThread().getName() + " executor.");
    }, 500, 300, TimeUnit.MILLISECONDS);
    System.out.println(service);
    service.shutdown();
}

}
SingleThreadExecutor線程池
單一容量的線程池。需要手動銷毀。有保證任務順序需求時可選用。如大廳中的公共頻道聊天,固定數量商品的秒殺等。構造方法如下:

核心線程數和最大線程數均為1,任務隊列為LinkedBlockingQueue。

public class Test {
public static void main(String[] args) {
new Test().test();
}

public void test() {
    // 創建單一容量線程池
    ExecutorService service = Executors.newSingleThreadExecutor();
    System.out.println(service);
    for (int i = 0; i < 5; i++) {
        service.execute(() -> {
            System.out.println(Thread.currentThread().getName() + " executor.");
            ThreadUtils.sleep(1000);
        });
    }
    service.shutdown();
}

}
ForkJoinPool線程池
分支合并線程池,適用于處理復雜任務。初始化線程容量與CPU核心數有關。

ForkJoinPool沒有所謂的容量,默認都是一個線程,根據任務自動分支新的子線程,,當子線程結束后自動合并。所謂自動合并,是用fork和join兩個方法實現的(手動調用)。

線程池中運行的可分治合并的任務必須是ForkJoinTask的子類型(RecursiveTask或RecursiveAction,二者的區別在于一個運行完之后有返回值,一個沒有),其中提供了分支和合并能力。

ForkJoinTask提供了兩個抽象子類型RecursiveTask和RecursiveAction,RecursiveTask是有返回結果的分支合并任務,RecursiveAction是無返回結果的分支合并任務(類似Callable和Runnable的區別)。

ForkJoinTask提供了一個compute方法,這個方法里面就是任務的執行邏輯。

該線程池主要用于大量數據的計算、數據分析等。

public class Test {

public static void main(String[] args) throws ExecutionException, InterruptedException {
    long result = 0L;
    for (int NUMBER : NUMBERS) {
        result += NUMBER;
    }
    System.out.println(result);

    ForkJoinPool pool = new ForkJoinPool();
    // 分支合并任務
    AddTask task = new AddTask(0, NUMBERS.length);
    // 提交任務
    Future<Long> future = pool.submit(task);
    System.out.println(future.get());
}

private static final int[] NUMBERS = new int[1000000];
private static final int MAX_SIZE = 50000;
private static final Random RANDOM = new Random();

static {
    for (int i = 0; i < NUMBERS.length; i++) {
        NUMBERS[i] = RANDOM.nextInt(1000);
    }
}

static class AddTask extends RecursiveTask<Long> {
    int begin, end;

    AddTask(int begin, int end) {
        this.begin = begin;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if ((end - begin) < MAX_SIZE) {
            long sum = 0L;
            for (int i = begin; i < end; i++) {
                sum += NUMBERS[i];
            }
            return sum;
        } else {
            // 當結束值減去開始值大于臨界值的時候進行分支
            int middle = begin + (end - begin) / 2;
            AddTask task1 = new AddTask(begin, middle);
            AddTask task2 = new AddTask(middle, end);
            // 分支的工作,就是開啟一個新的線程任務
            task1.fork();
            task2.fork();
            // join就是合并,將任務的結果獲取,是一個阻塞方法,一定會得到結果數據
            return task1.join() + task2.join();
        }
    }
}

}

線程組

一組線程的集合,線程組中多個線程執行同一批任務,線程之間是隔離的,互不影響。同一組的線程之間可以通信,但不同組的線程之間不能通信,這樣就做到了線程屏蔽,保證了線程安全。

public class Test {

public static void main(String[] args) {
    new Test().test();
}

public void test() {
    ThreadGroup group = new ThreadGroup("LQ");
    Thread thread = new Thread(group, () ->
            System.out.println("group is " + Thread.currentThread().getThreadGroup().getName())
    );
    thread.start();
}

}
朋友們覺得內容有什么錯誤、不足之處,或者有什么疑問,盡可留言指出來,一起學習哦。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女