這篇“Redis怎么實現延遲隊列”文章的知識點大部分人都不太理解,所以小編給大家總結了以下內容,內容詳細,步驟清晰,具有一定的借鑒價值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來看看這篇“Redis怎么實現延遲隊列”文章吧。
Redis 是通過有序集合(ZSet)的方式來實現延遲消息隊列的,ZSet 有一個 Score 屬性可以用來存儲延遲執行的時間。
但需要無限循環檢查任務,會消耗系統資源
class RedisDelayQueue(object):
"""Simple Queue with Redis Backend
dq = RedisDelayQueue('delay:commtrans')
dq.put( 5 ,{'info':'測試 5555','time': timestamp_to_datetime_str(t + 5)})
print(dq.get())
"""
def __init__(self, name, namespace='queue'):
"""The default connection parameters are: host='localhost', port=6379, db=0"""
self.__db = get_redis_engine(database_name='spdb')
self.key = '%s:%s' % (namespace, name)
def qsize(self):
"""Return the approximate size of the queue."""
return self.__db.zcard(self.key)
def empty(self):
"""Return True if the queue is empty, False otherwise."""
return self.qsize() == 0
def rem(self, value):
return self.__db.zrem(self.key, value)
def get(self):
# 獲取任務,以0和當前時間為區間,返回一條在當前區間的記錄
items = self.__db.zrangebyscore(self.key, 0, int(time.time()), 0, 1)
if items:
item = items[0]
if self.rem(item): # 解決并發問題 如能刪就讓誰取走
return json.loads(item)
return None
def put(self, interval, item):
""":param interval 延時秒數"""
# 以時間作為score,對任務隊列按時間戳從小到大排序
"""Put item into the queue."""
d = json.dumps(item)
return self.__db.zadd(self.key, {d: int(time.time()) + int(interval)})近期在開發部門的新項目,其中有個關鍵功能就是智能推送,即根據用戶行為在特定的時間點向用戶推送相應的提醒消息,比如以下業務場景:
在用戶點擊充值項后,半小時內未充值,向用戶推送充值未完成提醒。
在用戶最近一次閱讀行為2小時后,向用戶推送繼續閱讀提醒。
在用戶新注冊或退出應用N分鐘后,向用戶推送合適的推薦消息。
…
上述場景的共同特征就是在某事件觸發后延遲一定時間后再執行特定任務,若事件觸發時間點可知,則上述邏輯也可等價于在指定時間點(事件觸發時間點+延遲時間長度)執行特定任務。
實現這類需求一般采用延時隊列,其中創建的延時消息中需要包含任務延遲時間或任務執行時間點等信息,當任務滿足時間條件需要執行時,該消息便會被消費,也就是說可以指定隊列中的消息在哪個時間點被消費。
在單機環境中,JDK已經自帶了很多能夠實現延時隊列功能的組件,比如DelayQueue, Timer, ScheduledExecutorService等組件,都可以較為簡便地創建延時任務,但上述組件使用一般需要把任務存儲在內存中,服務重啟存在任務丟失風險,且任務規模體量受內存限制,同時也造成長時間內存占用,并不靈活,通常適用于單進程客服端程序中或對任務要求不高的項目中。
在分布式環境下,僅使用JDK自帶組件并不能可靠高效地實現延時隊列,通常需要引入第三方中間件或框架。
比如常見的經典任務調度框架Quartz或基于此框架的xxl-job等其它框架,這些框架的主要功能是實現定時任務或周期性任務,在Redis、RabbitMQ還未廣泛應用時,譬如常見的超時未支付取消訂單等功能都是由定時任務實現的,通過定時輪詢來判斷是否已到達觸發執行的時間點。
但由于定時任務需要一定的周期性,周期掃描的間隔時間不好控制,太短會造成很多無意義的掃描,且增大系統壓力,太長又會造成執行時間誤差太大,且可能造成單次掃描所處理的堆積記錄數量過大。
此外,利用MQ做延時隊列也是一種常見的方式,比如通過RabbitMQ的TTL和死信隊列實現消息的延遲投遞,考慮到投遞出去的MQ消息無法方便地實現刪除或修改,即無法實現任務的取消或任務執行時間點的更改,同時也不能方便地對消息進行去重,因此在項目中并未選擇使用MQ實現延時隊列。
Redis的數據結構zset,同樣可以實現延遲隊列的效果,且更加靈活,可以實現MQ無法做到的一些特性,因此項目最終采用Redis實現延時隊列,并對其進行優化與封裝。
實現原理是利用zset的score屬性,redis會將zset集合中的元素按照score進行從小到大排序,通過zadd命令向zset中添加元素,如下述命令所示,其中value值為延時任務消息,可根據業務定義消息格式,score值為任務執行的時間點,比如13位毫秒時間戳。
zadd delayqueue 1614608094000 taskinfo
任務添加后,獲取任務的邏輯只需從zset中篩選score值小于當前時間戳的元素,所得結果便是當前時間節點下需要執行的任務,通過zrangebyscore命令來獲取,如下述命令所示,其中timestamp為當前時間戳,可用limit限制每次拉取的記錄數,防止單次獲取記錄數過大。
zrangebyscore delayqueue 0 timestamp limit 0 1000
在實際實現過程中,從zset中獲取到當前需要執行的任務后,需要先確保將任務對應的元素從zset中刪除,刪除成功后才允許執行任務邏輯,這樣是為了在分布式環境下,當存在多個線程獲取到同一任務后,利用redis刪除操作的原子性,確保只有一個線程能夠刪除成功并執行任務,防止重復執行。
實際任務的執行通常會再將其發送至MQ異步處理,將“獲取任務”與“執行任務”兩者分離解耦,更加靈活,“獲取任務”只負責拿到當前時間需要執行的任務,并不真正運行任務業務邏輯,因此只需相對少量的執行線程即可,而實際的任務執行邏輯則由MQ消費者承擔,方便調控負載能力。
整體過程如下圖所示。

采用zset做延時隊列的另一個好處是可以實現任務的取消和任務執行時間點的更改,只需要將任務信息從zset中刪除,便可取消任務,同時由于zset擁有集合去重的特性,只需再次寫入同一個任務信息,但是value值設置為不同的執行時間點,便可更改任務執行時間,實現單個任務執行時間的動態調整。
了解實現原理后,再進行具體編程實現。創建延時任務較為簡便,準備好任務消息和執行時間點,寫入zset即可。獲取延時任務最簡單的方案是通過定時任務,周期性地執行上述邏輯,如下代碼所示。
@XxlScheduled(cron = "0/5 * * * * ?", name = "scan business1 delayqueue")
public void scanBusiness1() {
// 某業務邏輯的zset延遲隊列對應的key
String zsetKey = "delayqueue:business1";
while (true) {
// 篩選score值小于當前時間戳的元素,一次最多拉取1000條
Set<String> tasks = stringRedisTemplate.opsForZSet().rangeByScore(zsetKey, 0, System.currentTimeMillis(), 0, 1000);
if (CollectionUtils.isEmpty(tasks)) {
// 當前時間下已沒有需要執行的任務,結束本次掃描
return;
}
for (String task : tasks) {
// 先刪除,再執行,確保多線程環境下執行的唯一性
Boolean delete = stringRedisTemplate.delete(task);
if (delete) {
// 刪除成功后,將其再發送到指定MQ異步處理,將“獲取任務”與“執行任務”分離解耦
rabbitTemplate.convertAndSend("exchange_business1", "routekey_business1", task);
}
}
}
}上述方案使用xxl-job做分布式定時任務,間隔5秒執行一次,代碼借助spring提供的api來完成redis和MQ的操作。
由于是分布式定時任務,每次執行只有一個線程在獲取任務,機器利用率低,當數據規模較大時,單靠一個線程無法滿足吞吐量要求,因此這種方案只適用于小規模數據量級別。
此處間隔時間也可適當調整,例如縮短為1秒,調整所需考慮原則在上文已提到:間隔太短會造成很多無意義的掃描,且增大系統壓力,太長又會造成執行時間誤差太大。
為了提升整體吞吐量,考慮不使用分布式定時任務,對集群內每臺機器(或實例)均設置獨立的定時任務,同時采用多個zset隊列,以數字后綴區分。
假設有M個zset隊列,創建延時消息時選取消息的某個ID字段,計算hash值再對M取余,根據余數決定發送到對應數字后綴的zset隊列中(分散消息,此處ID字段選取需要考慮做到均勻分布,不要造成數據傾斜)。
隊列數量M的選取需要考慮機器數量N,理想情況下有多少臺機器就定義多少個隊列,保持M與N基本相等即可。
因為隊列太少,會造成機器對隊列的競爭訪問處理,隊列太多又會導致任務得不到及時的處理。
最佳實踐是隊列數量可動態配置,如采用分布式配置中心,這樣當集群機器數量變化時,可以相應調整隊列數量。
每臺機器在觸發定時任務時,需要通過適當的負載均衡來決定從哪個隊列拉取消息,負載均衡的好壞也會影響整個集群的效率,如果負載分布不均可能會導致多臺機器競爭處理同一隊列,降低效率。
一個簡單實用的做法是利用redis的自增操作再對隊列數量取余即可,只要保持隊列數量和機器數量基本相等,這種做法在很大程度上就可以保證不會有多臺機器競爭同一隊列。
至于每臺機器從對應zset中的任務獲取邏輯,仍然和前面代碼一致。以上方式簡化實現代碼如下所示。
@Scheduled(cron = "0/5 * * * * ?")
public void scanBusiness1() {
// 隊列數量M,考慮動態配置,保持和機器數量基本一致
int M = 10;
// redis自增key,用于負載均衡
String incrKey = "incrkey:delayqueue:business1";
// 每臺機器執行時,從不同的zset中拉取消息,盡量確保不同機器訪問不同zset
String zsetKey = "delayqueue:business1:" + (stringRedisTemplate.opsForValue().increment(incrKey) % M);
while (true) {
// 此處邏輯和前面代碼一致,省略。。。
}
}上述方案和第一種方案的主要的不同點在于zsetKey的獲取上,這里是根據負載均衡算法算出來的,確保每臺機器訪問不同zset并拉取消息,同時定時任務采用spring提供的進程內注解@Scheduled,集群內每臺機器都會間隔5秒執行,因此相比之前的方案,能夠較為明顯地提升整個集群的吞吐量。
但是這種方案的步驟相對更為復雜,需要動態配置隊列數量,同時在創建延時任務時需要選擇合適的消息ID字段來決定發送的目標zset隊列,此處還要考慮均勻分布,整體實現要考慮的因素較多。
上面一種方案已經能夠較好地滿足整體吞吐量要求,但其缺點是步驟相對復雜,因此項目中沒有采用這種方案,而是采用下面一種也能滿足吞吐量要求,步驟相對簡單,又方便通用化的方案。
該方案不使用定時任務,而是單獨啟動后臺線程,在線程中執行永久循環,每次循環邏輯為:從目標zset中獲取score值小于當前時間戳的元素集合中的score最小的那個元素,相當于獲取當前時間點需要執行且執行時間點最早的那個任務,如果獲取不到,表示當前時間點下暫無需要執行的任務,則線程休眠100ms(可視情況調整),否則,對獲取到的元素進行處理,在分布式多線程環境下,仍然需要先刪除成功才能進行處理。
此外,考慮到每個線程獲取元素后都需要再次訪問redis嘗試刪除操作,為了避免多線程爭搶浪費資源,降低效率,這里采用lua腳本將獲取和刪除操作原子化。lua腳本邏輯代碼如下所示。
local zsetKey = 'delayqueue'
local timestamp = 1614608094000
local items = redis.call('zrangebyscore',zsetKey,0,timestamp,'limit',0,1)
if #items == 0 then
return ''
else
redis.call('zremrangebyrank',zsetKey,0,0)
return items[1]
end其中timestamp為當前時間戳,通過在zrangebyscore命令中指定limit為1來獲取score最小的元素,若獲取不到,即結果集長度為0,則返回空字符串,否則,通過zremrangebyrank命令刪除頭部元素,即score最小的元素,也就是之前獲取到的那個元素,由于redis內部保證lua腳本的原子性,上述獲取并刪除的操作能夠運行無誤。具體JAVA實現中還對其進行了多線程操作的封裝和通用化的抽象,使不同業務都能夠使用該組件實現延時隊列。具體實現代碼如下所示。
/**
* 基于ZSET實現消息延遲處理,score存儲執行時間點,到達時間點即會向指定隊列發送該消息;
* 定義一個繼承本類的bean即可;
*/
public abstract class AbstractDelayedMsgScanTrigger implements Runnable, DisposableBean {
private static final RedisScript<String> TRY_GET_AND_DEL_SCRIPT;
static {
// 獲取并刪除的lua腳本,使用spring提供的api
String sb = "local items = redis.call('zrangebyscore',KEYS[1],0,ARGV[1],'limit',0,1)\n" +
"if #items == 0 then\n" +
"\treturn ''\n" +
"else\n" +
"\tredis.call('zremrangebyrank',KEYS[1],0,0)\n" +
"\treturn items[1]\n" +
"end";
// 自有工具類,只要能創建出spring包下的 RedisScript 的實現類對象均可
TRY_GET_AND_DEL_SCRIPT = RedisScriptHelper.createScript(sb, String.class);
}
private final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(getThreadNum(), getThreadNum(),
0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory(getThreadNamePrefix()));
private volatile boolean quit = false;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void startScan() {
// bean構建完成后,啟動若干執行線程
int threadNum = getThreadNum();
for (int i = 0; i < threadNum; i++) {
EXECUTOR.execute(this);
}
}
@Override
public void run() {
while (!quit) {
try {
// 循環,采用lua獲取當前需要執行的任務并將其從redis中刪除
String msg = stringRedisTemplate.execute(TRY_GET_AND_DEL_SCRIPT,
Lists.newArrayList(getDelayedMsgSourceKey()), String.valueOf(System.currentTimeMillis()));
if (StringUtils.isNotBlank(msg)) {
// 消息不為空,表示獲取任務成功,將其再發送到指定MQ異步處理,將“獲取任務”與“執行任務”分離解耦
rabbitTemplate.convertAndSend(getSendExchange(), getSendRoutingKey(), msg);
} else {
// 獲取不到任務,表示當前時間點下暫無需要執行的任務,則線程休眠1S(可視情況調整)
SleepUtils.sleepSeconds(1);
}
} catch (Exception e) {
Logs.MSG.error("delayed msg scan error, sourceKey:{}", getDelayedMsgSourceKey(), e);
}
}
}
@Override
public void destroy() throws Exception {
quit = true;
}
public void setQuit(boolean quit) {
this.quit = quit;
}
/**
* 獲取消息的工作線程數量
*/
protected abstract int getThreadNum();
/**
* 線程名稱前綴,方便問題定位
*/
protected abstract String getThreadNamePrefix();
/**
* 存放延遲消息的ZSET隊列名
*/
protected abstract String getDelayedMsgSourceKey();
/**
* 消息到達執行時間點時將其通過指定 exchange 發送到實時消費隊列中
*/
protected abstract String getSendExchange();
/**
* 消息到達執行時間點時將其通過指定 routingKey 發送到實時消費隊列中
*/
protected abstract String getSendRoutingKey();
}在具體業務應用中,只需定義一個繼承上述類的bean即可,需要實現的方法主要是提供一些配置,比如該業務對應的zset延時隊列名稱,同時工作拉取消息的線程數量,由于采用rabbitMq,因此這里需要提供exchange和routingKey。
實際使用中只需向該zset隊列中添加消息,并將score設為該任務需要執行的時間點(此處為13位毫秒時間戳),則到該時間點后,上述組件便會將該消息從zset中取出并刪除,再將其通過指定的路由發送到實時MQ消費隊列中,由消費者負責執行任務業務邏輯。目前該組件在項目中正常平穩運行。
注意:
本文結合項目中的實際需求介紹了延時隊列的應用場景,分析了延時隊列的多種實現,重點講述了利用redis實現延時隊列的原理,對其實現方案進行比較與優化,并將最終方案實際運用于項目需求中。
以上就是關于“Redis怎么實現延遲隊列”這篇文章的內容,相信大家都有了一定的了解,希望小編分享的內容對大家有幫助,若想了解更多相關的知識內容,請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。