最近部门在做一套告警治理相关的系统,专门用于对整个业务线杂七杂八的告警进行治理管控。
例如Kakfa异常,业务异常,Dubbo超时等等场景,全部都首先会被这套系统给监控到,然后再通过一定的策略分配,将异常内容转换为特定的消息模版告知给特定的业务方。
然后,这里面涉及到一个告警升级的问题需要处理,例如告警消息发送给A员工,但是A员工可能没有看到消息,那么超过一定时限之后,该消息需要升级发送给B员工,以此类推。
![关于Redisson延迟队列的一些思考 图片[1]-关于Redisson延迟队列的一些思考-不念博客](https://www.bunian.cn/wp-content/uploads/2024/01/640-6-15.png)
关于升级这块的链路,我早期一开始想的是设计扫表+定时任务的思路去做,后边发现这套方案数据存储会有冗余,维护成本高,所以打算换个思路去实现–延迟消息。
延迟队列的选型
最早是计划用mq的延迟消息去做的,但是公司内部用的都是Kafka集群,Kafka不支持延迟队列。
如果想用RocketMQ的延迟队列,那么就得搭建一套RocketMQ的环境,运维成本也比较高。
所以后边看了下目前已有的基建设施,打算试试Redis的延迟队列。
![关于Redisson延迟队列的一些思考 图片[2]-关于Redisson延迟队列的一些思考-不念博客](https://www.bunian.cn/wp-content/uploads/2024/01/640-7-12.png)
为什么敢选择Redis做延迟队列?
衡量了一下业务场景的指标,首先异常通知的并发量不会特别多,即使说异常故障瞬间爆发,上游也会有“聚合”(例如100条异常合成1条告知下游)。
所以到达升级这一环节的并发量很小。正应为数据量不高,所以才敢尝试用Redis做延迟队列。
加上目前Redis的基建也比较完善,内存空间足够大,而且对于消息通知的可靠性要求不会说要100%那么高。
Redis可以怎么做延迟队列
很早以前自己也试过基于Redis的Zset去实现一套延迟队列。这里可以利用到一个redis的zset数据结构。
zset结构中基本存储信息和set结构类似,但是会给每个元素都有一个排分的score标示,根据排分的大小会在内存中进行排序操作。
然后由客户端定时扫描ZSet中达到提取时间的数据,这里推荐使用zrangebyscore函数去实现。
zrangebyscore用法扫盲
>> zrangebyscore key min max [WITHSCORES] [LIMIT offset count]分页获取指定区间内(min - max),带有分数值(可选)的有序集成员的列表。>> zrangebyscore key min max [WITHSCORES] [LIMIT offset count] 分页获取指定区间内(min - max),带有分数值(可选)的有序集成员的列表。>> zrangebyscore key min max [WITHSCORES] [LIMIT offset count] 分页获取指定区间内(min - max),带有分数值(可选)的有序集成员的列表。
扫描出来的数据,就是延迟时间截止的数据,从而实现延迟处理的效果。
这套思路是自己早两年的设计方式,然而最近又重新去看了下延迟队列的设计,发现Redisson的部分设计却有些许不同。
Redisson会怎么做延迟队列
首先提供一段简单的Redisson延迟队列的Demo代码给大家学习使用。
maven依赖:
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.16.8</version></dependency><dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.16.8</version> </dependency><dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.16.8</version> </dependency>
延迟队列投递方:
package org.idea.redission.framework.delay.queue;import com.alibaba.fastjson2.JSON;import org.redisson.Redisson;import org.redisson.api.RBlockingQueue;import org.redisson.api.RedissonClient;import org.redisson.config.Config;/*** @Author idea* @Date: Created in 10:21 2024/1/28* @Description*/public class DelayQueueConsumerMain {public static void main(String[] args) {Config config = new Config();config.useSingleServer().setAddress("redis://cloud.db:8801").setPassword("pwd");RedissonClient client = Redisson.create(config);RBlockingQueue<String> blockingQueue = client.getBlockingQueue("delay_queue");new Thread(new Runnable() {@Overridepublic void run() {System.out.println("开始拉去延迟消息");while (true) {try {String item = blockingQueue.take();long currentTime = System.currentTimeMillis();MessageModel messageModel = JSON.parseObject(item, MessageModel.class);System.out.println("获取延迟消息,投递时间" + (currentTime - messageModel.getPushTime()) + "ms前,content" + messageModel.getContent());} catch (InterruptedException e) {e.printStackTrace();}}}}).start();}}package org.idea.redission.framework.delay.queue; import com.alibaba.fastjson2.JSON; import org.redisson.Redisson; import org.redisson.api.RBlockingQueue; import org.redisson.api.RedissonClient; import org.redisson.config.Config; /** * @Author idea * @Date: Created in 10:21 2024/1/28 * @Description */ public class DelayQueueConsumerMain { public static void main(String[] args) { Config config = new Config(); config.useSingleServer().setAddress("redis://cloud.db:8801").setPassword("pwd"); RedissonClient client = Redisson.create(config); RBlockingQueue<String> blockingQueue = client.getBlockingQueue("delay_queue"); new Thread(new Runnable() { @Override public void run() { System.out.println("开始拉去延迟消息"); while (true) { try { String item = blockingQueue.take(); long currentTime = System.currentTimeMillis(); MessageModel messageModel = JSON.parseObject(item, MessageModel.class); System.out.println("获取延迟消息,投递时间" + (currentTime - messageModel.getPushTime()) + "ms前,content" + messageModel.getContent()); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); } }package org.idea.redission.framework.delay.queue; import com.alibaba.fastjson2.JSON; import org.redisson.Redisson; import org.redisson.api.RBlockingQueue; import org.redisson.api.RedissonClient; import org.redisson.config.Config; /** * @Author idea * @Date: Created in 10:21 2024/1/28 * @Description */ public class DelayQueueConsumerMain { public static void main(String[] args) { Config config = new Config(); config.useSingleServer().setAddress("redis://cloud.db:8801").setPassword("pwd"); RedissonClient client = Redisson.create(config); RBlockingQueue<String> blockingQueue = client.getBlockingQueue("delay_queue"); new Thread(new Runnable() { @Override public void run() { System.out.println("开始拉去延迟消息"); while (true) { try { String item = blockingQueue.take(); long currentTime = System.currentTimeMillis(); MessageModel messageModel = JSON.parseObject(item, MessageModel.class); System.out.println("获取延迟消息,投递时间" + (currentTime - messageModel.getPushTime()) + "ms前,content" + messageModel.getContent()); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); } }
延迟队列消费方:
package org.idea.redission.framework.delay.queue;import com.alibaba.fastjson2.JSON;import lombok.extern.slf4j.Slf4j;import org.redisson.Redisson;import org.redisson.api.RBlockingQueue;import org.redisson.api.RDelayedQueue;import org.redisson.api.RedissonClient;import org.redisson.config.Config;import java.util.concurrent.TimeUnit;/*** @Author idea* @Date: Created in 10:25 2024/1/28* @Description*/@Slf4jpublic class DelayQueueProducerMain {public static void main(String[] args) throws InterruptedException {Config config = new Config();config.useSingleServer().setAddress("redis://cloud.db:8801").setPassword("pwd");RedissonClient client = Redisson.create(config);RBlockingQueue<String> blockingQueue = client.getBlockingQueue("delay_queue");RDelayedQueue<String> delayedQueue = client.getDelayedQueue(blockingQueue);int i = 0;while (true) {i++;TimeUnit.SECONDS.sleep(3);MessageModel messageModel = new MessageModel();messageModel.setContent("test-content-" + i);messageModel.setPushTime(System.currentTimeMillis());delayedQueue.offer(JSON.toJSONString(messageModel), 3, TimeUnit.SECONDS);System.out.println("投递第" + i + "条消息进延迟队列");}}}package org.idea.redission.framework.delay.queue; import com.alibaba.fastjson2.JSON; import lombok.extern.slf4j.Slf4j; import org.redisson.Redisson; import org.redisson.api.RBlockingQueue; import org.redisson.api.RDelayedQueue; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import java.util.concurrent.TimeUnit; /** * @Author idea * @Date: Created in 10:25 2024/1/28 * @Description */ @Slf4j public class DelayQueueProducerMain { public static void main(String[] args) throws InterruptedException { Config config = new Config(); config.useSingleServer().setAddress("redis://cloud.db:8801").setPassword("pwd"); RedissonClient client = Redisson.create(config); RBlockingQueue<String> blockingQueue = client.getBlockingQueue("delay_queue"); RDelayedQueue<String> delayedQueue = client.getDelayedQueue(blockingQueue); int i = 0; while (true) { i++; TimeUnit.SECONDS.sleep(3); MessageModel messageModel = new MessageModel(); messageModel.setContent("test-content-" + i); messageModel.setPushTime(System.currentTimeMillis()); delayedQueue.offer(JSON.toJSONString(messageModel), 3, TimeUnit.SECONDS); System.out.println("投递第" + i + "条消息进延迟队列"); } } }package org.idea.redission.framework.delay.queue; import com.alibaba.fastjson2.JSON; import lombok.extern.slf4j.Slf4j; import org.redisson.Redisson; import org.redisson.api.RBlockingQueue; import org.redisson.api.RDelayedQueue; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import java.util.concurrent.TimeUnit; /** * @Author idea * @Date: Created in 10:25 2024/1/28 * @Description */ @Slf4j public class DelayQueueProducerMain { public static void main(String[] args) throws InterruptedException { Config config = new Config(); config.useSingleServer().setAddress("redis://cloud.db:8801").setPassword("pwd"); RedissonClient client = Redisson.create(config); RBlockingQueue<String> blockingQueue = client.getBlockingQueue("delay_queue"); RDelayedQueue<String> delayedQueue = client.getDelayedQueue(blockingQueue); int i = 0; while (true) { i++; TimeUnit.SECONDS.sleep(3); MessageModel messageModel = new MessageModel(); messageModel.setContent("test-content-" + i); messageModel.setPushTime(System.currentTimeMillis()); delayedQueue.offer(JSON.toJSONString(messageModel), 3, TimeUnit.SECONDS); System.out.println("投递第" + i + "条消息进延迟队列"); } } }
将上述代码运行起来,一端投递数据,一端消费数据,然后连接上Redis,你会发现有两个队列存在:
![关于Redisson延迟队列的一些思考 图片[3]-关于Redisson延迟队列的一些思考-不念博客](https://www.bunian.cn/wp-content/uploads/2024/01/640-8-12.png)
这两个队列中,带有timeout关键字的那条是一个ZSet集合,另一个是普通的List。
但是为什么要设计两条队列呢,后边我查阅了相关资料,大概整理了下Redisson的延迟队列原理如下:
- 客户端启动,redisson先订阅一个key,同时 BLPOP key 0 无限监听一个阻塞队列(等里面有数据了就返回)。
- 当有数据put时,redisson先把数据放到一个zset集合(按延时到期时间的时间戳为分数排序),同时发布上面订阅的key,发布内容为数据到期的timeout,此时客户端进程开启一个延时任务,延时时间为发布的timeout。
- 客户端进程的延时任务到了时间执行,从zset分页取出过了当前时间的数据,然后将数据rpush到第一步的阻塞队列里。然后将当前数据从zset移除,取完之后,又执行 BLPOP key 0 无限监听一个阻塞队列。这一部分的逻辑,客户端会发送一个lua脚本给到服务端去操作:org.redisson.RedissonDelayedQueue源码里面的pushTaskAsync函数有lua脚本内容。
- 上一步客户端监听的阻塞队列返回取到数据,回调到 RBlockingQueue 的 take方法。于是,我们就收到了数据。
ps:可能有些同学对Redis的发布订阅不是很了解,你大概可以理解为是Redis做的一种消息广播机制。客户端订阅了A渠道之后,往A渠道发送内容,此时所有的订阅方立即可以收到消息内容。关于订阅发布的原理,可以看看这篇文章:https://cloud.tencent.com/developer/article/2297090
利用了Redis的订阅发布,确实可以减少客户端长时间轮询ZSet带来的网络性能开销,但需要依靠客户端的延迟任务,隔一段时间再去拉去zset。
Redisson为啥要多搞一个List出来
但是为什么从ZSet中取到消息之后,还得放入一个List队列中,然后再利用bLPop去获取元素呢?
这里说下我自己的一些思考:
Redisson的延迟消息设计的初衷,只是提供了一个到时间弹出到特定队列的功能,但是至于这条消息队列你要如何操作,反而是留给了开发者自己去思考。(例如消息的重试方面)
所以如果要用Redisson去做延迟消息,而且你自己也希望能具备一些重试机制的话,那么这个List是可以去自由发挥的。
而如果只是单纯的到时间了,直接从ZSet中取出来就完事的话,那么可二次发挥的空间就会少了些许。