這篇文章主要介紹“Golang協程池gopool怎么設計與實現”,在日常操作中,相信很多人在Golang協程池gopool怎么設計與實現問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Golang協程池gopool怎么設計與實現”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
Goroutine 是 Golang 提供的一種輕量級線程,我們通常稱之為「協程」,相比較線程,創建一個協程的成本是很低的。所以你會經??吹?Golang 開發的應用出現上千個協程并發的場景。
Goroutine 的優勢:
與線程相比,Goroutines 成本很低。
它們的堆棧大小只有幾 kb,堆??梢愿鶕贸绦虻男枰鲩L和縮小,context switch 也很快,而在線程的情況下,堆棧大小必須指定并固定。
Goroutine 被多路復用到更少數量的 OS 線程。
一個包含數千個 Goroutine 的程序中可能只有一個線程。如果該線程中的任何 Goroutine 阻塞等待用戶輸入,則創建另一個 OS 線程并將剩余的 Goroutine 移動到新的 OS 線程。所有這些都由運行時處理,作為開發者無需耗費心力關心,這也使得我們有很干凈的 API 來支持并發。
Goroutines 使用 channel 進行通信。
channel 的設計有效防止了在使用 Goroutine 訪問共享內存時發生競爭條件(race conditions) 。channel 可以被認為是 Goroutine 進行通信的管道。
在高并發場景下,我們可能會啟動大量的協程來處理業務邏輯。協程池是一種利用池化技術,復用對象,減少內存分配的頻率以及協程創建開銷,從而提高協程執行效率的技術。
最近抽空了解了字節官方開源的 gopkg 庫提供的 gopool 協程池實現,感覺還是很高質量的,代碼也非常簡潔清晰,而且 Kitex 底層也在使用 gopool 來管理協程,這里我們梳理一下設計和實現。
了解官方 README 就會發現gopool的用法其實非常簡單,將曾經我們經常使用的 go func(){...} 替換為 gopool.Go(func(){...}) 即可。
此時 gopool 將會使用默認的配置來管理你啟動的協程,你也可以選擇針對業務場景配置池子大小,以及擴容上限。
old:
go func() {
// do your job
}()new:
import (
"github.com/bytedance/gopkg/util/gopool"
)
gopool.Go(func(){
/// do your job
})下面我們來看看gopool是怎樣實現協程池管理的。
Pool 是一個定義了協程池能力的接口。
type Pool interface {
// 池子的名稱
Name() string
// 設置池子內Goroutine的容量
SetCap(cap int32)
// 執行 f 函數
Go(f func())
// 帶 ctx,執行 f 函數
CtxGo(ctx context.Context, f func())
// 設置發生panic時調用的函數
SetPanicHandler(f func(context.Context, interface{}))
}gopool 提供了這個接口的默認實現(即下面即將介紹的pool),當我們直接調用 gopool.CtxGo 時依賴的就是這個。
這樣的設計模式在 Kitex 中也經常出現,所有的依賴均設計為接口,便于隨后擴展,底層提供一個默認的實現暴露出去,這樣對調用方也很友好。
type pool struct {
// 池子名稱
name string
// 池子的容量, 即最大并發工作的 goroutine 的數量
cap int32
// 池子配置
config *Config
// task 鏈表
taskHead *task
taskTail *task
taskLock sync.Mutex
taskCount int32
// 記錄當前正在運行的 worker 的數量
workerCount int32
// 當 worker 出現panic時被調用
panicHandler func(context.Context, interface{})
}
// NewPool 創建一個新的協程池,初始化名稱,容量,配置
func NewPool(name string, cap int32, config *Config) Pool {
p := &pool{
name: name,
cap: cap,
config: config,
}
return p
}調用 NewPool 獲取了以 Pool 的形式返回的 pool 結構體。
type task struct {
ctx context.Context
f func()
next *task
}task 是一個鏈表結構,可以把它理解為一個待執行的任務,它包含了當前節點需要執行的函數f, 以及指向下一個task的指針。
綜合前一節 pool 的定義,我們可以看到,一個協程池 pool 對應了一組task。
pool 維護了指向鏈表的頭尾的兩個指針:taskHead 和 taskTail,以及鏈表的長度taskCount 和對應的鎖 taskLock。
type worker struct {
pool *pool
}一個 worker 就是邏輯上的一個執行器,它唯一對應到一個協程池 pool。當一個worker被喚起,將會開啟一個goroutine ,不斷地從 pool 中的 task鏈表獲取任務并執行。
func (w *worker) run() {
go func() {
for {
// 聲明即將執行的 task
var t *task
// 操作 pool 中的 task 鏈表,加鎖
w.pool.taskLock.Lock()
if w.pool.taskHead != nil {
// 拿到 taskHead 準備執行
t = w.pool.taskHead
// 更新鏈表的 head 以及數量
w.pool.taskHead = w.pool.taskHead.next
atomic.AddInt32(&w.pool.taskCount, -1)
}
// 如果前一步拿到的 taskHead 為空,說明無任務需要執行,清理后返回
if t == nil {
w.close()
w.pool.taskLock.Unlock()
w.Recycle()
return
}
w.pool.taskLock.Unlock()
// 執行任務,針對 panic 會recover,并調用配置的 handler
func() {
defer func() {
if r := recover(); r != nil {
msg := fmt.Sprintf("GOPOOL: panic in pool: %s: %v: %s", w.pool.name, r, debug.Stack())
logger.CtxErrorf(t.ctx, msg)
if w.pool.panicHandler != nil {
w.pool.panicHandler(t.ctx, r)
}
}
}()
t.f()
}()
t.Recycle()
}
}()
}看到這里,其實就能把整個流程串起來了。我們來看看對外的接口 CtxGo(context.Context, f func()) 到底做了什么?
func Go(f func()) {
CtxGo(context.Background(), f)
}
func CtxGo(ctx context.Context, f func()) {
defaultPool.CtxGo(ctx, f)
}
func (p *pool) CtxGo(ctx context.Context, f func()) {
// 創建一個 task 對象,將 ctx 和待執行的函數賦值
t := taskPool.Get().(*task)
t.ctx = ctx
t.f = f
// 將 task 插入 pool 的鏈表的尾部,更新鏈表數量
p.taskLock.Lock()
if p.taskHead == nil {
p.taskHead = t
p.taskTail = t
} else {
p.taskTail.next = t
p.taskTail = t
}
p.taskLock.Unlock()
atomic.AddInt32(&p.taskCount, 1)
// 以下兩個條件滿足時,創建新的 worker 并喚起執行:
// 1. task的數量超過了配置的限制
// 2. 當前運行的worker數量小于上限(或無worker運行)
if (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 {
// worker數量+1
p.incWorkerCount()
// 創建一個新的worker,并把當前 pool 賦值
w := workerPool.Get().(*worker)
w.pool = p
// 喚起worker執行
w.run()
}
}相信看了代碼注釋,大家就能理解發生了什么。
gopool 會自行維護一個 defaultPool,這是一個默認的 pool 結構體,在引入包的時候就進行初始化。當我們直接調用 gopool.CtxGo() 時,本質上是調用了 defaultPool 的同名方法
func init() {
defaultPool = NewPool("gopool.DefaultPool", 10000, NewConfig())
}
const (
defaultScalaThreshold = 1
)
// Config is used to config pool.
type Config struct {
// 控制擴容的門檻,一旦待執行的 task 超過此值,且 worker 數量未達到上限,就開始啟動新的 worker
ScaleThreshold int32
}
// NewConfig creates a default Config.
func NewConfig() *Config {
c := &Config{
ScaleThreshold: defaultScalaThreshold,
}
return c
}defaultPool 的名稱為 gopool.DefaultPool,池子容量一萬,擴容下限為 1。
當我們調用 CtxGo時,gopool 就會更新維護的任務鏈表,并且判斷是否需要擴容 worker:
若此時已經有很多 worker 啟動(底層一個 worker 對應一個 goroutine),不需要擴容,就直接返回。
若判斷需要擴容,就創建一個新的worker,并調用 worker.run()方法啟動,各個worker會異步地檢查 pool 里面的任務鏈表是否還有待執行的任務,如果有就執行。
task 是一個待執行的任務節點,同時還包含了指向下一個任務的指針,鏈表結構;
worker 是一個實際執行任務的執行器,它會異步啟動一個 goroutine 執行協程池里面未執行的task;
pool 是一個邏輯上的協程池,對應了一個task鏈表,同時負責維護task狀態的更新,以及在需要的時候創建新的 worker。
其實到這個地方,gopool已經是一個代碼簡潔清晰的協程池庫了,但是性能上顯然有改進空間,所以gopool的作者應用了多次 sync.Pool 來池化對象的創建,復用woker和task對象。
這里建議大家直接看源碼,其實在上面的代碼中已經有所涉及。
task 池化
var taskPool sync.Pool
func init() {
taskPool.New = newTask
}
func newTask() interface{} {
return &task{}
}
func (t *task) Recycle() {
t.zero()
taskPool.Put(t)
}worker 池化
var workerPool sync.Pool
func init() {
workerPool.New = newWorker
}
func newWorker() interface{} {
return &worker{}
}
func (w *worker) Recycle() {
w.zero()
workerPool.Put(w)
}到此,關于“Golang協程池gopool怎么設計與實現”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。