当前位置: 首页 > article >正文

Redisson使用场景及原理

目录

一、前言

二、安装Redis

1、Windows安装Redis

​2、启动方式

3、设置密码

三、项目集成Redission客户端

1、引入依赖

四、实用场景

1、操作缓存

2、分布式锁

3、限流

3.1 创建限流器

3.2 设置限流参数

3.3 获取令牌

3.4 带超时时间获取令牌

3.5 总结


一、前言

Redis是一个开源的高性能键值存储数据库,它提供了多种数据结构来存储数据,如字符串、哈希、列表、集合、有序集合等。Redis将数据存储在内存中,以提供快速的读写访问速度,并且能够通过异步的方式将数据持久化到磁盘上。它支持复制、Lua脚本、事务处理、不同级别的持久化选项以及多种客户端语言的接口。Redis广泛用于缓存、消息队列、短时数据存储和高性能的应用场景中。

通常在SpringBoot项目中集成redis有两种方式:spring-boot-starter-data-redis和redisson-spring-boot-starter。但它们在功能、使用方式、性能以及集成方面存在一些差异。下面是对这两者的详细对比:

1. 集成方式
Spring Data Redis:
是Spring框架的一部分,提供了对Redis的高级抽象,使得Redis操作更加面向对象和易于使用。

通常与Spring Boot一起使用,通过spring-boot-starter-data-redis依赖自动配置。

使用Jedis或Lettuce作为底层客户端。

Redisson:
是一个独立的Redis客户端,提供了比Spring Data Redis更丰富的功能,如分布式数据结构(如RMap, RSet, RQueue等),分布式锁和各种原子操作。

需要手动配置和使用,不直接集成到Spring框架中,但可以通过Redisson Spring Boot Starter简化集成。

主要使用Netty进行网络通信,支持多种序列化机制。

2. 功能特性
Spring Data Redis:
支持基本的Redis操作,如键值对存储、列表、集合、有序集合等。

提供模板类(如RedisTemplate),简化Redis操作。

支持发布/订阅、地理空间等高级功能。

Redisson:
提供了比Spring Data Redis更广泛的分布式数据结构支持。

支持分布式锁、信号量、原子长整型等分布式数据结构。

内置了多种分布式服务(如分布式锁、原子操作),使得在分布式环境中使用Redis更加方便和高效。

3. 性能和易用性
Spring Data Redis:
使用Jedis或Lettuce作为客户端,Jedis是基于阻塞IO的,而Lettuce基于Netty是非阻塞的,因此在某些场景下性能更好。

易于集成和使用,特别是在Spring生态系统中。

Redisson:
基于Netty,通常在性能上优于Jedis和Lettuce(特别是在高并发场景下)。

提供了更丰富的分布式数据结构和工具,但在某些简单的使用场景下可能会显得过于复杂。

4. 社区和支持
Spring Data Redis:
作为Spring项目的一部分,拥有庞大的社区支持和良好的文档。

持续更新和维护,与Spring Boot紧密集成。

Redisson:
也是一个活跃的开源项目,拥有自己的社区和文档。

由于其专注于分布式数据结构和工具,因此在这些领域有很好的支持和应用案例。

结论
选择spring-data-redis还是Redisson取决于你的具体需求:
如果你的项目已经在使用Spring框架,并且需要简单的Redis操作,那么spring-data-redis可能是更好的选择。

如果你的项目需要更复杂的分布式数据结构和工具,特别是在分布式锁和原子操作方面,那么Redisson可能更适合你的需求。在这种情况下,虽然需要更多的手动配置,但它的功能和性能优势可能会让你觉得这是一个值得的投资。

二、安装Redis

1、Windows安装Redis

打开Redis官网,下载压缩包

解压到本地目录后,目录结构如下

2、启动方式

1、双击redis-server.exe启动

2、命令行窗口启动,在当前目录打开cmd窗口,输入:redis-server.exe redis.windows.conf

3、设置密码

Redis服务默认没有密码,如果要设置,编辑redis.windows.conf文件,找到requirepass关键字,后边是密码,修改成自定义密码后,把这行注释打开。并重启Redis服务,需要指定配置文件路径。

三、项目集成Redission客户端

本文主要介绍Redission客户端使用方式及部分高级特性原理。

1、引入依赖

<!-- https://mvnrepository.com/artifact/org.redisson/redisson-spring-boot-starter 最新版本3.45.0-->    
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.15.6</version>
</dependency>

注意不用再引入spring-boot-starter-data-redis,在redisson-spring-boot-starter内部已经添加了依赖。

四、实用场景

1、操作缓存

直接看一个demo。

public static void main(String[] args) {
    Config config = new Config()
            .setTransportMode(TransportMode.NIO)
            .setCodec(new JsonJacksonCodec());
    config.useSingleServer()
            .setAddress("redis://127.0.0.1:6379")
            .setPassword("123456");
    RedissonClient redissonClient = Redisson.create(config);

    RBucket<Object> bucket = redissonClient.getBucket("");
    // 直接设置value
    bucket.set("123");
    // 设置value并设置过期时间
    bucket.set("123", 3, TimeUnit.SECONDS);
    
    redissonClient.shutdown();
}

显然Redission虽然也能支持Redis常见操作,但是api入门门槛较高,相比于RedisTemplate大量简单且直观的方法确实不易使用。

2、分布式锁

不过Redission也有优势,比如在分布式锁,提供了几个简单方法即可实现。比如:

public static void main(String[] args) {
    Config config = new Config()
            .setTransportMode(TransportMode.NIO)
            .setCodec(new JsonJacksonCodec());
    config.useSingleServer()
            .setAddress("redis://127.0.0.1:6379")
            .setPassword("123456");
    RedissonClient redissonClient = Redisson.create(config);

    RLock lock = redissonClient.getLock("myLock");
    // 用法1:直接上锁,需要在最后手动释放锁
    lock.tryLock();
    // 用法2:上锁并设置等待时间
    lock.tryLock(10, TimeUnit.SECONDS);
    // 用法3:上锁并设置等待时间、自动释放时间
    lock.tryLock(10, 30, TimeUnit.SECONDS);
}

上述3个tryLock方法最终都会执行

private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId)
1. waitTime:等待时间,即在尝试获取锁时最多的等待时间。如果超过这个时间仍未获取到锁,则会放弃获取锁。
2. leaseTime:租约时间,即获取到锁后持有的时间。如果在这段时间内没有手动释放锁,则系统会自动释放锁。默认为-1,即如果不手动释放,则锁永久有效。
3. unit:时间单位,用于指定等待时间和租约时间的单位。
4. threadId:当前线程id
区别是,第一个方法传入的waitTime和leaseTime都是-1,第二个方法传入的leaseTime是-1

需要特别注意:如果调用了第3个方法获取锁,并且leaseTime不是-1,则会在leaseTime过期后,释放锁。

这里就该提到大家都知道的看门狗机制。即获取分布式锁后,执行业务方法,如果在业务方法执行耗时比较久,则后台有个线程会一直给锁续约,前提是leaseTime=-1

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    if (leaseTime != -1) {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }

        // lock acquired
        if (ttlRemaining == null) {
            if (leaseTime != -1) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                // leaseTime=-1,调用scheduleExpirationRenewal方法为当前线程续约
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

protected void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        renewExpiration();
    }
}

通常使用分布式锁处理逻辑如下:

RLock lock = redissonClient.getLock(cacheKey);
boolean isLocked = lock.tryLock(waitTime, timeUnit);
if (isLocked) {
    try {
        // 执行业务方法
    } finally {
        if (lock.isLocked() && lock.isHeldByCurrentThread()) {
            lock.unlock();
        }
    }
} else {
    throw new RuntimeException("尝试加锁失败");
}

3、限流

当然Redission还有另外一个比较实用的功能,限流。提到限流,大家可能会想到很多实现方式,比如使用Semaphore控制并发限流,或者使用guava框架提供的限流功能。但是这些大多只适用于单机系统,或者只对单机需要限流。如果遇到分布式服务,需要全局限流,虽然也能通过一定方式实现,但是显然没有那么优雅和高效。

接下来介绍下Redission分布式限流方案:

public static void main(String[] args) {
    Config config = new Config()
            .setTransportMode(TransportMode.NIO)
            .setCodec(new JsonJacksonCodec());
    config.useSingleServer()
            .setAddress("redis://127.0.0.1:6379")
            .setPassword("123456");
    RedissonClient redissonClient = Redisson.create(config);

    RRateLimiter rateLimiter = redissonClient.getRateLimiter("myRateLimiter");
    boolean setRate = rateLimiter.trySetRate(RateType.OVERALL, 5, 1, RateIntervalUnit.SECONDS);
    if (!setRate) {
        System.out.println("分布式限流器创建失败,已经存在。");
        rateLimiter.delete();
        setRate = rateLimiter.trySetRate(RateType.OVERALL, 5, 1, RateIntervalUnit.SECONDS);
        System.out.println("分布式限流器创建" + (setRate ? "成功" : "失败"));
    }

    CountDownLatch latch = new CountDownLatch(10);
    for (int i = 0; i < 10; i++) {
        int finalI = i;
        new Thread(() -> {
            try {
                Thread.sleep((long) (800 * Math.random()));
            } catch (Exception e) {
                e.printStackTrace();
            }
            if (rateLimiter.tryAcquire()) {
                System.out.println("获取令牌成功");
            } else {
                System.out.println("Request" + finalI + "获取令牌失败");
            }
            latch.countDown();
        }).start();
    }
    try {
        latch.await();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    // 删除限流器
    rateLimiter.delete();
}

 注意看核心代码只有三行:

// 创建限流器
RRateLimiter rateLimiter = redissonClient.getRateLimiter("myRateLimiter");
// 设置限流参数
boolean setRate = rateLimiter.trySetRate(RateType.OVERALL, 5, 1, RateIntervalUnit.SECONDS);
// 获取令牌
rateLimiter.tryAcquire()

至于为什么这么简单神奇,接下来分析下源码。看下上边3个方法都干了啥。

3.1 创建限流器

// 方法签名
RRateLimiter getRateLimiter(String name);

@Override
public RRateLimiter getRateLimiter(String name) {
    // 只是创建了一个RedissonRateLimiter对象并返回,并且设置了name属性为限流器名称
    return new RedissonRateLimiter(commandExecutor, name);
}

3.2 设置限流参数

// 方法签名
boolean trySetRate(RateType mode, long rate, long rateInterval, RateIntervalUnit rateIntervalUnit);

@Override
    public boolean trySetRate(RateType type, long rate, long rateInterval, RateIntervalUnit unit) {
        return get(trySetRateAsync(type, rate, rateInterval, unit));
    }

@Override
    public RFuture<Boolean> trySetRateAsync(RateType type, long rate, long rateInterval, RateIntervalUnit unit) {
        return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "redis.call('hsetnx', KEYS[1], 'rate', ARGV[1]);"
              + "redis.call('hsetnx', KEYS[1], 'interval', ARGV[2]);"
              + "return redis.call('hsetnx', KEYS[1], 'type', ARGV[3]);",
                Collections.singletonList(getRawName()), rate, unit.toMillis(rateInterval), type.ordinal());
    }

 其实也就是设置了几个Redis缓存,key分别是rate、interval、type。

3.3 获取令牌

// 常用几个方法签名

// 方法1:获取1个令牌

boolean tryAcquire();

// 方法2:获取多个令牌

boolean tryAcquire(long permits);

// 方法3:是方法1的变体,多了2个获取令牌超时时间参数

boolean tryAcquire(long timeout, TimeUnit unit);

// 方法4:是方法2的变体,多了2个获取令牌超时时间参数

boolean tryAcquire(long permits, long timeout, TimeUnit unit);

我们先看tryAcquire()方法调用栈:

@Override
public boolean tryAcquire() {
    return tryAcquire(1);
}

@Override
public boolean tryAcquire(long permits) {
    return get(tryAcquireAsync(RedisCommands.EVAL_NULL_BOOLEAN, permits));
}

最终执行tryAcquireAsync方法,内部执行Lua脚本,保证操作的原子性。每一行脚本都加了说明,其中参数KEYS和ARGS值如下:

KEYS[1]=getRawName(),即限流器名称

KEYS[2]=getValueName(),值是{限流器名称}:value,存放的是数字,当前可用许可

KEYS[3]=getClientValueName()

KEYS[4]=getPermitsName(),值是{限流器名称}:permits,数据结构zset,score是当前获取许可的时间戳

KEYS[5]=getClientPermitsName()

ARGV[1]=value,获取的许可数量

ARGV[2]=System.currentTimeMillis(),当前时间戳

ARGV[3]=ThreadLocalRandom.current().nextLong(),一个随机数

private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) {
    return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
            # rate 限流速率
            "local rate = redis.call('hget', KEYS[1], 'rate');"
            # interval 限流间隔
          + "local interval = redis.call('hget', KEYS[1], 'interval');"
            # type 限流类型,RateType枚举下标,所以OVERALL=0,PER_CLIENT=1
          + "local type = redis.call('hget', KEYS[1], 'type');"
          + "assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')"
            # valueName 值是{name}:value,存放的是数字,当前可用许可
          + "local valueName = KEYS[2];"
            # permitsName 值是{name}:permits,数据结构zset,score是当前获取许可的时间戳
          + "local permitsName = KEYS[4];"
            # type=PER_CLIENT时
          + "if type == '1' then "
                # valueName 值是{name}:value:managerId
              + "valueName = KEYS[3];"
                # permitsName 值是{name}:permits:managerId
              + "permitsName = KEYS[5];"
          + "end;"

            # 参数校验:限流速率rate >= 当前请求许可(不传默认是1)
          + "assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'); "

            # currentValue 获取当前还有多少许可
          + "local currentValue = redis.call('get', valueName); "
          + "if currentValue ~= false then " # 不是第一次获取许可
                   # expiredValues 已过期的许可
                   # zrangebyscore返回有序集合中指定分数区间(0,当前时间戳-限流区间]的成员列表,有序集成员按分数值递增(从小到大)次序排列。
                 + "local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "
                   # 获取过期许可总数
                 + "local released = 0; "
                 + "for i, v in ipairs(expiredValues) do "
                        # 函数struct.unpack从一个类结构字符串中解包出多个Lua值
                      + "local random, permits = struct.unpack('fI', v);"
                      + "released = released + permits;"
                 + "end; "

                   # 释放过期许可
                 + "if released > 0 then "
                        # zremrangebyscore移除有序集合中给定的分数区间的所有成员
                      + "redis.call('zremrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "
                        # 当前可用许可+释放的许可数
                      + "currentValue = tonumber(currentValue) + released; "
                        # 重新设置当前可用许可
                      + "redis.call('set', valueName, currentValue);"
                 + "end;"

                 + "if tonumber(currentValue) < tonumber(ARGV[1]) then " # 剩余许可不够
                       #
                     + "local nearest = redis.call('zrangebyscore', permitsName, '(' .. (tonumber(ARGV[2]) - interval), '+inf', 'withscores', 'limit', 0, 1); "
                       # 返回下一个许可需要等待多少时间
                     + "return tonumber(nearest[2]) - (tonumber(ARGV[2]) - interval);"
                 + "else " # 剩余许可足够
                     + "redis.call('zadd', permitsName, ARGV[2], struct.pack('fI', ARGV[3], ARGV[1])); "
                     # 将当前可用许可数-获取许可数
                     + "redis.call('decrby', valueName, ARGV[1]); "
                     + "return nil; "
                 + "end; "
          + "else " # 第一次获取许可
                   # 设置可用许可数,首次等于限流速率rate
                 + "redis.call('set', valueName, rate); "
                   # 函数struct.pack将多个Lua值打包成一个类结构(struct-like)字符串[fI,nextLong(),获取许可数]
                 + "redis.call('zadd', permitsName, ARGV[2], struct.pack('fI', ARGV[3], ARGV[1])); "
                   # 将当前可用许可数-获取许可数
                 + "redis.call('decrby', valueName, ARGV[1]); "
                 + "return nil; "
          + "end;",
            Arrays.asList(getRawName(), getValueName(), getClientValueName(), getPermitsName(), getClientPermitsName()),
            value, System.currentTimeMillis(), ThreadLocalRandom.current().nextLong());
}

按照上述Lua脚本逻辑,我们来模拟下获取令牌的过程

假设初始列表:860 1200 1300 1800
第一次,当前是1850,则1850-1000=850,回收(0,850],没有可回收。直接放入1850,此时列表变成:860 1200 1300 1800 1850。剩余许可0

第二次,当前是1900,获取可释放许可,1900-1000=900,即(0,900],有860,回收后列表变成:1200 1300 1800 1850。剩余许可1
如果获取1个,则足够,并放入1900,此时列表变成:1200 1300 1800 1850 1900。剩余许可0
如果获取3个,则不够,此时最近一个(1900-1000,正无穷大],所以是1200,需要等待1200-(1900-1000)=300ms。获取失败,当前列表:1200 1300 1800 1850。剩余许可1

第三次,只等待了100ms,即当前是2000,获取可释放许可,2000-1000=1000,即(0,1000],没有可回收,当前列表:1200 1300 1800 1850。剩余许可1

第四次,又等待了200ms,即当前是2200,获取可释放许可,2200-1000=1200,即(0,1200],有1200,回收后列表变成:1300 1800 1850。剩余许可2
如果获取3个,则不够,此时最近一个(2200-1000,正无穷大],所以是1300,需要等待1300-(2200-1000)=100ms。获取失败,当前列表:1300 1800 1850。剩余许可2

第五次,又等待了100ms,即当前是2300,获取可释放许可,2300-1000=1300,即(0,1300],有1300,回收后列表变成:1800 1850。剩余许可3
如果获取3个,则足够。

3.4 带超时时间获取令牌

@Override
public RFuture<Boolean> tryAcquireAsync(long permits, long timeout, TimeUnit unit) {
    RPromise<Boolean> promise = new RedissonPromise<Boolean>();
    long timeoutInMillis = -1;
    if (timeout >= 0) {
        timeoutInMillis = unit.toMillis(timeout);
    }
    tryAcquireAsync(permits, promise, timeoutInMillis);
    return promise;
}

private void tryAcquireAsync(long permits, RPromise<Boolean> promise, long timeoutInMillis) {
    long s = System.currentTimeMillis();
    RFuture<Long> future = tryAcquireAsync(RedisCommands.EVAL_LONG, permits);
    future.onComplete((delay, e) -> {
        if (e != null) {
            // 发生异常,获取令牌失败
            promise.tryFailure(e);
            return;
        }

        // delay是获取下一个令牌需要等待的时间。如果不需要等待,表示获取令牌成功
        if (delay == null) {
            promise.trySuccess(true);
            return;
        }

        // 获取令牌超时时间。如果设置为-1,则退化成tryAcquire(long permits)。即一直到获取成功后返回
        if (timeoutInMillis == -1) {
            // 延迟delay之后再获取令牌
            commandExecutor.getConnectionManager().getGroup().schedule(() -> {
                tryAcquireAsync(permits, promise, timeoutInMillis);
            }, delay, TimeUnit.MILLISECONDS);
            return;
        }

        // 上一次获取令牌已消耗时间
        long el = System.currentTimeMillis() - s;
        // 剩余超时时间,如果<=0,表示超时时间已到,获取令牌失败
        long remains = timeoutInMillis - el;
        if (remains <= 0) {
            promise.trySuccess(false);
            return;
        }
        // 如果剩余超时时间<下一个令牌等待时间,即等不到获取下一个令牌已经超时了,则延迟remains之后,获取令牌失败
        if (remains < delay) {
            commandExecutor.getConnectionManager().getGroup().schedule(() -> {
                promise.trySuccess(false);
            }, remains, TimeUnit.MILLISECONDS);
        } else {
            long start = System.currentTimeMillis();
            // 延迟delay之后,开始获取令牌
            commandExecutor.getConnectionManager().getGroup().schedule(() -> {
                // 从创建线程到开始执行消耗时间
                long elapsed = System.currentTimeMillis() - start;
                if (remains <= elapsed) {
                    // 如果剩余超时时间<从创建线程到开始执行消耗时间,即线程开始时已经超时了,获取令牌失败
                    promise.trySuccess(false);
                    return;
                }

                // 重新计算剩余超时时间并获取令牌
                tryAcquireAsync(permits, promise, remains - elapsed);
            }, delay, TimeUnit.MILLISECONDS);
        }
    });
}

清除限流器

根据如上代码分析,限流器一旦创建并设置参数后,会在Redis中长期缓存几个key,分别是rate、interval、type。如果不处理,会一直存在。假如服务异常宕机,重启时,再次创建限流器可能会创建失败。遇到这种情况,可以先手动删除限流器。

// 删除限流器
rateLimiter.delete();

3.5 小结

  1. Redission分布式限流使用脚本巧妙的运用了Lua脚本,以及Redis中zset数据结构及操作方法。实现了获取令牌的逻辑。
  2. Redission实现的限流器,在当前db上只能创建一个。因为rate、interval、type都是全局的。如果需要,可以指定db。在其他db上创建别的限流器。因此最多可以创建16个分布式限流器。理论上可以把这几个key也添加分组器前缀,不知道后边版本会不会支持。或者也可以自己重写方法实现。
  3. 如果剩余令牌数不足,会返回下一个令牌需要等待多久。但是如果要一次获取多个令牌,可能还需要等待N轮才能成功。
  4. 如果在所有接口入口都添加获取令牌代码,侵入性太强。可以通过Spring AOP方式对controller接口拦截并限流。

http://www.kler.cn/a/563723.html

相关文章:

  • HTML第二节
  • centos设置 sh脚本开机自启动
  • 算法系列之回溯算法
  • Redis多线程模型演进
  • Dockerfile 中的 COPY 语句:作用与使用详解
  • 基于Django的手办交易平台~源码
  • 小波变换背景预测matlab和python, pytorch样例
  • Go红队开发—并发编程
  • liunx安装redis并配置主从
  • 【Java项目】基于Spring Boot的考研资讯平台
  • 若依 ruoyi-vue 根据角色切换路由菜单权限 SAAS
  • lowagie(itext)老版本手绘PDF,包含页码、水印、图片、复选框、复杂行列合并、行高设置等。
  • https:原理
  • DevOps全流程
  • 【红队利器】单文件一键结束火绒6.0
  • 【Python网络爬虫笔记】14-使用代理绕过访问限制
  • 生成GeoJson文件,Cesium@1.126.0中使用CZML,动态设置高度,动态设置颜色
  • 大语言模型(LLM)微调技术笔记
  • Android-创建mipmap-anydpi-v26的Logo
  • 【SpringMVC】十分钟跑起来一个SpringMVC项目