基于Redis实现的延迟队列
1. 适用场景
日常开发中,我们经常遇到这样的需求,在某个事件发生后,过一段时间做一个额外的动作,比如
- 拼单,如果2小时未能成单,取消拼单
- 下单,30分钟内未支付,取消订单
之前的我们的做法通常是通过定时任务轮询,比如扫描创建时间是2小时之前,状态是未成功的拼单,然后做取消操作。这种方案存在的问题是: - 扫描对数据库造成一定的压力
- 轮询的时间间隔会导致操作有一定的延迟
延迟消息正是用来解决这类问题的银弹。
2. JDK实现
2.1 使用方式
JDK内部提供了DelayQueue
队列和Delayed
接口来实现延迟消息,我们先来看一个简单的Demo,我们会创建一个DelayMessage用来代表延迟消息,延迟消息需要实现Delayed接口
- getDealy,返回消息的延迟时间
- compareTo,为了让多个延迟消息排序,将时间最早的消息排到最前面
public class DelayMessage implements Delayed {
private long expiredAtMs;
private long delayMs;
private String message;
public DelayMessage(long delaySeconds, String message) {
this.delayMs = delaySeconds * 1000;
this.expiredAtMs = System.currentTimeMillis() + delayMs;
this.message = message;
}
@Override
public long getDelay(TimeUnit unit) {
long diff = expiredAtMs - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
long sTtl = getDelay(TimeUnit.MILLISECONDS);
long oTtl = o.getDelay(TimeUnit.MILLISECONDS);
return sTtl < oTtl ? -1 : (sTtl > oTtl ? 1 : 0);
}
public String getMessage() {
return this.message;
}
}
接着只需要创建消息队列,将延迟消息放入到队列中即可,然后创建一个线程来消费延迟队列即可
DelayQueue<DelayMessage> queue = new DelayQueue<>();
queue.put(new DelayMessage(1, "1s later"));
queue.put(new DelayMessage(60, "60s later"));
queue.put(new DelayMessage(120, "120s later"));
ExecutorService es = Executors.newSingleThreadExecutor();
es.submit(() -> {
try {
while (true) {
DelayMessage dm = queue.take();
System.out.println(currentTimeInText() + "_" + dm.getMessage());
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
2.2 实现原理
从DelayQueue的源码我们可以看到,整个DelayQueue的核心就在于3个点:
- 数据存储,基于PriorityQueue,通过Delayed的compareTo方法排序,即基于时间顺序
- 数据写入,offer/put方法
- 数据消费,take/poll方法
1. 数据写入
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e); // PriorityQueue写入
if (q.peek() == e) { // 如果刚刚写入的消息是最高优先级的(最早被消费的),唤醒在take()方法阻塞的线程
leader = null; // Leader-Follow Parttern,减少RaceCondition, http://www.cs.wustl.edu/~schmidt/POSA/POSA2/
available.signal(); // 唤醒在take()阻塞的线程
}
return true;
} finally {
lock.unlock();
}
}
2. 数据消费
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await(); // 队列为空,阻塞,直到offer(e)被调用
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0) // 延迟时间到了,取出item供使用
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await(); // await释放锁,其他线程执行take(),如果leader != null有负责处理头部item的线程
else {
Thread thisThread = Thread.currentThread(); // 走到这说明头部元素暂无处理线程,将当前线程设定为处理线程
leader = thisThread;
try {
available.awaitNanos(delay); // 等待延迟时间后自动唤醒,重新进入循环,处理queue头部item
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
代码很短,设计还是巧妙的,尤其是Leader-Follower模式的使用,在我们实现自己的组件时可以借鉴。
3. Redis实现
JDK实现的延迟队列已经能解决部分场景了,不过也存在两个明显的问题
- 队列数据没持久化,重启或进程崩溃都会导致数据丢失
- 不支持分布式,不能跨进程共享
3.1 消息队列
通过上面的JDK实现,我们已经能把Redis实现的延迟消息的逻辑猜的八九不离十了,假设我们用LIST存储,先通过LPUSH写入队列消息(message1、message2)
127.0.0.1:6379> LPUSH my_delay_queue message1
(integer) 1
127.0.0.1:6379> LPUSH my_delay_queue message2
(integer) 2
127.0.0.1:6379> LRANGE my_delay_queue 0 -1
1) "message2"
2) "message1"
通过RPOPLPUSH,从队列取出待消费的消息,并暂存到临时队列(my_delay_queue)中
127.0.0.1:6379> RPOPLPUSH my_delay_queue my_delay_queue_temp
"message1"
127.0.0.1:6379> LRANGE my_delay_queue_temp 0 -1
1) "message1"
127.0.0.1:6379> LRANGE my_delay_queue 0 -1
1) "message2"
这是在程序代码中消费message1,如果消费成功,从临时队列中删除消息
127.0.0.1:6379> LREM my_delay_queue_temp 1 message1
(integer) 1
最终队列的状态是,delayQueue中只剩message2,临时队列中为空
127.0.0.1:6379> LRANGE my_delay_queue_temp 0 -1
(empty array)
127.0.0.1:6379> LRANGE my_delay_queue 0 -1
1) "message2"
3.2 延迟队列
用LIST只能实现FIFO,要想实现基于时间的优先级,需要改用ZSET来存储数据,用时间做时间戳
127.0.0.1:6379> ZADD s_delay_queue 1728625236 message0
127.0.0.1:6379> ZADD s_delay_queue 1728625256 message0
127.0.0.1:6379> ZADD s_delay_queue 1728625256 message2
127.0.0.1:6379> ZADD s_delay_queue 1728625266 message3
127.0.0.1:6379> ZRANGE s_delay_queue 0 -1 WITHSCORES
1) "message0"
2) "1728625236"
3) "message1"
4) "1728625256"
5) "message2"
6) "1728625256"
7) "message3"
8) "1728625266"
通过使用ZRANGEBYSCORE获取延迟时间已经到的item
127.0.0.1:6379> ZRANGEBYSCORE s_delay_queue 0 1728625256
1) "message0"
2) "message1"
3) "message2"
ZSET并没有提供RPOPLPUSH的命令,我们使用Lua脚本来模拟这个操作,这段lua接受两个KEY,一个ARGV
- KEYS[1],表示ZSET的名字
- KEYS[2],表示LIST的名字
- ARGV[1],表示SCORE的范围截至时间
local elements = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1])
if #elements > 0 then
for i, element in ipairs(elements) do
redis.call('LPUSH', KEYS[2], element)
redis.call('ZREM', KEYS[1], element)
end
end
return elements
然后是通过EVAL执行这段Lua,这里我们从ZSET(s_delay_queue)读取score <= 1728625237的item,返回并暂存到LIST(s_delay_queue_temp)中,模拟了RPOPLPUSH的操作
127.0.0.1:6379> EVAL "local elements = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1]) if #elements > 0 then for i, element in ipairs(elements) do redis.call('LPUSH', KEYS[2], element) redis.call('ZREM', KEYS[1], element) end end return elements" 2 s_delay_queue s_delay_queue_temp 1728625237
1) "message0"
剩下的逻辑基本上和[[基于Redis的延迟队列#3.1 消息队列]]一样,在程序中消费message,成功之后删除s_delay_queue_temp中的数据。我们需要做的是在程序中定时的执行这段Lua脚本,并且实现类似DelayQueue的逻辑,支持阻塞的take()操作,以及消费失败时的错误处理,显然要处理的错误细节并不少。
3.3 Redisson实现
1. 数据结构
Redisson封装了基于Redis的延迟消息实现,我们来看一个使用的Redisson延迟队列的demo
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);
RBlockingQueue<String> blockingQueue = redisson.getBlockingQueue("delayBlockingQueue");
RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingQueue);
delayedQueue.offer("message1", 1, TimeUnit.MINUTES);
delayedQueue.offer("message2", 5, TimeUnit.MINUTES);
delayedQueue.offer("message3", 10, TimeUnit.MINUTES);
delayedQueue.offer("message4", 15, TimeUnit.MINUTES);
ExecutorService es = Executors.newSingleThreadExecutor();
es.submit(() -> {
while (true) {
String data = blockingQueue.poll(60, TimeUnit.SECONDS);
if (data != null) {
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ":" + data);
}
}
});
Redisson的实现比[[#3.2 延迟队列]]要负责一点,它内部构建了4个数据结构。通过Redis的命令查看,我们能看到3个KEY
127.0.0.1:6379> KEYS *
2) "delayBlockingQueue"
4) "redisson_delay_queue:{delayBlockingQueue}"
6) "redisson_delay_queue_timeout:{delayBlockingQueue}"
delayBlockingQueue
是我们创建RBlockingQueue时指定的名称,用来存储延迟时间到期,但尚未被处理的任务redisson_delay_queue_timeout:{delayBlockingQueue}
,类型是zset,记录延迟任务和时间redisson_delay_queue:{delayBlockingQueue}
,类型是list,记录任务列表,保持任务的顺序
通过TYPE命令,我们能查看他们的数据类型
127.0.0.1:6379> TYPE redisson_delay_queue:{delayBlockingQueue}
list
127.0.0.1:6379> TYPE redisson_delay_queue_timeout:{delayBlockingQueue}
zset
此外Redission还创建了一个Channel,用来在delayQueue写入数据的时候做通知
127.0.0.1:6379> PUBSUB channels
1) "redisson_delay_queue_channel:{delayBlockingQueue}"
2. 数据写入
通过RDelayedQueue写入数据的时候,最终会调用offerAsync方法
public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {
if (delay < 0) {
throw new IllegalArgumentException("Delay can't be negative");
}
long delayInMs = timeUnit.toMillis(delay);
long timeout = System.currentTimeMillis() + delayInMs;
long randomId = ThreadLocalRandom.current().nextLong();
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
"local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);"
+ "redis.call('zadd', KEYS[2], ARGV[1], value);" // 写入 redisson_delay_queue_timeout:{delayBlockingQueue}
+ "redis.call('rpush', KEYS[3], value);" // 写入 redisson_delay_queue:{delayBlockingQueue}
// if new object added to queue head when publish its startTime
// to all scheduler workers
+ "local v = redis.call('zrange', KEYS[2], 0, 0); " // 取时间戳最小的元素
+ "if v[1] == value then "
+ "redis.call('publish', KEYS[4], ARGV[1]); " // 如果新插入的元素是zset的第一个元素,做channel通知
+ "end;",
Arrays.<Object>asList(getName(), timeoutSetName, queueName, channelName),
timeout, randomId, encode(e));
}
3. 数据消费
创建RDelayedQueue时,redisson创建了一个QueueTransferTask任务,负责从redisson_delay_queue_timeout:{delayBlockingQueue}
将到期的数据迁移到delayBlockingQueue
中
protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
channelName = prefixName("redisson_delay_queue_channel", getName());
queueName = prefixName("redisson_delay_queue", getName());
timeoutSetName = prefixName("redisson_delay_queue_timeout", getName());
QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {
@Override
protected RFuture<Long> pushTaskAsync() {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
"local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); " // 从redisson_delay_queue_timeout拿到期的任务
+ "if #expiredValues > 0 then "
+ "for i, v in ipairs(expiredValues) do "
+ "local randomId, value = struct.unpack('dLc0', v);"
+ "redis.call('rpush', KEYS[1], value);" // 写入到 delayBlockingQueue
+ "redis.call('lrem', KEYS[3], 1, v);" // 从 redisson_delay_queue 删除
+ "end; "
+ "redis.call('zrem', KEYS[2], unpack(expiredValues));" // 从 redisson_delay_queue_timeout 删除
+ "end; "
// get startTime from scheduler queue head task
+ "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
+ "if v[1] ~= nil then " // 如果最小时间戳的任务存在,返回它的时间戳
+ "return v[2]; "
+ "end "
+ "return nil;",
Arrays.<Object>asList(getName(), timeoutSetName, queueName), // KEYS: delayBlockingQueue , redisson_delay_queue_timeout*、redisson_delay_queue*
System.currentTimeMillis(), 100);
}
@Override
protected RTopic getTopic() {
return new RedissonTopic(LongCodec.INSTANCE, commandExecutor, channelName);
}
};
queueTransferService.schedule(queueName, task);
this.queueTransferService = queueTransferService;
}
4. RBlockingQueue
通过[[#3. 数据消费]]的操作,redisson已经将到期的延迟任务写入到delayBlockingQueue了,剩下要做的就是用delayBlockingQueue实现阻塞队列了,核心代码在 RedissonBlockingQueue,其实实现很简单,我们来看下代码,take()方法实际只是执行了一个redis命令BLPOP
@Override
public V take() throws InterruptedException {
return commandExecutor.getInterrupted(takeAsync());
}
@Override
public RFuture<V> takeAsync() {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BLPOP_VALUE, getName(), 0);
}