当前位置: 首页 > article >正文

天机学堂7--Redisson自定义注解AOP以及SPEL表达式实现分布式锁

文章目录

  • 集群下的锁失效问题
  • Redis中的setnx命令实现分布式锁
    • setnx基本原理
    • 死锁问题
    • 利用Redis实现的简单分布式锁流程
    • setnx的分布式锁的问题
      • 锁误删问题
      • 超时释放问题
      • 其它问题
  • Redisson
    • 基于注解的分布式锁
    • 工厂模式 选择锁类型
    • 策略模式提供 重试策略 + 失败策略组合
    • 基于SPEL的动态锁名
  • 相关知识点
    • 自定义注解
      • Redis Pub/Sub 的工作原理
      • 失败重试机制在 Redis Pub/Sub 中的应用
      • 1. **消息确认和重试机制**
      • 2. ==使用 Redis Streams 替代 Pub/Sub==
      • ~~3. **使用 Redis Pub/Sub 的失败重试方案**~~
      • 总结

集群下的锁失效问题

Synchronized中的重量级锁,底层就是基于锁监视器(Monitor)来实现的。简单来说就是锁对象头会指向一个锁监视器,而在监视器中则会记录一些信息,比如:

  • _owner:持有锁的线程
  • _recursions:锁重入次数

每一个锁对象,都会指向一个锁监视器,而每一个锁监视器,同一时刻只能被一个线程持有,这样就实现了互斥效果。但前提是,多个线程使用的是同一把锁。

但问题来了,我们的服务将来肯定会多实例不是,形成集群。每一个实例都会有一个自己的JVM运行环境,因此即便是同一个用户,如果并发的发起了多个请求,由于请求进入了多个JVM,就会出现多个锁对象(用户id对象),自然就有多个锁监视器。此时就会出现每个JVM内部都有一个线程获取锁成功的情况,没有达到互斥的效果,并发安全问题就可能再次发生了:
在这里插入图片描述
我们不能让每个实例去使用各自的JVM内部锁监视器,而是应该在多个实例外部寻找一个锁监视器,多个实例争抢同一把锁。像这样的锁,就称为分布式锁

分布式锁必须要满足的特征:

  • 多JVM实例都可以访问
  • 互斥

能满足上述特征的组件有很多,因此实现分布式锁的方式也非常多,例如:

  • 基于MySQL
  • 基于Redis
  • 基于Zookeeper
  • 基于ETCD
    但目前使用最广泛的还应该是基于Redis的分布式锁。

Redis中的setnx命令实现分布式锁

Redis本身可以被任意JVM实例访问,同时Redis中的setnx命令具备互斥性,因此符合分布式锁的需求

setnx基本原理

Redis的setnx命令是对string类型数据的操作,语法如下:
给key赋值为value: SETNX key value

当前仅当key不存在的时候,setnx才能执行成功,并且返回1,其它情况都会执行失败,并且返回0.我们就可以认为返回值是1就是获取锁成功,返回值是0就是获取锁失败,实现互斥效果。
而当业务执行完成时,我们只需要删除这个key即可释放锁。这个时候其它线程又可以再次获取锁(执行setnx成功)了。
删除指定key,用来释放锁: DEL key

死锁问题

不过我们要考虑一种极端情况,比如我们获取锁成功,还未释放锁呢当前实例突然宕机了!那么释放锁的逻辑自然就永远不会被执行,这样lock就永远存在,再也不会有其它线程获取锁成功了!出现了死锁问题: 利用Redis的KEY过期时间机制,在获取锁时给锁添加一个超时时间:
获取锁,并记录持有锁的线程: SETNX lock thread1
设置过期时间,避免死锁: EXPIRE lock 20
这里我们设置超时时间为20秒,远超任务执行时间。当业务正常执行时,这个过期时间不起作用
但是如果当前服务实例宕机,DEL无法执行。但由于我们设置了20秒的过期时间,当超过这个时间时,锁会因为过期被删除,因此就等于释放锁了,从而避免了死锁问题。这种策略就是超时释放锁策略。
但新的问题来了,SETNX和EXPIRE是两条命令,如果我执行完SETNX,还没来得急执行EXPIRE时服务已经宕机了,这样加锁成功,但锁超时时间依然没能设置!死锁问题岂不是再次发生了?!
所以,必须保证SETNX和EXPIRE两个操作的原子性。事实上,Redis中的set命令就能同时实现setnx和expire的效果:
NX 等同于SETNX lock thread1效果, EX 20 等同于 EXPIRE lock 20效果
SET lock thread1 NX EX 20

利用Redis实现的简单分布式锁流程

在这里插入图片描述

public class RedisLock {

    private final String key;
    private final StringRedisTemplate redisTemplate;

    /**
     * 尝试获取锁
     * @param leaseTime 锁自动释放时间
     * @param unit 时间单位
     * @return 是否获取成功,true:获取锁成功;false:获取锁失败
     */
    public boolean tryLock(long leaseTime, TimeUnit unit){
        // 1.获取线程名称
        String threadValue = Thread.currentThread().getName();
        // 2.获取锁
        Boolean success = redisTemplate.opsForValue().setIfAbsent(key, threadValue, leaseTime, unit);
        // 3.返回结果
        return BooleanUtils.isTrue(success);
    }

    /**
     * 释放锁
     */
    public void unlock(){
        redisTemplate.delete(key);
    }
}

setnx的分布式锁的问题

锁误删问题

例如,有线程1获取锁成功,并且执行完任务,正准备释放锁,但是因为某种原因导致线程1释放锁的操作被阻塞了,直到锁被超时释放。就在此时,有一个新的线程2来尝试获取锁。因为线程1的锁被超时释放,因此线程2是可以获取锁成功的。而就在此时,线程1醒来,继续执行释放锁的操作,也就是DEL.结果就把线程2的锁给删除了。然而此时线程2还在执行任务,如果有其它线程再来获取锁,就会认为无人持有锁从而获取锁成功,于是多个线程再次并行执行,并发安全问题就可能再次发生了
解决方法:释放锁前要检查是不是自己的锁

超时释放问题

线程1获取锁成功,并且执行业务完成,并且也判断了锁标示,确实与自己一致:
接下来,线程1应该去释放自己的锁了,可就在此时发生了阻塞!直到锁超时释放:然后,线程2来获取锁,又和上面一样了。

总结一下,误删的原因归根结底是因为什么?

  • 超时释放
  • 判断锁标示、删除锁两个动作不是原子操作

操作锁的多行命令又该如何确保原子性?

其它问题

除了上述问题以外,分布式锁还会碰到一些其它问题:

  • 锁的重入问题:同一个线程多次获取锁的场景,目前不支持,可能会导致死锁
  • 锁失败的重试问题:获取锁失败后要不要重试?目前是直接失败,不支持重试
  • Redis主从的一致性问题:由于主从同步存在延迟,当线程在主节点获取锁后,从节点可能未同步锁信息。如果此时主宕机,会出现锁失效情况。此时会有其它线程也获取锁成功。从而出现并发安全问题。

当然,上述问题并非无法解决,只不过会比较麻烦。例如:

  • 原子性问题:可以利用Redis的LUA脚本来编写锁操作,确保原子性
  • 超时问题:利用WatchDog(看门狗)机制,获取锁成功时开启一个定时任务,在锁到期前自动续期,避免超时释放。而当服务宕机后,WatchDog跟着停止运行,不会导致死锁。
  • 锁重入问题:可以模拟Synchronized原理,放弃setnx,而是利用Redis的Hash结构来记录锁的持有者以及重入次数,获取锁时重入次数+1,释放锁是重入次数-1,次数为0则锁删除
  • 主从一致性问题:可以利用Redis官网推荐的RedLock机制来解决

这些解决方案实现起来比较复杂,因此我们通常会使用一些开源框架来实现分布式锁,而不是自己来编码实现。目前对这些解决方案实现的比较完善的一个第三方组件:Redisson

Redisson

在这里插入图片描述
首先引入依赖:

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
</dependency>

然后是配置:

@Configuration
public class RedisConfig {
   @Bean
   public RedissonClient redissonClient() {
       // 配置类
       Config config = new Config();
       // 添加redis地址,这里添加了单点的地址,也可以使用config.useClusterServers()添加集群地址 
       config.useSingleServer()
           .setAddress("redis://192.168.150.101:6379")
           .setPassowrd("123321");
       // 创建客户端
       return Redisson.create(config);
   }
}

tj-common里面已经配置了,所以不需要重复配置
在这里插入图片描述

最后是基本用法:

@Autowired
private RedissonClient redissonClient;

@Test
void testRedisson() throws InterruptedException {
  // 1.获取锁对象,指定锁名称
  RLock lock = redissonClient.getLock("anyLock");//anylock是锁的名字也是redis的键值
  try {
      // 2.尝试获取锁,参数:waitTime、leaseTime、时间单位
      boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
      if (!isLock) {
          // 获取锁失败处理 ..
      } else {
          // 获取锁成功处理
      }
  } finally {
      // 4.释放锁
      lock.unlock();//判断锁是否属于自己+原子性都有实现
  }
}

在这里插入图片描述
Watch Dog看门狗不能设置失效时间,会设置默认的失效时间。

Redisson解决上面的问题:

  1. 原子性: Lua保证 判断锁是不是自己的,操作的原子性
  2. 超时问题:Watch Dog看门狗,会专门创建一个线程,监控当前的分布式锁有没有结束,(如果正在使用着锁)没有结束的话会每10s调整过期时间。就检测到正在使用着会把过期时间重置回30s, 不用担心“我正在用着锁释放了”导致的安全问题
  3. 不可重入:Redis的Hash结构来记录锁的持有者以及重入次数
  4. 失败重试:redis的发布订阅(Pub/Sub)
  5. 主从一致性问题:可以利用Redis官网推荐的RedLock机制来解决。
    在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
向games频道发送消息,上面订阅了games频道的就会收到消息’hello’
在这里插入图片描述

基于注解的分布式锁

基于AOP的思想,将业务部分作为切入点,将业务前后的锁操作作为环绕增强。注解的核心作用是两个:

  • 标记切入点
  • 传递锁参数

注解本身起到标记作用,同时还要带上锁参数:

  • 锁名称
  • 锁等待时间
  • 锁超时时间
  • 时间单位

Step1:自定义注解锁:

package com.tianji.promotion.annotation;

import com.tianji.promotion.enums.MyLockStrategy;
import com.tianji.promotion.enums.MyLockType;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.TimeUnit;

@Retention(RetentionPolicy.RUNTIME) // 运行时生效
@Target(ElementType.METHOD) // 作用于方法
public @interface MyLock {
    String name();  // 锁名称

    long waitTime() default 1;  // 申请锁的等待时间

    long leaseTime() default -1;    // 持有锁的TTL有效时间

    TimeUnit unit() default TimeUnit.SECONDS;   // 时间单位
}

Step2:定义切面类:
在这里插入图片描述
Step3:定义好了锁注解和切面,接下来就可以改造业务了:
在这里插入图片描述
怎么定义@Transactional和@MyLock的顺序,默认事务的执行顺序比较靠后(其注解里面的order值较高顺序靠后)。 所以 能保证是 先获取锁再执行事务

工厂模式 选择锁类型

Step1: 在注解 锁MyLock里面加入一个属性
锁的类型,默认为可重入锁,由工厂模式根据lockType进行创建

MyLockType lockType() default MyLockType.RE_ENTRANT_LOCK;

Step2: 在切面类中创建锁对象(更新为工厂模式创建)

private final MyLockFactory myLockFactory;

//RLock lock = redissonClient.getLock(myLock.name());  // 只能获取可重入锁
RLock lock = myLockFactory.getLock(myLock.lockType(), lockName);

工厂模式MyLockFactory:

import com.tianji.promotion.enums.MyLockType;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;

@Component
public class MyLockFactory {
	// 锁对象类型,方法引用
    private final Map<MyLockType, Function<String, RLock>> lockHandlers;

    /**
     * 使用工厂模式,来实现不同的锁类型
     *
     * @param redissonClient Redisson 客户端实例
     */
    public MyLockFactory(RedissonClient redissonClient) {
        // 初始化锁处理器映射表
        this.lockHandlers = new EnumMap<>(MyLockType.class);
        // 添加不同类型的锁处理器到映射表中
        this.lockHandlers.put(MyLockType.RE_ENTRANT_LOCK, redissonClient::getLock);
        this.lockHandlers.put(MyLockType.FAIR_LOCK, redissonClient::getFairLock);
        this.lockHandlers.put(MyLockType.READ_LOCK, name -> redissonClient.getReadWriteLock(name).readLock());
        this.lockHandlers.put(MyLockType.WRITE_LOCK, name -> redissonClient.getReadWriteLock(name).writeLock());
    }

    /**
     * 获取指定类型的锁实例
     *
     * @param lockType 锁类型
     * @param name     锁名称
     * @return 对应类型的锁实例
     */
    public RLock getLock(MyLockType lockType, String name){
        // get获取锁类型的引用,apply调用对应的创建方法
        return lockHandlers.get(lockType).apply(name);
    }
}

public enum MyLockType {
    RE_ENTRANT_LOCK, // 可重入锁
    FAIR_LOCK, // 公平锁
    READ_LOCK, // 读锁
    WRITE_LOCK, // 写锁
    ;
}

策略模式提供 重试策略 + 失败策略组合

定义枚举类,枚举Redisson分布式锁的锁失败的处理策略:

import com.tianji.promotion.annotation.MyLock;
import org.redisson.api.RLock;
public enum MyLockStrategy {
    SKIP_FAST(){    // 枚举项,快速结束 = 不重试+快速失败
        @Override
        public boolean tryLock(RLock lock, MyLock prop) throws InterruptedException {
            return lock.tryLock(0, prop.leaseTime(), prop.unit());
        }
    },
    FAIL_FAST(){    // 枚举项,快速失败 = 不重试+抛出异常
        @Override
        public boolean tryLock(RLock lock, MyLock prop) throws InterruptedException {
            boolean isLock = lock.tryLock(0, prop.leaseTime(), prop.unit());
            if (!isLock) {
                throw new BizIllegalException("请求太频繁");
            }
            return true;
        }
    },
    KEEP_TRYING(){  // 枚举项,无限重试
        @Override
        public boolean tryLock(RLock lock, MyLock prop) throws InterruptedException {
            lock.lock( prop.leaseTime(), prop.unit());
            return true;
        }
    },
    SKIP_AFTER_RETRY_TIMEOUT(){ // 枚举项,重试超时后结束 = 有限重试+直接结束
        @Override
        public boolean tryLock(RLock lock, MyLock prop) throws InterruptedException {
            return lock.tryLock(prop.waitTime(), prop.leaseTime(), prop.unit());
        }
    },
    FAIL_AFTER_RETRY_TIMEOUT(){ // 枚举项,重试超时后失败 = 有限重试+抛出异常
        @Override
        public boolean tryLock(RLock lock, MyLock prop) throws InterruptedException {
            boolean isLock = lock.tryLock(prop.waitTime(), prop.leaseTime(), prop.unit());
            if (!isLock) {
                throw new BizIllegalException("请求太频繁");
            }
            return true;
        }
    },
    ;

    public abstract boolean tryLock(RLock lock, MyLock prop) throws InterruptedException;
}

工厂模式 选择锁类型一样, 在注解 锁MyLock里面加入一个属性,实现可选策略:

// 锁的失败策略,默认为重试超时后失败(有限重试,失败后抛出异常),由工厂模式根据lockType进行创建
MyLockStrategy lockStrategy() default MyLockStrategy.FAIL_AFTER_RETRY_TIMEOUT;

修改切面代码,基于用户选择的策略来处理:
在这里插入图片描述

就可以在使用锁的时候自由选择锁类型、锁策略了:
在这里插入图片描述

基于SPEL的动态锁名

现在实现的锁版本还没有userID
在这里插入图片描述
在当前业务中,我们的锁对象本来应该是当前登录用户,是动态获取的。而加锁是基于注解参数添加的,在编码时就需要指定。怎么办?

Spring中提供了一种表达式语法,称为SPEL表达式,可以执行java代码,获取任意参数。
思路:
我们可以让用户指定锁名称参数时不要写死,而是基于SPEL表达式。在创建锁对象时,解析SPEL表达式,动态获取锁名称。
首先,在使用锁注解时,锁名称可以利用SPEL表达式,例如我们指定锁名称中要包含参数中的用户id,则可以这样写:
在这里插入图片描述

而如果是通过UserContext.getUser()获取,则可以利用下面的语法:
在这里插入图片描述

这里T(类名).方法名()就是调用静态方法。

获取锁名称用的是getLockName()这个方法:


/**
 * SPEL的正则规则
 */
private static final Pattern pattern = Pattern.compile("\\#\\{([^\\}]*)\\}");
/**
 * 方法参数解析器
 */
private static final ParameterNameDiscoverer parameterNameDiscoverer = new DefaultParameterNameDiscoverer();

/**
 * 解析锁名称
 * @param name 原始锁名称
 * @param pjp 切入点
 * @return 解析后的锁名称
 */
private String getLockName(String name, ProceedingJoinPoint pjp) {
    // 1.判断是否存在spel表达式
    if (StringUtils.isBlank(name) || !name.contains("#")) {
        // 不存在,直接返回
        return name;
    }
    // 2.构建context,也就是SPEL表达式获取参数的上下文环境,这里上下文就是切入点的参数列表
    EvaluationContext context = new MethodBasedEvaluationContext(
            TypedValue.NULL, resolveMethod(pjp), pjp.getArgs(), parameterNameDiscoverer);
    // 3.构建SPEL解析器
    ExpressionParser parser = new SpelExpressionParser();
    // 4.循环处理,因为表达式中可以包含多个表达式
    Matcher matcher = pattern.matcher(name);
    while (matcher.find()) {
        // 4.1.获取表达式
        String tmp = matcher.group();
        String group = matcher.group(1);
        // 4.2.这里要判断表达式是否以 T字符开头,这种属于解析静态方法,不走上下文
        Expression expression = parser.parseExpression(group.charAt(0) == 'T' ? group : "#" + group);
        // 4.3.解析出表达式对应的值
        Object value = expression.getValue(context);
        // 4.4.用值替换锁名称中的SPEL表达式
        name = name.replace(tmp, ObjectUtils.nullSafeToString(value));
    }
    return name;
}

private Method resolveMethod(ProceedingJoinPoint pjp) {
    // 1.获取方法签名
    MethodSignature signature = (MethodSignature)pjp.getSignature();
    // 2.获取字节码
    Class<?> clazz = pjp.getTarget().getClass();
    // 3.方法名称
    String name = signature.getName();
    // 4.方法参数列表
    Class<?>[] parameterTypes = signature.getMethod().getParameterTypes();
    return tryGetDeclaredMethod(clazz, name, parameterTypes);
}

private Method tryGetDeclaredMethod(Class<?> clazz, String name, Class<?> ... parameterTypes){
    try {
        // 5.反射获取方法
        return clazz.getDeclaredMethod(name, parameterTypes);
    } catch (NoSuchMethodException e) {
        Class<?> superClass = clazz.getSuperclass();
        if (superClass != null) {
            // 尝试从父类寻找
            return tryGetDeclaredMethod(superClass, name, parameterTypes);
        }
    }
    return null;
}

解析SPEL
在切面中,我们需要基于注解中的锁名称做动态解析,而不是直接使用名称:
在这里插入图片描述

相关知识点

自定义注解

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
@Documented
public @interface xxxx{
}

通知类:里面的切点表达式,public是方法返回类型,路径…*是service及其子包下的类 后面的.*是任意方法 (…)指任意参数
在这里插入图片描述
不用切点表达式而是用注解 控制仅实现类中的一个方法:(@annotation(路径.实现类中方法的注解)
(@annotation(路径.实现类中方法的注解))注解直接写到方法的参数上:@Around("@annotation(xxx类)")
在这里插入图片描述
以上图为例,只要方法加了@printTime注解,方法就是切点,再走这个方法之前就会走环绕通知@Around() around()方法

Redis 的 Pub/Sub (发布/订阅) 是一种消息传递机制,它允许客户端订阅一个或多个频道,并接收其他客户端发布到这些频道的消息。在使用 Redis Pub/Sub 的过程中,可能会遇到由于网络故障、订阅客户端崩溃或其他原因导致消息接收失败的情况。因此,失败重试机制可以帮助保证消息在分布式环境下的可靠性。

Redis Pub/Sub 的工作原理

  • 发布者 (Publisher) 将消息发布到指定的频道。
  • 订阅者 (Subscriber) 订阅频道,并监听来自该频道的消息。
  • 当有消息发布到订阅的频道时,Redis 会将这些消息推送到所有订阅了该频道的客户端。

失败重试机制在 Redis Pub/Sub 中的应用

在 Redis Pub/Sub 中,如果出现网络问题或客户端挂掉导致的消息丢失,默认情况下消息不会被重试或保存(Redis 本身不支持持久化消息)。因此,如果需要实现失败重试机制,可以采取以下几种策略:

1. 消息确认和重试机制

  • 问题:如果消息在订阅者接收时失败(比如网络中断、订阅者崩溃等),这些消息会丢失。
  • 解决方案:一种常见的做法是通过消息确认机制来实现重试。每个消息可以通过客户端进行确认,如果未成功处理消息,则将其重新发布到一个队列或另一个频道,等待下一次重试。

实现方式

  • 订阅者在接收到消息时,需要向发布者或消息队列发送确认信号。如果在一定时间内没有收到确认,可以将该消息重新推送到某个死信队列(Dead Letter Queue,DLQ)或者一个等待重试的队列中,等到订阅者恢复正常后,再进行重试。

示例

// Redis Pub/Sub 订阅者代码(使用 Jedis 客户端)
public class MySubscriber extends JedisPubSub {

    @Override
    public void onMessage(String channel, String message) {
        try {
            // 处理消息
            processMessage(message);

            // 发送确认信号
            sendAcknowledgment(message);
        } catch (Exception e) {
            // 处理失败,重试机制
            handleFailure(message);
        }
    }

    private void handleFailure(String message) {
        // 如果处理失败,可以将消息重新推送到一个队列或保存到死信队列
        redisClient.lpush("retryQueue", message);
    }

    // 确认消息已处理
    private void sendAcknowledgment(String message) {
        redisClient.publish("acknowledgmentChannel", message); // 可选的确认机制
    }
}

2. 使用 Redis Streams 替代 Pub/Sub

Redis Streams 是一种基于日志的消息队列结构,适合需要消息持久化和重试的场景。与传统的 Pub/Sub 模式不同,Redis Streams 可以存储消息,并且订阅者可以从流的任意位置读取消息,避免了消息丢失的问题。

特点

  • 持久化:Redis Streams 会持久化消息到磁盘,避免了消息丢失的风险。
  • 消息确认和重试消息的消费者可以通过 XACK 命令显式地确认消息,如果没有确认,Redis 可以将消息重新分配给其他消费者或重新发送给原消费者进行重试。

示例

// 发布消息到 Redis Stream
redisClient.xadd("mystream", Map.of("message", "hello"));

// 订阅者处理消息
while (true) {
    List<Map.Entry<String, List<StreamEntry>>> messages = redisClient.xread(
            StreamEntryID.UNRECEIVED, Map.of("mystream", "0"));
    
    for (Map.Entry<String, List<StreamEntry>> streamEntry : messages) {
        for (StreamEntry entry : streamEntry.getValue()) {
            try {
                processMessage(entry.getFields().get("message"));
                redisClient.xack("mystream", "consumerGroup", entry.getID()); // 消息确认
            } catch (Exception e) {
                // 处理失败,可以重新发送消息进行重试
                redisClient.xadd("retryQueue", Map.of("message", entry.getFields().get("message")));
            }
        }
    }
}

使用 Redis Streams 的优势

  • 消息不丢失:Stream 中的消息会持久化在 Redis 中,订阅者可以在后续任何时候读取。
  • 自动重试:可以通过消费组的方式来确保如果某个消费者失败,其他消费者会接管任务。
  • 确认机制:消费者可以确认已处理的消息,如果没有确认,Redis 会重新分配任务。

3. 使用 Redis Pub/Sub 的失败重试方案

如果仍然希望使用 Redis 的传统 Pub/Sub 模式并实现某种程度的消息重试,可以结合一些外部机制,例如将消息发布到 Redis Pub/Sub 频道后,同时将消息也存储到一个队列中(如 Redis List、Redis Stream),然后通过定时任务或后台进程来检查未确认的消息。

步骤

  • 订阅者从 Redis 频道获取消息时,在处理前将消息的 ID 记录下来。
  • 如果处理失败,订阅者会将消息 ID 添加到一个待重试的队列中(例如 Redis List 或 Stream)。
  • 定期检查待重试队列并重试这些消息。

总结

虽然 Redis Pub/Sub 本身并不直接支持消息的失败重试机制,但可以通过以下几种方式来实现:

  • 使用 Redis Streams 代替 Pub/Sub,利用其消息持久化和消费确认功能来实现失败重试。
  • 使用 消息确认机制,结合 Redis 队列(如 List、Stream)将失败的消息重新推送,进行后续重试。
  • 如果不希望改变现有的 Pub/Sub 模式,可以通过后台任务周期性地重试失败消息,将消息记录在专门的队列中。

http://www.kler.cn/a/513571.html

相关文章:

  • C语言练习(17)
  • Java-数据结构-二叉树习题(2)
  • 电子科大2024秋《大数据分析与智能计算》真题回忆
  • Python----Python高级(文件操作open,os模块对于文件操作,shutil模块 )
  • 气膜料仓:工业仓储的高效与安全新选择—轻空间
  • 日历热力图,月度数据可视化图表(日活跃图、格子图)vue组件
  • 顽固性失眠怎么调理
  • InVideo AI技术浅析(五):生成对抗网络
  • centos下设置服务器开机自启动 redis
  • MongoDB实训:电子商务日志存储任务
  • leetcode 面试经典 150 题:插入区间
  • 音频入门(一):音频基础知识与分类的基本流程
  • AIGC视频生成模型:Stability AI的SVD(Stable Video Diffusion)模型
  • python+pygame+pytmx+map editor开发一个tiled游戏demo 05使用object层初始化player位置
  • 前端 window.print() 打印图片
  • 云知声语音识别技术:原理、突破与应用前景
  • Python数据可视化(够用版):懂基础 + 专业的图表抛给Tableau等专业绘图工具
  • 常用邮箱有哪些推荐的服务?
  • tcpdump 精准分析vxlan网络
  • 前端缓存策略:强缓存与协商缓存深度剖析
  • 3D可视化定制:开启个性化购物新时代,所见即所得
  • latex如何让目录后面有点
  • 初探——【Linux】程序的翻译与动静态链接
  • 电子商务的安全
  • 【C++】模板(进阶)
  • C# 中 readonly 与 const 的使用