溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

LongAdder的實現原理是什么

發布時間:2021-08-09 15:17:46 來源:億速云 閱讀:186 作者:Leah 欄目:web開發

LongAdder的實現原理是什么,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。

AtomicLong的實現原理圖:

LongAdder的實現原理是什么

LongAdder是JDK8新增的原子操作類,它提供了一種新的思路,既然AtomicLong的性能瓶頸是由于大量線程同時更新一個變量造成的,那么能不能把這個變量拆分出來,變成多個變量,然后讓線程去競爭這些變量,最后合并即可?LongAdder的設計精髓就在這里,通過將變量拆分成多個元素,降低該變量的并發度,最后進行合并元素,變相的減少了CAS的失敗次數。

LongAdder的實現原理圖:

LongAdder的實現原理是什么

常用方法

public class LongAdder extends Striped64 implements Serializable {     //構造方法     public LongAdder() {     }     //加1操作     public void increment();     //減1操作     public void decrement();     //獲取原子變量的值     public long longValue();  }

下面給出一個簡單的例子,模擬50線程同時進行更新

package com.xue.testLongAdder;  import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.LongAdder;  public class Main {     public static void main(String[] args) {         LongAdder adder = new LongAdder();         ExecutorService threadPool = Executors.newFixedThreadPool(20);         for (int i = 0; i < 50; i++) {             Runnable r = () -> {                 adder.add(1);             };             threadPool.execute(r);         }         threadPool.shutdown();         //若關閉線程池后,所有任務執行完畢,則isTerminated()返回true         while (!threadPool.isTerminated()) {             System.out.println(adder.longValue());             break;         }     } }

輸出結果是50

其中,如果對線程池不熟悉的同學,可以先參考我的另外一篇文章說說線程池

原理解析

類圖

LongAdder的實現原理是什么

LongAdder內部維護了一個Cell類型的數組,其中Cell是Striped64中的一個靜態內部類。

Cell類

abstract class Striped64 extends Number {        @sun.misc.Contended static final class Cell {         volatile long value;         Cell(long x) { value = x; }         final boolean cas(long cmp, long val) {             return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);         }     }  }

Cell用來封裝被拆分出來的元素,內部用一個value字段保存當前元素的值,等到需要合并時,則累加所有Cell數組中的value。Cell內部使用CAS操作來更新value值,對CAS操作不熟悉的同學,可以參考我的另外一篇文章淺探CAS實現原理

可以注意到,Cell類被 @sun.misc.Contended注解修飾了,這個注解是為了解決偽共享問題的,什么是偽共享?

  • 一個緩存行可以存儲多個變量(存滿當前緩存行的字節數);而CPU對緩存的修改又是以緩存行為最小單位的,在多線程情況下,如果需要修改“共享同一個緩存行的變量”,就會無意中影響彼此的性能,這就是偽共享(False  Sharing)。

對偽共享還不理解的同學,可以參考這位大佬的文章偽共享(False Sharing)底層原理及其解決方式

而LongAdder采用的是Cell數組,而數組元素是連續的,因此多個Cell對象共享一個緩存行的情況非常普遍,因此這里@sun.misc.Contended注解對單個Cell元素進行字節填充,確保一個Cell對象占據一個緩存行,即填充至64字節。

關于如何確定一個對象的大小,可以參考我的另外一篇文章對象的內存布局,怎樣確定對象的大小,這樣可以算出來,還需要填充多少字節。

longValue()

longValue()返回累加后的值

public long longValue() {      return sum();  }   public long sum() {      Cell[] as = cells; Cell a;      long sum = base;      //當Cell數組不為null時,進行累加后返回,否則直接返回基準數base      if (as != null) {          for (int i = 0; i < as.length; ++i) {              if ((a = as[i]) != null)                  sum += a.value;          }      }      return sum;  }

這可能是LongAdder中最簡單的方法了,就不進行贅述了。什么,你要看復雜的?好的,這就來了。

increment()

public void increment() {        add(1L);     }   public void add(long x) {      Cell[] as; long b, v; int m; Cell a;      /**       * 如果一下兩種條件則繼續執行if內的語句       * 1. cells數組不為null(不存在爭用的時候,cells數組一定為null,一旦對base的cas操作失敗,       * 才會初始化cells數組)       * 2. 如果cells數組為null,如果casBase執行成功,則直接返回,如果casBase方法執行失敗       * (casBase失敗,說明第一次爭用沖突產生,需要對cells數組初始化)進入if內;       * casBase方法很簡單,就是通過UNSAFE類的cas設置成員變量base的值為base+要累加的值       * casBase執行成功的前提是無競爭,這時候cells數組還沒有用到為null,可見在無競爭的情況下是       * 類似于AtomticInteger處理方式,使用cas做累加。       */      if ((as = cells) != null || !casBase(b = base, b + x)) {          //uncontended判斷cells數組中,當前線程要做cas累加操作的某個元素是否#不#存在爭用,          //如果cas失敗則存在爭用;uncontended=false代表存在爭用,uncontended=true代表不存在爭用。          boolean uncontended = true;          /**           *1. as == null : cells數組未被初始化,成立則直接進入if執行cell初始化           *2. (m = as.length - 1) < 0: cells數組的長度為0           *條件1與2都代表cells數組沒有被初始化成功,初始化成功的cells數組長度為2;           *3. (a = as[getProbe() & m]) == null :如果cells被初始化,且它的長度不為0,           * 則通過getProbe方法獲取當前線程Thread的threadLocalRandomProbe變量的值,初始為0,           * 然后執行threadLocalRandomProbe&(cells.length-1 ),相當于m%cells.length;           * 如果cells[threadLocalRandomProbe%cells.length]的位置為null,           * 這說明這個位置從來沒有線程做過累加,           * 需要進入if繼續執行,在這個位置創建一個新的Cell對象;           *4. !(uncontended = a.cas(v = a.value, v + x)):           * 嘗試對cells[threadLocalRandomProbe%cells.length]位置的Cell對象中的value值做累加操作,           * 并返回操作結果,如果失敗了則進入if,重新計算一個threadLocalRandomProbe;           如果進入if語句執行longAccumulate方法,有三種情況           1. 前兩個條件代表cells沒有初始化,           2. 第三個條件指當前線程hash到的cells數組中的位置還沒有其它線程做過累加操作,           3. 第四個條件代表產生了沖突,uncontended=false           **/          if (as == null || (m = as.length - 1) < 0 ||                  (a = as[getProbe() & m]) == null ||                  !(uncontended = a.cas(v = a.value, v + x)))              longAccumulate(x, null, uncontended);      }  }

其中longAccumulate()的解析如下:

final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {     //獲取當前線程的threadLocalRandomProbe值作為hash值,如果當前線程的threadLocalRandomProbe為0,     // 說明當前線程是第一次進入該方法,則強制設置線程的threadLocalRandomProbe為ThreadLocalRandom類的成員     // 靜態私有變量probeGenerator的值,后面會詳細將hash值的生成;     //另外需要注意,如果threadLocalRandomProbe=0,代表新的線程開始參與cell爭用的情況     //1.當前線程之前還沒有參與過cells爭用(也許cells數組還沒初始化,進到當前方法來就是為了初始化cells數組     //后爭用的),     // 是第一次執行base的cas累加操作失??;     //2.或者是在執行add方法時,對cells某個位置的Cell的cas操作第一次失敗,則將wasUncontended設置為false,     // 那么這里會將其重新置為true;第一次執行操作失??;     //凡是參與了cell爭用操作的線程threadLocalRandomProbe都不為0;     int h;     if ((h = getProbe()) == 0) {         //初始化ThreadLocalRandom;         ThreadLocalRandom.current(); // force initialization         //將h設置為0x9e3779b9         h = getProbe();         //設置未競爭標記為true         wasUncontended = true;     }     //cas沖突標志,表示當前線程hash到的Cells數組的位置,做cas累加操作時與其它線程發生了沖突,cas失??;     // collide=true代表有沖突,collide=false代表無沖突     boolean collide = false;     for (;;) {         Cell[] as; Cell a; int n; long v;         //這個主干if有三個分支         //1.主分支一:處理cells數組已經正常初始化了的情況(這個if分支處理add方法的四個條件中的3和4)         //2.主分支二:處理cells數組沒有初始化或者長度為0的情況;(這個分支處理add方法的四個條件中的1和2)         //3.主分支三:處理如果cell數組沒有初始化,并且其它線程正在執行對cells數組初始化的操作,         // 及cellbusy=1;         // 則嘗試將累加值通過cas累加到base上         //先看主分支一         if ((as = cells) != null && (n = as.length) > 0) {             /**              *內部小分支一:這個是處理add方法內部if分支的條件3:如果被hash到的位置為null,              * 說明沒有線程在這個位置設置過值,              * 沒有競爭,可以直接使用,則用x值作為初始值創建一個新的Cell對象,              * 對cells數組使用cellsBusy加鎖,              * 然后將這個Cell對象放到cells[m%cells.length]位置上              */             if ((a = as[(n - 1) & h]) == null) {                 //cellsBusy == 0 代表當前沒有線程cells數組做修改                 if (cellsBusy == 0) {                     //將要累加的x值作為初始值創建一個新的Cell對象,                     Cell r = new Cell(x);                     //如果cellsBusy=0無鎖,則通過cas將cellsBusy設置為1加鎖                     if (cellsBusy == 0 && casCellsBusy()) {                         //標記Cell是否創建成功并放入到cells數組被hash的位置上                         boolean created = false;                         try {                             Cell[] rs; int m, j;                             //再次檢查cells數組不為null,且長度不為空,且hash到的位置的Cell為null                             if ((rs = cells) != null &&                                     (m = rs.length) > 0 &&                                     rs[j = (m - 1) & h] == null) {                                 //將新的cell設置到該位置                                 rs[j] = r;                                 created = true;                             }                         } finally {                             //去掉鎖                             cellsBusy = 0;                         }                         //生成成功,跳出循環                         if (created)                             break;                         //如果created為false,說明上面指定的cells數組的位置cells[m%cells.length]                         // 已經有其它線程設置了cell了,                         // 繼續執行循環。                         continue;                     }                 }                 //如果執行的當前行,代表cellsBusy=1,有線程正在更改cells數組,代表產生了沖突,將collide設置為false                 collide = false;                  /**                  *內部小分支二:如果add方法中條件4的通過cas設置cells[m%cells.length]位置的Cell對象中的                  * value值設置為v+x失敗,                  * 說明已經發生競爭,將wasUncontended設置為true,跳出內部的if判斷,                  * 最后重新計算一個新的probe,然后重新執行循環;                  */             } else if (!wasUncontended)                 //設置未競爭標志位true,繼續執行,后面會算一個新的probe值,然后重新執行循環。                 wasUncontended = true;             /**              *內部小分支三:新的爭用線程參與爭用的情況:處理剛進入當前方法時threadLocalRandomProbe=0的情況,              * 也就是當前線程第一次參與cell爭用的cas失敗,這里會嘗試將x值加到cells[m%cells.length]              * 的value ,如果成功直接退出              */             else if (a.cas(v = a.value, ((fn == null) ? v + x :                     fn.applyAsLong(v, x))))                 break;             /**              *內部小分支四:分支3處理新的線程爭用執行失敗了,這時如果cells數組的長度已經到了最大值              * (大于等于cup數量),              * 或者是當前cells已經做了擴容,則將collide設置為false,后面重新計算prob的值*/             else if (n >= NCPU || cells != as)                 collide = false;             /**              *內部小分支五:如果發生了沖突collide=false,則設置其為true;會在最后重新計算hash值后,              * 進入下一次for循環              */             else if (!collide)                 //設置沖突標志,表示發生了沖突,需要再次生成hash,重試。                 // 如果下次重試任然走到了改分支此時collide=true,!collide條件不成立,則走后一個分支                 collide = true;             /**              *內部小分支六:擴容cells數組,新參與cell爭用的線程兩次均失敗,且符合庫容條件,會執行該分支              */             else if (cellsBusy == 0 && casCellsBusy()) {                 try {                     //檢查cells是否已經被擴容                     if (cells == as) {      // Expand table unless stale                         Cell[] rs = new Cell[n << 1];                         for (int i = 0; i < n; ++i)                             rs[i] = as[i];                         cells = rs;                     }                 } finally {                     cellsBusy = 0;                 }                 collide = false;                 continue;                   // Retry with expanded table             }             //為當前線程重新計算hash值             h = advanceProbe(h);              //這個大的分支處理add方法中的條件1與條件2成立的情況,如果cell表還未初始化或者長度為0,             // 先嘗試獲取cellsBusy鎖。         }else if (cellsBusy == 0 && cells == as && casCellsBusy()) {             boolean init = false;             try {  // Initialize table                 //初始化cells數組,初始容量為2,并將x值通過hash&1,放到0個或第1個位置上                 if (cells == as) {                     Cell[] rs = new Cell[2];                     rs[h & 1] = new Cell(x);                     cells = rs;                     init = true;                 }             } finally {                 //解鎖                 cellsBusy = 0;             }             //如果init為true說明初始化成功,跳出循環             if (init)                 break;         }         /**          *如果以上操作都失敗了,則嘗試將值累加到base上;          */         else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))              // Fall back on using base             break;     } }

看完上述內容,你們掌握LongAdder的實現原理是什么的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女