本篇內容介紹了“simpread golang與select case的實現機制是什么”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
當一個 goroutine 要從一個 non-nil & non-closed chan 上接收數據時,goroutine 首先會去獲取 chan 上的鎖,然后執行如下操作直到某個條件被滿足:
1)如果 chan 上的 value buffer 不空,這也意味著 chan 上的 recv goroutine queue 也一定是空的,該接收 goroutine 將從 value buffer 中 unshift 出一個 value。這個時候,如果 send goroutine 隊列不空的情況下,因為剛才 value buffer 中空出了一個位置,有位置可寫,所以這個時候會從 send goroutine queue 中 unshift 出一個發送 goroutine 并讓其恢復執行,讓其執行把數據寫入 chan 的操作,實際上是恢復該發送該 goroutine 執行,并把該發送 goroutine 要發送的數據 push 到 value buffer 中。然后呢,該接收 goroutine 也拿到了數據了,就繼續執行。這種情景,channel 的接收操作稱為 non-blocking 操作。
2)另一種情況,如果 value buffer 是空的,但是 send goroutine queue 不空,這種情況下,該 chan 一定是 unbufferred chan,不然 value buffer 肯定有數據嘛,這個時候接收 goroutine 將從 send goroutine queue 中 unshift 出一個發送 goroutine,并將該發送 goroutine 要發送的數據接收過來(兩個 goroutine 一個有發送數據地址,一個有接收數據地址,拷貝過來就 ok),然后這個取出的發送 goroutine 將恢復執行,這個接收 goroutine 也可以繼續執行。這種情況下,chan 接收操作也是 non-blocking 操作。
3)另一種情況,如果 value buffer 和 send goroutine queue 都是空的,沒有數據可接收,將把該接收 goroutine push 到 chan 的 recv goroutine queue,該接收 goroutine 將轉入 blocking 狀態,什么時候恢復期執行呢,要等到有一個 goroutine 嘗試向 chan 發送數據的時候了。這種場景下,chan 接收操作是 blocking 操作。
當一個 goroutine 常識向一個 non-nil & non-closed chan 發送數據的時候,該 goroutine 將先嘗試獲取 chan 上的鎖,然后執行如下操作直到滿足其中一種情況。
1)如果 chan 的 recv goroutine queue 不空,這種情況下,value buffer 一定是空的。發送 goroutine 將從 recv goroutine queue 中 unshift 出一個 recv goroutine,然后直接將自己要發送的數據拷貝到該 recv goroutine 的接收地址處,然后恢復該 recv goroutine 的運行,當前發送 goroutine 也繼續執行。這種情況下,chan send 操作是 non-blocking 操作。
2)如果 chan 的 recv goroutine queue 是空的,并且 value buffer 不滿,這種情況下,send goroutine queue 一定是空的,因為 value buffer 不滿發送 goroutine 可以發送完成不可能會阻塞。該發送 goroutine 將要發送的數據 push 到 value buffer 中然后繼續執行。這種情況下,chan send 操作是 non-blocking 操作。
3)如果 chan 的 recv goroutine queue 是空的,并且 value buffer 是滿的,發送 goroutine 將被 push 到 send goroutine queue 中進入阻塞狀態。等到有其他 goroutine 嘗試從 chan 接收數據的時候才能將其喚醒恢復執行。這種情況下,chan send 操作是 blocking 操作。
當一個 goroutine 嘗試 close 一個 non-nil & non-closed chan 的時候,close 操作將依次執行如下操作。
1)如果 chan 的 recv goroutine queue 不空,這種情況下 value buffer 一定是空的,因為如果 value buffer 如果不空,一定會繼續 unshift recv goroutine queue 中的 goroutine 接收數據,直到 value buffer 為空(這里可以看下 chan send 操作,chan send 寫入數據之前,一定會從 recv goroutine queue 中 unshift 出一個 recv goroutine)。recv goroutine queue 里面所有的 goroutine 將一個個 unshift 出來并返回一個 val=0 值和 sentBeforeClosed=false。
2)如果 chan 的 send goroutine queue 不空,所有的 goroutine 將被依次取出并生成一個 panic for closing a close chan。在這 close 之前發送到 chan 的數據仍然在 chan 的 value buffer 中存著。
一旦 chan 被關閉了,chan recv 操作就永遠也不會阻塞,chan 的 value buffer 中在 close 之前寫入的數據仍然存在。一旦 value buffer 中 close 之前寫入的數據都被取出之后,后續的接收操作將會返回 val=0 和 sentBeforeClosed=true。
理解這里的 goroutine 的 blocking、non-blocking 操作對于理解針對 chan 的 select-case 操作是很有幫助的。下面介紹 select-case 實現機制。
select-case 中假如沒有 default 分支的話,一定要等到某個 case 分支滿足條件然后將對應的 goroutine 喚醒恢復執行才可以繼續執行,否則代碼就會阻塞在這里,即將當前 goroutine push 到各個 case 分支對應的 ch 的 recv 或者 send goroutine queue 中,對同一個 chan 也可能將當前 goroutine 同時 push 到 recv、send goroutine queue 這兩個隊列中。
不管是普通的 chan send、recv 操作,還是 select chan send、recv 操作,因為 chan 操作阻塞的 goroutine 都是依靠其他 goroutine 對 chan 的 send、recv 操作來喚醒的。前面我們已經講過了 goroutine 被喚醒的時機,這里還要再細分一下。
chan 的 send、recv goroutine queue 中存儲的其實是一個結構體指針 * sudog,成員 gp * g 指向對應的 goroutine,elem unsafe.Pointer 指向待讀寫的變量地址,c * hchan 指向 goroutine 阻塞在哪個 chan 上,isSelect 為 true 表示 select chan send、recv,反之表示 chan send、recv。g.selectDone 表示 select 操作是否處理完成,即是否有某個 case 分支已經成立。
下面我們先描述下 chan 上某個 goroutine 被喚醒時的處理邏輯,假如現在有個 goroutine 因為 select chan 操作阻塞在了 ch2、ch3 上,那么會創建對應的 sudog 對象,并將對應的指針 * sudog push 到各個 case 分支對應的 ch2、ch3 上的 send、recv goroutine queue 中,等待其他協程執行 (select) chan send、recv 操作時將其喚醒: 1)源碼文件 chan.go,假如現在有另外一個 goroutine 對 ch2 進行了操作,然后對 ch2 的 goroutine 執行 unshift 操作取出一個阻塞的 goroutine,在 unshift 時要執行方法 **func (q *waitq) dequeue() sudog,這個方法從 ch2 的等待隊列中返回一個阻塞的 goroutine。
func (q *waitq) dequeue() *sudog { for { sgp := q.first if sgp == nil { return nil } y := sgp.next if y == nil { q.first = nil q.last = nil } else { y.prev = nil q.first = y sgp.next = nil // mark as removed (see dequeueSudog) } // if a goroutine was put on this queue because of a // select, there is a small window between the goroutine // being woken up by a different case and it grabbing the // channel locks. Once it has the lock // it removes itself from the queue, so we won't see it after that. // We use a flag in the G struct to tell us when someone // else has won the race to signal this goroutine but the goroutine // hasn't removed itself from the queue yet. if sgp.isSelect { if !atomic.Cas(&sgp.g.selectDone, 0, 1) { continue } } return sgp } }
假如隊首元素就是之前阻塞的 goroutine,那么檢測到其 sgp.isSelect=true,就知道這是一個因為 select chan send、recv 阻塞的 goroutine,然后通過 CAS 操作將 sgp.g.selectDone 設為 true 標識當前 goroutine 的 select 操作已經處理完成,之后就可以將該 goroutine 返回用于從 value buffer 讀或者向 value buffer 寫數據了,或者直接與喚醒它的 goroutine 交換數據,然后該阻塞的 goroutine 就可以恢復執行了。
這里將 sgp.g.selectDone 設為 true,相當于傳達了該 sgp.g 已經從剛才阻塞它的 select-case 塊中退出了,對應的 select-case 塊可以作廢了。有必要提提一下為什么要把這里的 sgp.g.selectDone 設為 true 呢?直接將該 goroutine 出隊不就完了嗎?不行!考慮以下對 chan 的操作 dequeue 是需要先拿到 chan 上的 lock 的,但是在嘗試 lock chan 之前有可能同時有多個 case 分支對應的 chan 準備就緒??磦€示例代碼:
g1 go func() { ch2 <- 1?}() // g2 go func() { ch3 <- 2 } select { case <- ch2: doSomething() case <- ch3: doSomething() }
協程 g1 在 chan.chansend 方法中執行了一般,準備 lock ch2,協程 g2 也執行了一半,也準備 lock ch3; 協程 g1 成功 lock ch2 執行 dequeue 操作,協程 g2 頁成功 lock ch3 執行 deq ueue 操作; 因為同一個 select-case 塊中只能有一個 case 分支允許激活,所以在協程 g 里面加了個成員 g.selectDone 來標識該協程對應的 select-case 是否已經成功執行結束(一個協程在某個時刻只可能有一個 select-case 塊在處理,要么阻塞沒執行完,要么立即執行完),因此 dequeue 時要通過 CAS 操作來更新 g.selectDone 的值,更新成功者完成出隊操作激活 case 分支,CAS 失敗的則認為該 select-case 已經有其他分支被激活,當前 case 分支作廢,select-case 結束。
這里的 CAS 操作也就是說的多個分支滿足條件時,golang 會隨機選擇一個分支執行的道理。
源文件 select.go 中方法 *selectgo(sel hselect) ,實現了對 select-case 塊的處理邏輯,但是由于代碼篇幅較長,這里不再復制粘貼代碼,感興趣的可以自己查看,這里只簡要描述下其執行流程。
selectgo 邏輯處理簡述:
預處理部分 對各個 case 分支按照 ch 地址排序,保證后續按序加鎖,避免產生死鎖問題;
pass 1 部分處理各個 case 分支的判斷邏輯,依次檢查各個 case 分支是否有立即可滿足 ch 讀寫操作的。如果當前分支有則立即執行 ch 讀寫并回,select 處理結束;沒有則繼續處理下一分支;如果所有分支均不滿足繼續執行以下流程。
pass 2 沒有一個 case 分支上 chan 操作立即可就緒,當前 goroutine 需要阻塞,遍歷所有的 case 分支,分別構建 goroutine 對應的 sudog 并 push 到 case 分支對應 chan 的對應 goroutine queue 中。然后 gopark 掛起當前 goroutine,等待某個分支上 chan 操作完成來喚醒當前 goroutine。怎么被喚醒呢?前面提到了 chan.waitq.dequeue() 方法中通過 CAS 將 sudog.g.selectDone 設為 1 之后將該 sudog 返回并恢復執行,其實也就是借助這個操作來喚醒。
pass 3 整個 select-case 塊已經結束使命,之前阻塞的 goroutine 已被喚醒,其他 case 分支沒什么作用了,需要廢棄掉,pass 3 部分會將該 goroutine 從之前阻塞它的 select-case 塊中各 case 分支對應的 chan recv、send goroutine queue 中移除,通過方法 chan.waitq.dequeueSudog(sgp * sudog) 來從隊列中移除,隊列是雙向鏈表,通過 sudog.prev 和 sudog.next 刪除 sudog 時間復雜度為 O(1)。
“simpread golang與select case的實現機制是什么”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。