当前位置: 首页 > article >正文

基于Redis实现的延迟队列

1. 适用场景

日常开发中,我们经常遇到这样的需求,在某个事件发生后,过一段时间做一个额外的动作,比如

  1. 拼单,如果2小时未能成单,取消拼单
  2. 下单,30分钟内未支付,取消订单
    之前的我们的做法通常是通过定时任务轮询,比如扫描创建时间是2小时之前,状态是未成功的拼单,然后做取消操作。这种方案存在的问题是:
  3. 扫描对数据库造成一定的压力
  4. 轮询的时间间隔会导致操作有一定的延迟
    延迟消息正是用来解决这类问题的银弹。

2. JDK实现

2.1 使用方式

JDK内部提供了DelayQueue队列和Delayed接口来实现延迟消息,我们先来看一个简单的Demo,我们会创建一个DelayMessage用来代表延迟消息,延迟消息需要实现Delayed接口

  1. getDealy,返回消息的延迟时间
  2. compareTo,为了让多个延迟消息排序,将时间最早的消息排到最前面
public class DelayMessage implements Delayed {
    private long expiredAtMs;
    private long delayMs;
    private String message;
    public DelayMessage(long delaySeconds, String message) {
        this.delayMs = delaySeconds * 1000;
        this.expiredAtMs = System.currentTimeMillis() + delayMs;
        this.message = message;
    }
    @Override
    public long getDelay(TimeUnit unit) {
        long diff = expiredAtMs - System.currentTimeMillis();
        return unit.convert(diff, TimeUnit.MILLISECONDS);
    }
    @Override
    public int compareTo(Delayed o) {
        long sTtl = getDelay(TimeUnit.MILLISECONDS);
        long oTtl = o.getDelay(TimeUnit.MILLISECONDS);
        return sTtl < oTtl ? -1 : (sTtl > oTtl ? 1 : 0);
    }
    public String getMessage() {
        return this.message;
    }
}

接着只需要创建消息队列,将延迟消息放入到队列中即可,然后创建一个线程来消费延迟队列即可

DelayQueue<DelayMessage> queue = new DelayQueue<>();
queue.put(new DelayMessage(1, "1s later"));
queue.put(new DelayMessage(60, "60s later"));
queue.put(new DelayMessage(120, "120s later"));

ExecutorService es = Executors.newSingleThreadExecutor();
es.submit(() -> {
    try {
        while (true) {
            DelayMessage dm = queue.take();
            System.out.println(currentTimeInText() + "_" + dm.getMessage());
        }
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
});
2.2 实现原理

从DelayQueue的源码我们可以看到,整个DelayQueue的核心就在于3个点:

  1. 数据存储,基于PriorityQueue,通过Delayed的compareTo方法排序,即基于时间顺序
  2. 数据写入,offer/put方法
  3. 数据消费,take/poll方法
1. 数据写入
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);           // PriorityQueue写入
        if (q.peek() == e) {  // 如果刚刚写入的消息是最高优先级的(最早被消费的),唤醒在take()方法阻塞的线程
            leader = null;    // Leader-Follow Parttern,减少RaceCondition, http://www.cs.wustl.edu/~schmidt/POSA/POSA2/
            available.signal(); // 唤醒在take()阻塞的线程
        }
        return true;
    } finally {
        lock.unlock();
    }
}
2. 数据消费
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null)
                available.await();  // 队列为空,阻塞,直到offer(e)被调用
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)     // 延迟时间到了,取出item供使用
                    return q.poll();
                first = null; // don't retain ref while waiting
                if (leader != null)
                    available.await();  // await释放锁,其他线程执行take(),如果leader != null有负责处理头部item的线程
                else {
                    Thread thisThread = Thread.currentThread();  // 走到这说明头部元素暂无处理线程,将当前线程设定为处理线程
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);  // 等待延迟时间后自动唤醒,重新进入循环,处理queue头部item
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

代码很短,设计还是巧妙的,尤其是Leader-Follower模式的使用,在我们实现自己的组件时可以借鉴。

3. Redis实现

JDK实现的延迟队列已经能解决部分场景了,不过也存在两个明显的问题

  1. 队列数据没持久化,重启或进程崩溃都会导致数据丢失
  2. 不支持分布式,不能跨进程共享
3.1 消息队列

通过上面的JDK实现,我们已经能把Redis实现的延迟消息的逻辑猜的八九不离十了,假设我们用LIST存储,先通过LPUSH写入队列消息(message1、message2)

127.0.0.1:6379> LPUSH my_delay_queue message1
(integer) 1
127.0.0.1:6379> LPUSH my_delay_queue message2
(integer) 2
127.0.0.1:6379> LRANGE my_delay_queue 0 -1
1) "message2"
2) "message1"

通过RPOPLPUSH,从队列取出待消费的消息,并暂存到临时队列(my_delay_queue)中

127.0.0.1:6379> RPOPLPUSH my_delay_queue my_delay_queue_temp
"message1"
127.0.0.1:6379> LRANGE my_delay_queue_temp 0 -1
1) "message1"
127.0.0.1:6379> LRANGE my_delay_queue 0 -1
1) "message2"

这是在程序代码中消费message1,如果消费成功,从临时队列中删除消息

127.0.0.1:6379> LREM my_delay_queue_temp 1 message1
(integer) 1

最终队列的状态是,delayQueue中只剩message2,临时队列中为空

127.0.0.1:6379> LRANGE my_delay_queue_temp 0 -1
(empty array)
127.0.0.1:6379> LRANGE my_delay_queue 0 -1
1) "message2"
3.2 延迟队列

用LIST只能实现FIFO,要想实现基于时间的优先级,需要改用ZSET来存储数据,用时间做时间戳

127.0.0.1:6379> ZADD s_delay_queue 1728625236 message0
127.0.0.1:6379> ZADD s_delay_queue 1728625256 message0
127.0.0.1:6379> ZADD s_delay_queue 1728625256 message2
127.0.0.1:6379> ZADD s_delay_queue 1728625266 message3

127.0.0.1:6379> ZRANGE s_delay_queue 0 -1 WITHSCORES
1) "message0"
2) "1728625236"
3) "message1"
4) "1728625256"
5) "message2"
6) "1728625256"
7) "message3"
8) "1728625266"

通过使用ZRANGEBYSCORE获取延迟时间已经到的item

127.0.0.1:6379> ZRANGEBYSCORE s_delay_queue 0 1728625256
1) "message0"
2) "message1"
3) "message2"

ZSET并没有提供RPOPLPUSH的命令,我们使用Lua脚本来模拟这个操作,这段lua接受两个KEY,一个ARGV

  1. KEYS[1],表示ZSET的名字
  2. KEYS[2],表示LIST的名字
  3. ARGV[1],表示SCORE的范围截至时间
local elements = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1])
if #elements > 0 then
    for i, element in ipairs(elements) do
        redis.call('LPUSH', KEYS[2], element)
        redis.call('ZREM', KEYS[1], element)
    end
end
return elements

然后是通过EVAL执行这段Lua,这里我们从ZSET(s_delay_queue)读取score <= 1728625237的item,返回并暂存到LIST(s_delay_queue_temp)中,模拟了RPOPLPUSH的操作

127.0.0.1:6379> EVAL "local elements = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1]) if #elements > 0 then for i, element in ipairs(elements) do redis.call('LPUSH', KEYS[2], element) redis.call('ZREM', KEYS[1], element) end end return elements" 2 s_delay_queue s_delay_queue_temp 1728625237
1) "message0"

剩下的逻辑基本上和[[基于Redis的延迟队列#3.1 消息队列]]一样,在程序中消费message,成功之后删除s_delay_queue_temp中的数据。我们需要做的是在程序中定时的执行这段Lua脚本,并且实现类似DelayQueue的逻辑,支持阻塞的take()操作,以及消费失败时的错误处理,显然要处理的错误细节并不少。

3.3 Redisson实现
1. 数据结构

Redisson封装了基于Redis的延迟消息实现,我们来看一个使用的Redisson延迟队列的demo

Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);

RBlockingQueue<String> blockingQueue = redisson.getBlockingQueue("delayBlockingQueue");
RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingQueue);

delayedQueue.offer("message1", 1, TimeUnit.MINUTES);
delayedQueue.offer("message2", 5, TimeUnit.MINUTES);
delayedQueue.offer("message3", 10, TimeUnit.MINUTES);
delayedQueue.offer("message4", 15, TimeUnit.MINUTES);

ExecutorService es = Executors.newSingleThreadExecutor();
es.submit(() -> {
    while (true) {
        String data = blockingQueue.poll(60, TimeUnit.SECONDS);
        if (data != null) {
            System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ":" + data);
        }
    }
});

Redisson的实现比[[#3.2 延迟队列]]要负责一点,它内部构建了4个数据结构。通过Redis的命令查看,我们能看到3个KEY

127.0.0.1:6379> KEYS *
2) "delayBlockingQueue"
4) "redisson_delay_queue:{delayBlockingQueue}"
6) "redisson_delay_queue_timeout:{delayBlockingQueue}"
  1. delayBlockingQueue是我们创建RBlockingQueue时指定的名称,用来存储延迟时间到期,但尚未被处理的任务
  2. redisson_delay_queue_timeout:{delayBlockingQueue},类型是zset,记录延迟任务和时间
  3. redisson_delay_queue:{delayBlockingQueue},类型是list,记录任务列表,保持任务的顺序
    通过TYPE命令,我们能查看他们的数据类型
127.0.0.1:6379> TYPE redisson_delay_queue:{delayBlockingQueue}
list
127.0.0.1:6379> TYPE redisson_delay_queue_timeout:{delayBlockingQueue}
zset

此外Redission还创建了一个Channel,用来在delayQueue写入数据的时候做通知

127.0.0.1:6379> PUBSUB channels
1) "redisson_delay_queue_channel:{delayBlockingQueue}"
2. 数据写入

通过RDelayedQueue写入数据的时候,最终会调用offerAsync方法

public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {
    if (delay < 0) {
        throw new IllegalArgumentException("Delay can't be negative");
    }
    
    long delayInMs = timeUnit.toMillis(delay);
    long timeout = System.currentTimeMillis() + delayInMs;
 
    long randomId = ThreadLocalRandom.current().nextLong();
    return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
            "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);" 
          + "redis.call('zadd', KEYS[2], ARGV[1], value);"                       // 写入  redisson_delay_queue_timeout:{delayBlockingQueue}
          + "redis.call('rpush', KEYS[3], value);"                               // 写入  redisson_delay_queue:{delayBlockingQueue}
          // if new object added to queue head when publish its startTime 
          // to all scheduler workers 
          + "local v = redis.call('zrange', KEYS[2], 0, 0); "                    // 取时间戳最小的元素
          + "if v[1] == value then "
             + "redis.call('publish', KEYS[4], ARGV[1]); "                       // 如果新插入的元素是zset的第一个元素,做channel通知
          + "end;",
          Arrays.<Object>asList(getName(), timeoutSetName, queueName, channelName), 
          timeout, randomId, encode(e));
}
3. 数据消费

创建RDelayedQueue时,redisson创建了一个QueueTransferTask任务,负责从redisson_delay_queue_timeout:{delayBlockingQueue}将到期的数据迁移到delayBlockingQueue

protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) {
    super(codec, commandExecutor, name);
    channelName = prefixName("redisson_delay_queue_channel", getName());
    queueName = prefixName("redisson_delay_queue", getName());
    timeoutSetName = prefixName("redisson_delay_queue_timeout", getName());
    
    QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {
        
        @Override
        protected RFuture<Long> pushTaskAsync() {
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
                    "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); " // 从redisson_delay_queue_timeout拿到期的任务
                  + "if #expiredValues > 0 then "
                      + "for i, v in ipairs(expiredValues) do "
                          + "local randomId, value = struct.unpack('dLc0', v);"
                          + "redis.call('rpush', KEYS[1], value);"      // 写入到 delayBlockingQueue
                          + "redis.call('lrem', KEYS[3], 1, v);"        // 从 redisson_delay_queue 删除
                      + "end; "
                      + "redis.call('zrem', KEYS[2], unpack(expiredValues));" // 从 redisson_delay_queue_timeout 删除
                  + "end; "
                    // get startTime from scheduler queue head task
                  + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
                  + "if v[1] ~= nil then " // 如果最小时间戳的任务存在,返回它的时间戳
                     + "return v[2]; "
                  + "end "
                  + "return nil;",
                  Arrays.<Object>asList(getName(), timeoutSetName, queueName),  // KEYS: delayBlockingQueue , redisson_delay_queue_timeout*、redisson_delay_queue*
                  System.currentTimeMillis(), 100);
        }
        
        @Override
        protected RTopic getTopic() {
            return new RedissonTopic(LongCodec.INSTANCE, commandExecutor, channelName);
        }
    };
    
    queueTransferService.schedule(queueName, task);
    
    this.queueTransferService = queueTransferService;
}
4. RBlockingQueue

通过[[#3. 数据消费]]的操作,redisson已经将到期的延迟任务写入到delayBlockingQueue了,剩下要做的就是用delayBlockingQueue实现阻塞队列了,核心代码在 RedissonBlockingQueue,其实实现很简单,我们来看下代码,take()方法实际只是执行了一个redis命令BLPOP

@Override
public V take() throws InterruptedException {
    return commandExecutor.getInterrupted(takeAsync());
}
@Override
public RFuture<V> takeAsync() {
    return commandExecutor.writeAsync(getName(), codec, RedisCommands.BLPOP_VALUE, getName(), 0);
}

http://www.kler.cn/news/354008.html

相关文章:

  • MATLAB基础应用精讲-【数模应用】HLM模型
  • 20240803 芯动科技 笔试
  • 10秒钟用Midjourney画出国风味的变形金刚
  • 重塑输电线路运维管理,巡检管理系统守护电网稳定运行
  • JAVA地狱级笑话
  • linuxC读取bin文件
  • 大数据面试题整理——MapReduce
  • 传染病防控宣传系统的设计与实现小程序springboot+论文源码调试讲解
  • Java | Leetcode Java题解之第482题秘钥格式化
  • react-JSX
  • 《沈阳工业大学学报》
  • Spring Boot动态数据源切换功能详解
  • 群晖前面加了雷池社区版,安装失败,然后无法识别出用户真实访问IP
  • React中的函数组件与类组件
  • 【Bug】docker容器之间网络通讯失败
  • 在vue中v-show不起作用
  • Axure重要元件三——中继器表单制作
  • 图书管理智能化:Spring Boot进销存系统
  • Spring如何通过三级缓存解决循环依赖的问题
  • 5G对无人机的影响!