Redis的List數據類型作為消息隊列,已經比較合適了,但存在一些不足,比如只能獨立消費,訂閱發布又無法支持數據的持久化,相對前兩者,Redis Stream作為消息隊列的使用更為有優勢。
相信球迷小伙伴們對文字直播這個東西都不陌生,時常在想,這個功能是怎么實現的?
具體說就是用什么技術實現最為合適?如何面對數以百萬計的讀壓力?廣告消息是如何插播進來的?最后的歷史消息如何歸檔,如何持久化存儲?
文字直播其實就是解說員作為生產者,生產消息(文字信息),各種客戶端作為消費者,消費信息(刷新文字內容)。
典型的消息隊列實現,可以用隊列或者類似隊列的功能實現,這里只是簡單想象一下,結合redis中的stream數據類型,來學習stream作為消息隊列的功能實現。

1,生成者:生產者隊列的創建,與消息的增刪改
1.1 創建并寫入消息
語法:xadd?queue_name?Id?filed value(filed value)
? ?? 1,每一組消息需要一個唯一的Id,*號表示服務器自動生成ID,后面順序跟著一組或者多組消息(filed value)
? ? ?2,消息ID的形式是timestampInMillis-sequence,例如1527846880572-5,它表示當前的消息在毫米時間戳1527846880572時產生,并且是該毫秒內產生的第5條消息。
? ??? ? ?? 消息ID可以由服務器自動生成,也可以由客戶端自己指定,但是形式必須是整數-整數,而且必須是后面加入的消息的ID要大于前面的消息ID。
? ? ?3,消息元素的的結構為key-value,必須成對出現,如果key或者value元素中有空格,必須用"abc ?def"或者'abc ?def'括起來

1.2?生產者寫入消息
語法:xadd?queue_name *|Id filed value? ??

1.3 xlen 當前stream的長度:xlen stream_name
xlen?"NBA_Match_001"?,也就是上面寫入的10條消息
1.4 限制某一個stream的最大長度,maxlen?
依據先進先出的原則,自動刪除超出最長長度的消息
xadd?"NBA_Match_001" maxlen 50000 *?"2019-07-13?08:26:39"?"反擊哈騰,一條龍上籃得分"

1.5 查詢消息(查詢是生產者查詢自己生產的消息,跟消費者的消費是兩碼事)
正向查詢
xrange "NBA_Match_001" # 查詢所有消息
xrange "NBA_Match_001" - + ? # -表示最小值, +表示最大值
xrange "NBA_Match_001" 1562980142175-0 + ?# 指定最小消息ID的列表
xrange "NBA_Match_001"- 1562980142175-0 ? # 指定最大消息ID的列表
反向查詢
xrevrange "NBA_Match_001"
xrevrange "NBA_Match_001" + -
xrevrange "NBA_Match_001" + 1562980142175-0
xrevrange "NBA_Match_001" 1562980142175-0 -

1.6 刪除消息
xdel stream_name id,刪除消息并不是真正的物理刪除,隊列的長度不變,指示標記當前消息被刪除

1.7 查看stream屬性xinfo?stream?stream_name?

1.8 del stream_name
刪除 stream :del?NBA_Match_001
刪除本質上本Redis中的其他數據類型一致,stream本身就是一個key值,del key值就刪除了整個消息的全部信息。
2 xread:獨立消費
類似于List,生產者往list中寫數據,消費者從list中讀數據,只能有一個消費者

2. 1,從頭部讀取消息,從某個streams中讀取n條消息,0-0只從頭開始,或者指定從streams的Id開始
xread?count 1 streams?"NBA_Match_001"?0-0
xread?count 1 streams?"NBA_Match_001"?1562980142175-0

2.2,從尾部讀取最新的一條消息
xread?count?1?streams?"NBA_Match_001"?$
此時默認不返回任何消息
xread??block?0?count?1?streams?"NBA_Match_001"?$
以阻塞的方式讀取尾部最新的一條消息,直到新的消息的到來

3 多消費者xgroup?:消費組,每個組中的消費者獨立消費stream中的消息
典型的比如文字直播的安卓App客戶端,蘋果App客戶端,網頁客戶端等等。多個終端,都可以獨立地消費隊列里面的

3.1 創建消費組
對消息隊列"NBA_Match_001"創建了兩個消費組,一個是cg1,一個是cg2,比如網頁客戶端與App客戶端?
1,xgroup?create?"NBA_Match_001"?cg1?0-0??#??表示從頭開始消費
創建消費組cg1,消費組必須綁定一個steam(NBA_Match_001),從頭(0-0?)開始消費"NBA_Match_001"中的消息
2,xgroup?create?"NBA_Match_001"?cg2?0-0??#??表示從頭開始消費
3,2 從消費組中創建消費者
xreadgroup指令可以進行消費組的組內消費
xreadgroup GROUP cg1 c1 count 1 streams"NBA_Match_001"?>
號表示從當前消費組的last_delivered_id后面開始讀 ,?每當消費者讀取一條消息,last_delivered_id變量就會前進?

當一個組的消費則消費完全部消息之后,就沒有新的消息了
每個消費組(Consumer Group)的狀態都是獨立的,相互不受影響。也就是說同一份Stream內部的消息會被每個消費組都消費到。
同一個消費組(Consumer Group)可以掛接多個消費者(Consumer),這些消費者之間是競爭關系,任意一個消費者讀取了消息都會使游標last_delivered_id往前移動。
每個消費者者有一個組內唯一名稱。
關于消費組,可能不太好理解,舉個例子就比較清楚
假設有2個消費組cg1,cg2,對于cg1,其組內共有3個消費者c1,、c2、c3。一個消息隊列中共有5條消息a,b,c,d,e,那么一種可能的消費方式如下
a -> c1
b -> c2
c -> c3
d -> c1
e -> c2
也就是說3個消費者,對于消息的消費是互斥的,消費的消息是沒有交集的
而對于cg2,同樣可以消費a,b,c,d,e這5條消息,不依賴于cg1消費組以及消費情況,同理,具體怎么消費,取決于其組內的消費者數量
就好比體育直播的客戶端,正常情況下,網頁客戶端可以收到所有的直播消息,手機App客戶端也可以收到所有的直播消息一樣,不同消費組間對消息的消費互不干擾。
4 多個生產者和多個消費者
這種情況類似以上,不用的是增加了多個消費者,在上面的基礎上做了擴展。
其實不難想象,文字直播插播的廣告消息,可能是類似如下結構,是另外一個獨立的生產者,與文字直播員一樣生成寫入消息到隊列,然后客戶端看到的就是夾雜了廣告的直播。

目前就個人認識而言,stream數據類型實現消息隊列并不完美,最大的問題就是單點壓力問題:這里是說單點壓力,而不是單點故障,stream類型數據,其實從邏輯上看,是一個key值(stream_name),跟著一系列value(消息),這些消息只能存儲在一個Redis實例中,如何緩解多個消費者對單個Key值中的消息消費壓力?說來說去,不就是想說kafka的partition么……
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。