溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

怎么在golang中實現一個redis延時消息隊列功能

發布時間:2021-04-12 17:58:57 來源:億速云 閱讀:772 作者:Leah 欄目:編程語言

怎么在golang中實現一個redis延時消息隊列功能?很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。

提前準備 安裝redis, redis-go

因為用的是macOS, 直接

$ brew install redis
$ go get github.com/garyburd/redigo/redis

又因為比較懶,生成任務的唯一id時,直接采用了bson中的objectId,所以:

$ go get gopkg.in/mgo.v2/bson

唯一id不是必須有,但如果之后有實際應用需要攜帶,便于查找相應任務。

生產者

通過一個for循環生成10w個任務, 每一個任務有不同的時間

func producer() {
 count := 0
 //生成100000個任務
 for count < 100000 {
 count++
 dealTime := int64(rand.Intn(5)) + time.Now().Unix()
 uuid := bson.NewObjectId().Hex()
 redis.Client.AddJob(&job.JobMessage{
 Id: uuid,
 DealTime: dealTime,
 }, + int64(dealTime))
 }
}

其中AddJob函數在另一個包中, 將上一個函數中隨機生成的時間作為需要處理的時間戳.

// 添加任務
func (client *RedisClient) AddJob(msg *job.JobMessage, dealTime int64) {
 conn := client.Get()
 defer conn.Close()

 key := "JOB_MESSAGE_QUEUE"
 conn.Do("zadd", key, dealTime, util.JsonEncode(msg))
}

消費者

消費者處理流程分為兩個步驟:

  • 獲取小于等于當前時間戳的任務

  • 通過刪除當前任務來判斷誰獲得了當前任務

因為在獲取小于等于當前時間戳的任務時,可能有多個go routine同時讀到了當前任務,而只有一個任務可以來處理當前任務。因此我們需要通過一個方案來判斷究竟由誰來處理這個任務(當然如果只有一個消費者可以讀到就直接處理):這個時候可以通過redis的刪除操作來獲取,因為刪除指定value時只有成功的操作才會返回不為0,所以我們可以認為刪除當前隊列成功的那個go routine拿到了當前的任務。

下面是代碼:

// 消費者
func consumer() {
 // 啟動10個go routine一起去拿
 count := 0
 for count < 10 {
 go func() {
 for {
 jobs := redis.Client.GetJob()
 if len(jobs) <= 0 {
  time.Sleep(time.Second * 1)
  continue
 }
 currentJob := jobs[0]
 // 如果當前搶redis隊列成功,
 if redis.Client.DelJob(currentJob) > 0 {
  var jobMessage job.JobMessage
  util.JsonDecode(currentJob, &jobMessage) //自定義的json解析函數
  handleMessage(&jobMessage)
 }

 }

 }()
 count++
 }
}

// 處理任務用函數
func handleMessage(msg *job.JobMessage) {
 fmt.Printf("deal job: %s, require time: %d \n", msg.Id, msg.DealTime)
 go func() {
 countChan <- true
 }()
}

redis部分的代碼,獲取任務和刪除任務

// 獲取任務
func (client *RedisClient) GetJob() []string {
 conn := client.Get()
 defer conn.Close()

 key := "JOB_MESSAGE_QUEUE"
 timeNow := time.Now().Unix()
 ret, err := redis.Strings(conn.Do("zrangebyscore", key, 0, timeNow, "limit", 0, 1))
 if err != nil {
 panic(err)
 }
 return ret
}

// 刪除當前任務, 用來判斷是否搶到了當前任務
func (client *RedisClient) DelJob(value string) int {
 conn := client.Get()
 defer conn.Close()

 key := "JOB_MESSAGE_QUEUE"
 ret, err := redis.Int(conn.Do("zrem", key, value))
 if err != nil {
 panic(err)
 }
 return ret
}

看完上述內容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注億速云行業資訊頻道,感謝您對億速云的支持。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女