最近部门在做一套告警治理相关的系统,专门用于对整个业务线杂七杂八的告警进行治理管控。
例如Kakfa异常,业务异常,Dubbo超时等等场景,全部都首先会被这套系统给监控到,然后再通过一定的策略分配,将异常内容转换为特定的消息模版告知给特定的业务方。
然后,这里面涉及到一个告警升级的问题需要处理,例如告警消息发送给A员工,但是A员工可能没有看到消息,那么超过一定时限之后,该消息需要升级发送给B员工,以此类推。
关于升级这块的链路,我早期一开始想的是设计扫表+定时任务的思路去做,后边发现这套方案数据存储会有冗余,维护成本高,所以打算换个思路去实现–延迟消息。
延迟队列的选型
最早是计划用mq的延迟消息去做的,但是公司内部用的都是Kafka集群,Kafka不支持延迟队列。
如果想用RocketMQ的延迟队列,那么就得搭建一套RocketMQ的环境,运维成本也比较高。
所以后边看了下目前已有的基建设施,打算试试Redis的延迟队列。
为什么敢选择Redis做延迟队列?
衡量了一下业务场景的指标,首先异常通知的并发量不会特别多,即使说异常故障瞬间爆发,上游也会有“聚合”(例如100条异常合成1条告知下游)。
所以到达升级这一环节的并发量很小。正应为数据量不高,所以才敢尝试用Redis做延迟队列。
加上目前Redis的基建也比较完善,内存空间足够大,而且对于消息通知的可靠性要求不会说要100%那么高。
Redis可以怎么做延迟队列
很早以前自己也试过基于Redis的Zset去实现一套延迟队列。这里可以利用到一个redis的zset数据结构。
zset结构中基本存储信息和set结构类似,但是会给每个元素都有一个排分的score标示,根据排分的大小会在内存中进行排序操作。
然后由客户端定时扫描ZSet中达到提取时间的数据,这里推荐使用zrangebyscore函数去实现。
zrangebyscore用法扫盲
>> zrangebyscore key min max [WITHSCORES] [LIMIT offset count]
分页获取指定区间内(min - max),带有分数值(可选)的有序集成员的列表。
扫描出来的数据,就是延迟时间截止的数据,从而实现延迟处理的效果。
这套思路是自己早两年的设计方式,然而最近又重新去看了下延迟队列的设计,发现Redisson的部分设计却有些许不同。
Redisson会怎么做延迟队列
首先提供一段简单的Redisson延迟队列的Demo代码给大家学习使用。
maven依赖:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.16.8</version>
</dependency>
延迟队列投递方:
package org.idea.redission.framework.delay.queue;
import com.alibaba.fastjson2.JSON;
import org.redisson.Redisson;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
/**
* @Author idea
* @Date: Created in 10:21 2024/1/28
* @Description
*/
public class DelayQueueConsumerMain {
public static void main(String[] args) {
Config config = new Config();
config.useSingleServer().setAddress("redis://cloud.db:8801").setPassword("pwd");
RedissonClient client = Redisson.create(config);
RBlockingQueue<String> blockingQueue = client.getBlockingQueue("delay_queue");
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("开始拉去延迟消息");
while (true) {
try {
String item = blockingQueue.take();
long currentTime = System.currentTimeMillis();
MessageModel messageModel = JSON.parseObject(item, MessageModel.class);
System.out.println("获取延迟消息,投递时间" + (currentTime - messageModel.getPushTime()) + "ms前,content" + messageModel.getContent());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
}
延迟队列消费方:
package org.idea.redission.framework.delay.queue;
import com.alibaba.fastjson2.JSON;
import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import java.util.concurrent.TimeUnit;
/**
* @Author idea
* @Date: Created in 10:25 2024/1/28
* @Description
*/
@Slf4j
public class DelayQueueProducerMain {
public static void main(String[] args) throws InterruptedException {
Config config = new Config();
config.useSingleServer().setAddress("redis://cloud.db:8801").setPassword("pwd");
RedissonClient client = Redisson.create(config);
RBlockingQueue<String> blockingQueue = client.getBlockingQueue("delay_queue");
RDelayedQueue<String> delayedQueue = client.getDelayedQueue(blockingQueue);
int i = 0;
while (true) {
i++;
TimeUnit.SECONDS.sleep(3);
MessageModel messageModel = new MessageModel();
messageModel.setContent("test-content-" + i);
messageModel.setPushTime(System.currentTimeMillis());
delayedQueue.offer(JSON.toJSONString(messageModel), 3, TimeUnit.SECONDS);
System.out.println("投递第" + i + "条消息进延迟队列");
}
}
}
将上述代码运行起来,一端投递数据,一端消费数据,然后连接上Redis,你会发现有两个队列存在:
这两个队列中,带有timeout关键字的那条是一个ZSet集合,另一个是普通的List。
但是为什么要设计两条队列呢,后边我查阅了相关资料,大概整理了下Redisson的延迟队列原理如下:
- 客户端启动,redisson先订阅一个key,同时 BLPOP key 0 无限监听一个阻塞队列(等里面有数据了就返回)。
- 当有数据put时,redisson先把数据放到一个zset集合(按延时到期时间的时间戳为分数排序),同时发布上面订阅的key,发布内容为数据到期的timeout,此时客户端进程开启一个延时任务,延时时间为发布的timeout。
- 客户端进程的延时任务到了时间执行,从zset分页取出过了当前时间的数据,然后将数据rpush到第一步的阻塞队列里。然后将当前数据从zset移除,取完之后,又执行 BLPOP key 0 无限监听一个阻塞队列。这一部分的逻辑,客户端会发送一个lua脚本给到服务端去操作:org.redisson.RedissonDelayedQueue源码里面的pushTaskAsync函数有lua脚本内容。
- 上一步客户端监听的阻塞队列返回取到数据,回调到 RBlockingQueue 的 take方法。于是,我们就收到了数据。
ps:可能有些同学对Redis的发布订阅不是很了解,你大概可以理解为是Redis做的一种消息广播机制。客户端订阅了A渠道之后,往A渠道发送内容,此时所有的订阅方立即可以收到消息内容。关于订阅发布的原理,可以看看这篇文章:https://cloud.tencent.com/developer/article/2297090
利用了Redis的订阅发布,确实可以减少客户端长时间轮询ZSet带来的网络性能开销,但需要依靠客户端的延迟任务,隔一段时间再去拉去zset。
Redisson为啥要多搞一个List出来
但是为什么从ZSet中取到消息之后,还得放入一个List队列中,然后再利用bLPop去获取元素呢?
这里说下我自己的一些思考:
Redisson的延迟消息设计的初衷,只是提供了一个到时间弹出到特定队列的功能,但是至于这条消息队列你要如何操作,反而是留给了开发者自己去思考。(例如消息的重试方面)
所以如果要用Redisson去做延迟消息,而且你自己也希望能具备一些重试机制的话,那么这个List是可以去自由发挥的。
而如果只是单纯的到时间了,直接从ZSet中取出来就完事的话,那么可二次发挥的空间就会少了些许。