Redis进阶(二)--Redis高级特性和应用
文章目录
- 第二章、Redis高级特性和应用
- 一、Redis的慢查询
- 1、慢查询配置
- 2、慢查询操作命令
- 3、慢查询建议
- 二、Pipeline
- 三、事务
- 1、Redis的事务原理
- 2、Redis的watch命令
- 3、Pipeline和事务的区别
- 四、Lua
- 1、Lua入门
- (1)安装Lua
- (2)Lua基本语法
- 注释
- 标示符
- 关键词
- 全局变量
- (3)Lua中的数据类型
- (4)Lua 中的函数
- (5)Lua 变量
- (6)Lua中的控制语句
- 循环控制
- if条件控制
- (7)Lua 运算符
- (8)Lua其他特性
- (9)Java对Lua的支持
- Maven
- 2、Redis中的Lua
- (1)eval 命令
- 命令格式
- 命令说明
- 示例
- Lua 脚本中调用 Redis 命令
- (2)evalsha 命令
- (3)redis-cli 执行脚本
- 五、Redis与限流
- 1、使用Redis+Lua语言实现限流
- 2、限流算法
- (1)固定窗口算法
- (2)滑动窗口算法
- TCP中的滑动窗口
- (3)漏洞算法
- (4)令牌算法
- 六、发布和订阅
- 1、操作命令
- (1)发布消息
- (2)订阅消息
- (3)查询订阅情况
- 查看活跃的频道
- 查看频道订阅数
- (4)使用场景和缺点
- 七、Redis Stream
- 1、Stream总述
- 2、常用操作命令
- (1)生产端
- (2)消费端
- 单消费者
- 消费组
- 创建消费组
- 消息消费
- 3、在Redis中实现消息队列
- (1)基于pub/sub
- (2)基于Stream
- (3)Redis中几种消息队列实现的总结
- 基于List的 LPUSH+BRPOP 的实现
- 基于Sorted-Set的实现
- PUB/SUB,订阅/发布模式
- 基于Stream类型的实现
- 4、消息队列问题
- (1)Stream 消息太多怎么办?
- (2)消息如果忘记 ACK 会怎样?
- (3)PEL 如何避免消息丢失?
- (4)死信问题
- (5)Stream 的高可用
- (6)分区 Partition
- 5、Stream小结
- 八、Redis的Key和Value的数据结构组织
- 1、全局哈希表
- 2、渐进式rehash
第二章、Redis高级特性和应用
一、Redis的慢查询
许多存储系统(例如 MySQL)提供慢查询日志帮助开发和运维人员定位系统存在的慢操作。所谓慢查询日志就是系统在命令执行前后计算每条命令的执行时间,当超过预设阀值,就将这条命令的相关信息(例如:发生时间,耗时,命令的详细信息)记录下来,Redis也提供了类似的功能。
Redis客户端执行一条命令分为如下4个部分:
1、发送命令
2、命令排队
3、命令执行
4、返回结果
需要注意,慢查询只统计步骤3 的时间,所以没有慢查询并不代表客户端没有超时问题。因为有可能是命令的网络问题或者是命令在Redis 在排队,所以不是说命令执行很慢就说是慢查询,而有可能是网络的问题或者是Redis 服务非常繁忙(队列等待长)。
1、慢查询配置
对于任何慢查询功能,需要明确两件事:多慢算慢,也就是预设阀值怎么设置?慢查询记录存放在哪?
Redis提供了两种方式进行慢查询的配置
1、动态设置
慢查询的阈值默认值是10 毫秒
参数:slowlog-log-slower-than 就是时间预设阀值,它的单位是微秒(1秒=1000毫秒=1 000 000微秒),默认值是10 000,假如执行了一条“很慢”的命令(例如keys *),如果它的执行时间超过了10 000微秒,也就是10毫秒,那么它将被记录在慢查询日志中。
我们通过动态命令修改
config set slowlog-log-slower-than 20000
使用config set完后,若想将配置持久化保存到Redis.conf,要执行config rewrite
config rewrite
注意:
如果配置slowlog-log-slower-than=0 表示会记录所有的命令,slowlog-log-slower-than<0 对于任何命令都不会进行记录。
2、配置文件设置(修改后需重启服务才生效)
打开Redis的配置文件redis.conf,就可以看到以下配置:
slowlog-max-len用来设置慢查询日志最多存储多少条
另外Redis还提供了slowlog-max-len配置来解决存储空间的问题。
实际上Redis服务器将所有的慢查询日志保存在服务器状态的slowlog 链表中(内存列表),slowlog-max-len 就是列表的最大长度(默认128条)。当慢查询日志列表被填满后,新的慢查询命令则会继续入队,队列中的第一条数据会出列。
虽然慢查询日志是存放在Redis内存列表中的,但是Redis并没有告诉我们这里列表是什么,而是通过一组命令来实现对慢查询日志的访问和管理。并没有说明存放在哪。这个怎么办呢?Redis提供了一些列的慢查询操作命令让我们可以方便的操作。
2、慢查询操作命令
获取慢查询日志
slowlog get [n]
参数n可以指定查询条数。
可以看到每个慢查询日志有6 个属性组成,分别是慢查询日志的标识id、发生时间戳、命令耗时(单位微秒)、执行命令和参数,客户端IP+端口和客户端名称。
获取慢查询日志列表当前的长度
slowlog len
慢查询日志重置
slowlog reset
实际是对列表做清理操作
3、慢查询建议
慢查询功能可以有效地帮助我们找到Redis可能存在的瓶颈,但在实际使用过程中要注意以下几点:
slowlog-max-len配置建议:
建议调大慢查询列表,记录慢查询时Redis 会对长命令做截断操作,并不会占用大量内存。增大慢查询列表可以减缓慢查询被剔除的可能,线上生产建议设置为1000以上。
slowlog-log-slower-than配置建议:
配置建议:默认值超过10 毫秒判定为慢查询,需要根据Redis并发量调整该值。
由于Redis 采用单线程响应命令,对于高流量的场景,如果命令执行时间在1 毫秒以上,那么Redis 最多可支撑OPS 不到1000。因此对于高OPS 场景的Redis 建议设置为1 毫秒或者更低比如100 微秒。
慢查询只记录命令执行时间,并不包括命令排队和网络传输时间。因此客户端执行命令的时间会大于命令实际执行时间。因为命令执行排队机制,慢查询会导致其他命令级联阻塞,因此当客户端出现请求超时,需要检查该时间点是否有对应的慢查询,从而分析出是否为慢查询导致的命令级联阻塞。
由于慢查询日志是一个先进先出的队列,也就是说如果慢查询比较多的情况下,可能会丢失部分慢查询命令,为了防止这种情况发生,可以定期执行slow get命令将慢查询日志持久化到其他存储中。
二、Pipeline
前面我们已经说过,Redis客户端执行一条命令分为如下4个部分:1)发送命令2)命令排队3)命令执行4)返回结果。
其中1 和4 花费的时间称为Round Trip Time (RTT,往返时间),也就是数据在网络上传输的时间。
Redis提供了批量操作命令(例如mget、mset等),有效地节约RTT。
但大部分命令是不支持批量操作的,例如要执行n次 hgetall命令,并没有mhgetall命令存在,需要消耗n 次RTT。
举例:Redis的客户端和服务端可能部署在不同的机器上。例如客户端在本地,Redis服务器在阿里云的广州,两地直线距离约为800公里,那么1次RTT时间=800 x2/ ( 300000×2/3 ) =8毫秒,(光在真空中传输速度为每秒30万公里,这里假设光纤为光速的2/3 )。而Redis命令真正执行的时间通常在微秒(1000微妙=1毫秒)级别,所以才会有Redis 性能瓶颈是网络这样的说法。
Pipeline(流水线)机制能改善上面这类问题,它能将一组 Redis命令进行组装,通过一次RTT传输给Redis,再将这组Redis命令的执行结果按顺序返回给客户端。
没有使用Pipeline执行了n 条命令,整个过程需要n 次RTT。
使用Pipeline 执行了n次命令,整个过程需要1次RTT。
Pipeline并不是什么新的技术或机制,很多技术上都使用过。而且RTT在不同网络环境下会有不同,例如同机房和同机器会比较快,跨机房跨地区会比较慢。
redis-cli的–pipe选项实际上就是使用Pipeline机制,但绝对部分情况下,我们使用Java语言的Redis客户端中的Pipeline会更多一点。
代码参见:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Pipeline;
import java.util.List;
@Component
public class RedisPipeline {
@Autowired
private JedisPool jedisPool;
public List<Object> plGet(List<String> keys) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
//pipe是将所有的命令组装成pipeline
Pipeline pipelined = jedis.pipelined();
for(String key:keys){
pipelined.get(key);//不是仅仅是get方法,set方法还要很多很多方法pipeline都提供了支持
}
return pipelined.syncAndReturnAll();//这里只会向redis发送一次
} catch (Exception e) {
throw new RuntimeException("执行Pipeline获取失败!",e);
} finally {
jedis.close();
}
}
public void plSet(List<String> keys,List<String> values) {
if(keys.size()!=values.size()) {
throw new RuntimeException("key和value个数不匹配!");
}
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
Pipeline pipelined = jedis.pipelined();
for(int i=0;i<keys.size();i++){
pipelined.set(keys.get(i),values.get(i));
}
pipelined.sync();
} catch (Exception e) {
throw new RuntimeException("执行Pipeline设值失败!",e);
} finally {
jedis.close();
}
}
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
/**
* 操作字符串类型
*/
@Component
public class RedisString {
public final static String RS_STR_NS = "rs:";
@Autowired
private JedisPool jedisPool;
/**
* 向Redis中存值,永久有效
*/
public String set(String key, String value) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
return jedis.set(RS_STR_NS+key,value);
} catch (Exception e) {
throw new RuntimeException("向Redis中存值失败!");
} finally {
jedis.close();
}
}
/**
* 批量向Redis中存值,永久有效
*/
public String msetRaw(String... keysvalues) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
/*自己去拆解,加前缀,再组合...*/
return jedis.mset(keysvalues);
} catch (Exception e) {
throw new RuntimeException("批量向Redis中存值失败!");
} finally {
jedis.close();
}
}
/**
* 根据传入Key获取指定Value
*/
public String get(String key) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
return jedis.get(RS_STR_NS+key);
} catch (Exception e) {
throw new RuntimeException("获取Redis值失败!");
} finally {
jedis.close();
}
}
}
@SpringBootTest
public class TestRedisPipeline {
@Autowired
private RedisPipeline redisPipeline;
@Autowired
private RedisString redisString;
private static final int TEST_COUNT = 10000;
@Test
public void testPipeline() {
long setStart = System.currentTimeMillis();
for (int i = 0; i < TEST_COUNT; i++) { //单个的操作
redisString.set("testStringM:key_" + i, String.valueOf(i));
}
long setEnd = System.currentTimeMillis();
System.out.println("非pipeline操作"+TEST_COUNT+"次字符串数据类型set写入,耗时:" + (setEnd - setStart) + "毫秒");
List<String> keys = new ArrayList<>(TEST_COUNT);
List<String> values= new ArrayList<>(TEST_COUNT);
for (int i = 0; i < keys.size(); i++) {
keys.add("testpipelineM:key_"+i);
values.add(String.valueOf(i));
}
long pipelineStart = System.currentTimeMillis();
redisPipeline.plSet(keys,values);
long pipelineEnd = System.currentTimeMillis();
System.out.println("pipeline操作"+TEST_COUNT+"次字符串数据类型set写入,耗时:" + (pipelineEnd - pipelineStart) + "毫秒");
}
}
总的来说,在不同网络环境下非Pipeline 和Pipeline 执行10000 次set 操作的效果,在执行时间上的比对如下:
差距有100多倍,可以得到如下两个结论:
1、Pipeline执行速度一般比逐条执行要快。
2、客户端和服务端的网络延时越大,Pipeline的效果越明显。
Pipeline虽然好用,但是每次Pipeline组装的命令个数不能没有节制,否则一次组装Pipeline 数据量过大,一方面会增加客户端的等待时间,另一方面会造成一定的网络阻塞,可以将一次包含大量命令的Pipeline 拆分成多次较小的Pipeline 来完成,比如可以将Pipeline 的总发送大小控制在内核输入输出缓冲区大小之内或者控制在单个TCP 报文最大值1460 字节之内。
内核的输入输出缓冲区大小一般是4K-8K,不同操作系统会不同(当然也可以配置修改)
最大传输单元(Maximum Transmission Unit,MTU),这个在以太网中最大值是1500字节。那为什么单个TCP 报文最大值是1460,因为因为还要扣减20 个字节的IP 头和20 个字节的TCP 头,所以是1460。
同时Pipeline 只能操作一个Redis 实例,但是即使在分布式Redis 场景中,也可以作为批量操作的重要优化手段。
三、事务
大家应该对事务比较了解,简单地说,事务表示一组动作,要么全部执行,要么全部不执行。
例如在社交网站上用户A 关注了用户B,那么需要在用户A 的关注表中加入用户B,并且在用户B 的粉丝表中添加用户A,这两个行为要么全部执行,要么全部不执行,否则会出现数据不一致的情况。
Redis提供了简单的事务功能,将一组需要一起执行的命令放到multi 和exec 两个命令之间。multi 命令代表事务开始,exec命令代表事务结束。另外discard 命令是回滚。
一个客户端
另外一个客户端
在事务没有提交的时查询(查不到数据)
在事务提交后查询(可以查到数据)
可以看到sadd 命令此时的返回结果是QUEUED,代表命令并没有真正执行,而是暂时保存在Redis 中的一个缓存队列(所以discard 也只是丢弃这个缓存队列中的未执行命令,并不会回滚已经操作过的数据,这一点要和关系型数据库的Rollback 操作区分开)。
只有当exec 执行后,用户A 关注用户B 的行为才算完成,如下所示exec 返回的两个结果对应sadd 命令。
但是要注意Redis 的事务功能很弱。在事务回滚机制上,Redis只能对基本的语法错误进行判断。
如果事务中的命令出现错误,Redis 的处理机制也不尽相同。
1、语法命令错误
例如下面操作错将set 写成了sett,属于语法错误,会造成整个事务无法执行,事务内的操作都没有执行:
2、运行时错误
例如:事务内第一个命令简单的设置一个string类型,第二个对这个key进行sadd命令,这种就是运行时命令错误,因为语法是正确的:
可以看到Redis并不支持回滚功能,第一个set命令已经执行成功,开发人员需要自己修复这类问题。
1、Redis的事务原理
事务是Redis 实现在服务器端的行为,用户执行MULTI 命令时,服务器会将对应这个用户的客户端对象设置为一个特殊的状态,在这个状态下后续用户执行的查询命令不会被真的执行,而是被服务器缓存起来,直到用户执行EXEC 命令为止,服务器会将这个用户对应的客户端对象中缓存的命令按照提交的顺序依次执行。
2、Redis的watch命令
有些应用场景需要在事务之前,确保事务中的key 没有被其他客户端修改过,才执行事务,否则不执行(类似乐观锁)。Redis 提供了watch命令来解决这类问题。
客户端1:
客户端2:
客户端1继续:
可以看到“客户端-1”在执行multi 之前执行了watch 命令,“客户端-2”在“客户端-1”执行exec 之前修改了key 值,造成客户端-1事务没有执行(exec结果为nil)。
Redis客户端中的事务使用代码参见:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Transaction;
import java.util.List;
@Component
public class RedisTransaction {
public final static String RS_TRANS_NS = "rts:";
@Autowired
private JedisPool jedisPool;
public List<Object> transaction(String... watchKeys){
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
if(watchKeys.length>0){
/*使用watch功能*/
String watchResult = jedis.watch(watchKeys);
if(!"OK".equals(watchResult)) {
throw new RuntimeException("执行watch失败:"+watchResult);
}
}
Transaction multi = jedis.multi();
multi.set(RS_TRANS_NS+"test1","a1");
multi.set(RS_TRANS_NS+"test2","a2");
multi.set(RS_TRANS_NS+"test3","a3");
List<Object> execResult = multi.exec();
if(execResult==null){
throw new RuntimeException("事务无法执行,监视的key被修改:"+watchKeys);
}
System.out.println(execResult);
return execResult;
} catch (Exception e) {
throw new RuntimeException("执行Redis事务失败!",e);
} finally {
if(watchKeys.length>0){
jedis.unwatch();/*前面如果watch了,这里就要unwatch*/
}
jedis.close();
}
}
}
3、Pipeline和事务的区别
PipeLine看起来和事务很类似,感觉都是一批批处理,但两者还是有很大的区别。简单来说。
1、pipeline是客户端的行为,对于服务器来说是透明的,可以认为服务器无法区分客户端发送来的查询命令是以普通命令的形式还是以pipeline 的形式发送到服务器的;
2、而事务则是实现在服务器端的行为,用户执行MULTI命令时,服务器会将对应这个用户的客户端对象设置为一个特殊的状态,在这个状态下后续用户执行的查询命令不会被真的执行,而是被服务器缓存起来,直到用户执行EXEC命令为止,服务器会将这个用户对应的客户端对象中缓存的命令按照提交的顺序依次执行。
3、应用pipeline 可以提服务器的吞吐能力,并提高Redis处理查询请求的能力。
但是这里存在一个问题,当通过pipeline 提交的查询命令数据较少,可以被内核缓冲区所容纳时,Redis可以保证这些命令执行的原子性。然而一旦数据量过大,超过了内核缓冲区的接收大小,那么命令的执行将会被打断,原子性也就无法得到保证。因此pipeline 只是一种提升服务器吞吐能力的机制,如果想要命令以事务的方式原子性的被执行,还是需要事务机制,或者使用更高级的脚本功能以及模块功能。
4、可以将事务和pipeline 结合起来使用,减少事务的命令在网络上的传输时间,将多次网络IO 缩减为一次网络IO。
Redis提供了简单的事务,之所以说它简单,主要是因为它不支持事务中的回滚特性,同时无法实现命令之间的逻辑关系计算,当然也体现了Redis 的“keep it simple”的特性,下一小节介绍的Lua脚本同样可以实现事务的相关功能,但是功能要强大很多。
四、Lua
Lua语言是在1993年由巴西一个大学研究小组发明,其设计目标是作为嵌入式程序移植到其他应用程序,它是由C语言实现的,虽然简单小巧但是功能强大,所以许多应用都选用它作为脚本语言,尤其是在游戏领域,暴雪公司的“魔兽世界”,“愤怒的小鸟”,Nginx将Lua语言作为扩展。Redis将Lua作为脚本语言可帮助开发者定制自己的Redis命令。
Redis 2.6 版本通过内嵌支持 Lua 环境。也就是说一般的运用,是不需要单独安装Lua的。
通过使用LUA脚本:
1、减少网络开销,在Lua脚本中可以把多个命令放在同一个脚本中运行;
2、原子操作,redis会将整个脚本作为一个整体执行,中间不会被其他命令插入(Redis执行命令是单线程)。
3、复用性,客户端发送的脚本会永远存储在redis中,这意味着其他客户端可以复用这一脚本来完成同样的逻辑。
不过为了我们方便学习Lua语言,我们还是单独安装一个Lua。
1、Lua入门
(1)安装Lua
Lua在linux中的安装
到官网下载lua的tar.gz的源码包
1、wget
http://www.lua.org/ftp/lua-5.3.6.tar.gz
2、tar -zxvf
lua-5.3.6.tar.gz
进入解压的目录:
3、cd lua-5.3.6
4、make linux
5、make install(需要在root用户下)
如果报错,说找不到readline/readline.h,
可以root用户下通过yum命令安装
yum -y install
libtermcap-devel ncurses-devel libevent-devel readline-devel
安装完以后再make linux / make install
最后,直接输入 lua命令即可进入lua的控制台:
(2)Lua基本语法
Lua 学习起来非常简单,当然再简单,它也是个独立的语言,自成体系,不可能完全在本课中全部讲述完毕,如果工作中有深研Lua的需要,可以参考《Lua程序设计》,作者罗伯拖·鲁萨利姆斯奇 (Roberto Ierusalimschy)。
现在我们需要:print(“Hello World!”)
可以在命令行中输入程序并立即查看效果。
或者编写一个Lua脚本
然后执行
注释
单行注释
两个减号是单行注释: --
多行注释
--[[
注释内容
注释内容
--]]
标示符
Lua 标示符用于定义一个变量,函数获取其他用户定义的项。标示符以一个字母 A 到 Z 或 a 到 z 或下划线 _ 开头后加上 0 个或多个字母,下划线,数字(0 到 9)。
最好不要使用下划线加大写字母的标示符,因为Lua的语言内部的一些保留字也是这样的。
Lua 不允许使用特殊字符如 @, $, 和 % 来定义标示符。 Lua 是一个区分大小写的编程语言。因此在 Lua 中LIJIN与lijin 是两个不同的标示符。以下列出了一些正确的标示符:
关键词
以下列出了 Lua 的保留关键词。保留关键字不能作为常量或变量或其他用户自定义标示符:
同时一般约定,以下划线开头连接一串大写字母的名字(比如 _VERSION)被保留用于 Lua 内部全局变量。
全局变量
在默认情况下,变量总是认为是全局的。
全局变量不需要声明,给一个变量赋值后即创建了这个全局变量,访问一个没有初始化的全局变量也不会出错,只不过得到的结果是:nil。
如果你想删除一个全局变量,只需要将变量赋值为nil。这样变量b就好像从没被使用过一样。换句话说, 当且仅当一个变量不等于nil时,这个变量即存在。
(3)Lua中的数据类型
Lua 是动态类型语言,变量不要类型定义,只需要为变量赋值。
值可以存储在变量中,作为参数传递或结果返回。
Lua 中有 8 个基本类型分别为:nil、boolean、number、string、userdata、function、thread 和 table。
我们可以使用 type 函数测试给定变量或者值的类型。
我们只选择几个要点做说明:
1、nil 类型表示一种没有任何有效值,它只有一个值 – nil,对于全局变量和 table,nil 还有一个"删除"作用,给全局变量或者 table 表里的变量赋一个 nil 值,等同于把它们删掉,nil 作类型比较时应该加上双引号 "。
2、boolean 类型只有两个可选值:true(真) 和 false(假),Lua 把 false 和 nil 看作是 false,其他的都为 true,数字 0 也是 true。
3、Lua 默认只有一种 number类型 – double(双精度)类型。
print(type(2))
print(type(2.2))
print(type(0.2))
print(type(2e+1))
print(type(0.2e-1))
都被看作是 number 类型
4、字符串由一对双引号或单引号来表示,也可以用[[ 与 ]] 表示,一般来说,单行文本用双引号或单引号,多行文本用[[ 与 ]] 。
5、在对一个数字字符串上进行算术操作时,Lua 会尝试将这个数字字符串转成一个数字。
6、字符串连接使用的是 …
7、使用 # 来计算字符串的长度,放在字符串前面
8、table可以做为数组,也可以作为为Hash,table 不会固定长度大小,有新数据添加时 table 长度会自动增长,没初始的 table 都是 nil
不同于其他语言的数组把 0 作为数组的初始索引,可以看到在Lua里表的默认初始索引一般以 1 开始。
把table做hash表用:
(4)Lua 中的函数
在 Lua中,函数以function开头,以end结尾,funcName是函数名,中间部分是函数体:
function
funcName ()
--[[
函数内容
--]]
end
比如定义一个字符串连接函数:
function
contact(str1,str2)
return
str1..str2
end
print(contact("hello","Lijin"))
(5)Lua 变量
变量在使用前,需要在代码中进行声明,即创建该变量。
编译程序执行代码之前编译器需要知道如何给语句变量开辟存储区,用于存储变量的值。
Lua 变量有:全局变量、局部变量。
Lua 中的变量全是全局变量,那怕是语句块或是函数里,除非用 local 显式声明为局部变量。局部变量的作用域为从声明位置开始到所在语句块结束。
变量的默认值均为 nil。
(6)Lua中的控制语句
Lua中的控制语句和Java语言的差不多。
循环控制
Lua支持while 循环、for 循环、repeat…until循环和循环嵌套,同时,Lua提供了break 语句和goto 语句。
我们重点来看看while 循环、for 循环。
for 循环
Lua 编程语言中 for语句有两大类:数值for循环、泛型for循环。
数值for循环
Lua 编程语言中数值 for 循环语法格式:
for var=exp1,exp2,exp3 do
<执行体>
end
var 从 exp1 变化到 exp2,每次变化以 exp3 为步长递增 var,并执行一次 “执行体”。exp3 是可选的,如果不指定,默认为1。
泛型for循环
泛型 for 循环通过一个迭代器函数来遍历所有值,类似 java 中的 foreach 语句。Lua 编程语言中泛型 for 循环语法格式:
–打印数组a的所有值
a = {"one", "two", "three"}
for i, v in ipairs(a) do
print(i, v)
end
i是数组索引值,v是对应索引的数组元素值。ipairs是Lua提供的一个迭代器函数,用来迭代数组。
tbl3={age=18,name='lijin'}
for i, v in pairs(tbl3) do
print(i,v)
end
while循环
while(condition)
do
statements
end
a=10 while(a<20) do print("a= ",a) a=a+1 end
if条件控制
Lua支持if 语句、if…else 语句和if 嵌套语句。
if 语句语法格式如下:
if(布尔表达式)
then
--[ 在布尔表达式为 true 时执行的语句 --]
end
if...else 语句语法格式如下:
if(布尔表达式)
then
--[ 布尔表达式为 true 时执行该语句块 --]
else
--[ 布尔表达式为 false 时执行该语句块 --]
end
(7)Lua 运算符
Lua提供了以下几种运算符类型:
算术运算符
+ 加法
- 减法
* 乘法
/ 除法
% 取余
^ 乘幂
- 负号
关系运算符
== 等于
~= 不等于
> 大于
< 小于
>= 大于等于
<= 小于等于
逻辑运算符
and 逻辑与操作符
or 逻辑或操作符
not 逻辑非操作符
(8)Lua其他特性
Lua支持模块与包,也就是封装库,支持元表(Metatable),支持协程(coroutine),支持文件IO操作,支持错误处理,支持代码调试,支持Lua垃圾回收,支持面向对象和数据库访问,更多详情请参考对应书籍。
(9)Java对Lua的支持
目前Java生态中,对Lua的支持是LuaJ,是一个 Java 的 Lua 解释器,基于 Lua 5.2.x 版本。
Maven
<dependency>
<groupId>org.luaj</groupId>
<artifactId>luaj-jse</artifactId>
<version>3.0.1</version>
</dependency>
在工作中需要使用Lua语言或者Java中执行Lua脚本的,请自行仔细学习Lua语言本身和luaj-jse使用,一般这种形式用得非常少。
2、Redis中的Lua
(1)eval 命令
命令格式
EVAL script numkeys key [key ...] arg [arg ...]
命令说明
1、script 参数:
是一段 Lua 脚本程序,它会被运行在Redis 服务器上下文中,这段脚本不必(也不应该)定义为一个 Lua 函数。
2、numkeys 参数:
用于指定键名参数的个数。
3、key [key…] 参数:
从EVAL 的第三个参数开始算起,使用了 numkeys 个键(key),表示在脚本中所用到的那些 Redis 键(key),这些键名参数可以在 Lua 中通过全局变量 KEYS 数组,用1为基址的形式访问(KEYS[1],KEYS[2]···)。
4、arg [arg…]参数:
可以在 Lua 中通过全局变量 ARGV 数组访问,访问的形式和 KEYS 变量类似(ARGV[1],ARGV[2]···)。
示例
eval "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 key1 key2 first second
在这个范例中key [key …] 参数的作用不明显,其实它最大的作用是方便我们在Lua 脚本中调用 Redis 命令
Lua 脚本中调用 Redis 命令
这里我们主要记住 call() 命令即可:
eval "return redis.call('mset',KEYS[1],ARGV[1],KEYS[2],ARGV[2])" 2 key1 key2 first second
(2)evalsha 命令
但是eval命令要求你在每次执行脚本的时候都发送一次脚本,所以Redis 有一个内部的缓存机制,因此它不会每次都重新编译脚本,不过在很多场合,付出无谓的带宽来传送脚本主体并不是最佳选择。
为了减少带宽的消耗, Redis 提供了evalsha 命令,它的作用和 EVAL一样,都用于对脚本求值,但它接受的第一个参数不是脚本,而是脚本的 SHA1 摘要。
这里就需要借助script命令。
script flush :清除所有脚本缓存。
script exists :根据给定的脚本校验,检查指定的脚本是否存在于脚本缓存。
script load :将一个脚本装入脚本缓存,返回SHA1摘要,但并不立即运行它。
script kill :杀死当前正在运行的脚本。
这里的 SCRIPT LOAD 命令就可以用来生成脚本的 SHA1 摘要
script load "return redis.call('set',KEYS[1],ARGV[1])"
然后就可以执行这个脚本
evalsha "c686f316aaf1eb01d5a4de1b0b63cd233010e63d" 1 key1 testscript
(3)redis-cli 执行脚本
可以使用 redis-cli 命令直接执行脚本,这里我们直接新建一个 lua 脚本文件,用来获取刚刚存入 Redis 的
key1的值,vim redis.lua,然后编写 Lua 命令:
local value = redis.call('get','key1')
return value
然后执行
./redis-cli -p 6379 --eval ../scripts/test.lua
也可以
./redis-cli -p 6379 script load "$(cat ../scripts/test.lua)"
五、Redis与限流
1、使用Redis+Lua语言实现限流
项目代码
@RestController
public class Controller {
@Autowired
IsAcquire isAcquire;//手下的分布式限流
//秒杀接口
@RequestMapping("/order")
public String killProduct(@RequestParam(required = true) String name) throws Exception{
//rateLimiter.tryAcquire(1); //调用
if(isAcquire.acquire("iphone",10,60)){//60秒只能进行10次
System.out.println("业务成功!");
return "恭喜("+name+"),抢到iphone!";
}else{
System.out.println("-----------业务被限流");
return "对不起,你被限流了!";
}
}
}
/**
* 分布式限流的服务类
*/
@Service
public class IsAcquire {
//引入一个Redis的Lua脚本的支持
private DefaultRedisScript<Long> getRedisScript;
//判断限流方法---类似于RateLimiter
public boolean acquire(String limitKey,int limit,int expire) throws Exception{
//连接Redis
Jedis jedis = new Jedis("127.0.0.1",6379);
getRedisScript =new DefaultRedisScript<>();
getRedisScript.setResultType(Long.class);//脚本执行返回值 long
getRedisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("rateLimiter.lua")));
Long result = (Long)jedis.eval(getRedisScript.getScriptAsString(),
1,limitKey,String.valueOf(limit),String.valueOf(expire));
if(result ==0){
return false;
}
return true;
}
}
rateLimiter.lua
--java端送入三个参数(1个key,2个param )string
--limitKey(redi中key的值)
local key =KEYS[1];
--limit(次数)
local times = ARGV[1];
--expire(秒S)
local expire = ARGV[2];
--对key-value中的 value +1的操作 返回一个结果
local afterval= redis.call('incr',key);
if afterval ==1 then --第一次
redis.call('expire',key,tonumber(expire) ) --失效时间(1S) TLL 1S
return 1; --第一次不会进行限制
end
--不是第一次,进行判断
if afterval > tonumber(times) then
--限制了
return 0;
end
return 1;
方案好处:
支持分布式
使用lua脚本的好处:
减少网络开销
原子操作
复用
2、限流算法
(1)固定窗口算法
简单粗暴,但是有临界问题
(2)滑动窗口算法
在线演示滑动窗口:
https://media.pearsoncmg.com/aw/ecs_kurose_compnetwork_7/cw/content/interactiveanimations/selective-repeat-protocol/index.html
滑动窗口通俗来讲就是一种流量控制技术。
它本质上是描述接收方的TCP数据报缓冲区大小的数据,发送方根据这个数据来计算自己最多能发送多长的数据,如果发送方收到接收方的窗口大小为0的TCP数据报,那么发送方将停止发送数据,等到接收方发送窗口大小不为0的数据报的到来。
首先是第一次发送数据这个时候的窗口大小是根据链路带宽的大小来决定的。我们假设这个时候窗口的大小是3。这个时候接受方收到数据以后会对数据进行确认告诉发送方我下次希望手到的是数据是多少。这里我们看到接收方发送的ACK=3(这是发送方发送序列2的回答确认,下一次接收方期望接收到的是3序列信号)。这个时候发送方收到这个数据以后就知道我第一次发送的3个数据对方只收到了2个。就知道第3个数据对方没有收到。下次在发送的时候就从第3个数据开始发。
此时窗口大小变成了2 。
于是发送方发送2个数据。看到接收方发送的ACK是5就表示他下一次希望收到的数据是5,发送方就知道我刚才发送的2个数据对方收了这个时候开始发送第5个数据。
这就是滑动窗口的工作机制,当链路变好了或者变差了这个窗口还会发生变话,并不是第一次协商好了以后就永远不变了。
所以滑动窗口协议,是TCP使用的一种流量控制方法。该协议允许发送方在停止并等待确认前可以连续发送多个分组。由于发送方不必每发一个分组就停下来等待确认,因此该协议可以加速数据的传输。
只有在接收窗口向前滑动时(与此同时也发送了确认),发送窗口才有可能向前滑动。
收发两端的窗口按照以上规律不断地向前滑动,因此这种协议又称为滑动窗口协议。
TCP中的滑动窗口
发送方和接收方都会维护一个数据帧的序列,这个序列被称作窗口。发送方的窗口大小由接收方确认,目的是控制发送速度,以免接收方的缓存不够大导致溢出,同时控制流量也可以避免网络拥塞。
在TCP 的可靠性的图中,我们可以看到,发送方每发送一个数据接收方就要给发送方一个ACK对这个数据进行确认。只有接收了这个确认数据以后发送方才能传输下个数据。
存在的问题:如果窗口过小,当传输比较大的数据的时候需要不停的对数据进行确认,这个时候就会造成很大的延迟。
如果窗口过大,我们假设发送方一次发送100个数据,但接收方只能处理50个数据,这样每次都只对这50个数据进行确认。发送方下一次还是发送100个数据,但接受方还是只能处理50个数据。这样就避免了不必要的数据来拥塞我们的链路。
因此,我们引入了滑动窗口。
(3)漏洞算法
定义
先有一个桶,桶的容量是固定的。
以任意速率向桶流入水滴,如果桶满了则溢出(被丢弃)。
桶底下有个洞,按照固定的速率从桶中流出水滴。
特点
漏桶核心是:请求来了以后,直接进桶,然后桶根据自己的漏洞大小慢慢往外面漏。
具体实现的时候要考虑性能(比如Redis实现的时候数据结构的操作是不是会导致性能问题)
(4)令牌算法
定义
先有一个桶,容量是固定的,是用来放令牌的。
以固定速率向桶放令牌,如果桶满了就不放令牌了。
处理请求是先从桶拿令牌,先拿到令牌再处理请求,拿不到令牌同样也被限流了。
特点
突发情况下可以一次拿多个令牌进行处理。
具体实现的时候要考虑性能(比如Redis实现的时候数据结构的操作是不是会导致性能问题)
六、发布和订阅
Redis提供了基于“发布/订阅”模式的消息机制,此种模式下,消息发布者和订阅者不进行直接通信,发布者客户端向指定的频道(channel)发布消息,订阅该频道的每个客户端都可以收到该消息。
1、操作命令
Redis主要提供了发布消息、订阅频道、取消订阅以及按照模式订阅和取消订阅等命令。
(1)发布消息
publish channel message
返回值是接收到信息的订阅者数量,如果是0 说明没有订阅者,这条消息就丢了(再启动订阅者也不会收到)。
(2)订阅消息
subscribe channel [channel ...]
订阅者可以订阅一个或多个频道,如果此时另一个客户端发布一条消息,当前订阅者客户端会收到消息。
如果有多个客户端同时订阅了同一个频道,都会收到消息。
客户端在执行订阅命令之后进入了订阅状态(类似于监听),只能接收subscribe、psubscribe、unsubscribe、 punsubscribe的四个命令。
(3)查询订阅情况
查看活跃的频道
pubsub channels [pattern]
Pubsub 命令用于查看订阅与发布系统状态,包括活跃的频道(是指当前频道至少有一个订阅者),其中[pattern]是可以指定具体的模式,类似于通配符。
查看频道订阅数
pubsub numsub channel
最后也可以通过 help看具体的参数运用
(4)使用场景和缺点
需要消息解耦又并不关注消息可靠性的地方都可以使用发布订阅模式。
PubSub 的生产者传递过来一个消息,Redis会直接找到相应的消费者传递过去。如果一个消费者都没有,那么消息直接丢弃。如果开始有三个消费者,一个消费者突然挂掉了,生产者会继续发送消息,另外两个消费者可以持续收到消息。但是挂掉的消费者重新连上的时候,这断连期间生产者发送的消息,对于这个消费者来说就是彻底丢失了。
所以和很多专业的消息队列系统(例如Kafka、RocketMQ)相比,Redis 的发布订阅很粗糙,例如无法实现消息堆积和回溯。但胜在足够简单,如果当前场景可以容忍的这些缺点,也不失为一个不错的选择。
正是因为 PubSub 有这些缺点,它的应用场景其实是非常狭窄的。从Redis5.0 新增了 Stream 数据结构,这个功能给 Redis 带来了持久化消息队列。
七、Redis Stream
Redis5.0 最大的新特性就是多出了一个数据结构 Stream,它是一个新的强大的支持多播的可持久化的消息队列,Redis的作者声明Redis Stream借鉴了 Kafka 的设计。
1、Stream总述
Redis Stream 的结构如上图所示,每一个Stream都有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容。消息是持久化的,Redis 重启后,内容还在。
具体的玩法如下:
1、每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用xadd指令追加消息时自动创建。
消息 ID 的形式是timestampInMillis-sequence,例如1527846880572-5,它表示当前的消息在毫米时间戳1527846880572时产生,并且是该毫秒内产生的第 5 条消息。消息 ID 可以由服务器自动生成(*代表默认自动),也可以由客户端自己指定,但是形式必须是整数-整数,而且必须是后面加入的消息的 ID 要大于前面的消息 ID。
消息内容就是键值对,形如 hash 结构的键值对,这没什么特别之处。
2、每个 Stream 都可以挂多个消费组,每个消费组会有个游标last_delivered_id在 Stream 数组之上往前移动,表示当前消费组已经消费到哪条消息了。
每个消费组都有一个Stream 内唯一的名称,消费组不会自动创建,它需要单独的指令xgroup create进行创建,需要指定从 Stream 的某个消息 ID 开始消费,这个 ID 用来初始化last_delivered_id变量。
3、每个消费组 (Consumer Group) 的状态都是独立的,相互不受影响。也就是说同一份 Stream 内部的消息会被每个消费组都消费到。
4、同一个消费组 (Consumer Group) 可以挂接多个消费者 (Consumer),这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标last_delivered_id往前移动。每个消费者有一个组内唯一名称。
5、消费者 (Consumer) 内部会有个状态变量pending_ids,它记录了当前已经被客户端读取,但是还没有 ack的消息。如果客户端没有 ack,这个变量里面的消息 ID 会越来越多,一旦某个消息被 ack,它就开始减少。这个 pending_ids 变量在 Redis 官方被称之为PEL,也就是Pending Entries List,这是一个很核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理。
2、常用操作命令
(1)生产端
xadd 追加消息
xadd第一次对于一个stream使用可以生成一个stream的结构
xadd streamtest * name lijin age 18
*号表示服务器自动生成 ID,后面顺序跟着一堆 key/value
1626705954593-0 则是生成的消息 ID,由两部分组成:时间戳-序号。时间戳时毫秒级单位,是生成消息的Redis服务器时间,它是个64位整型。序号是在这个毫秒时间点内的消息序号。它也是个64位整型。
为了保证消息是有序的,因此Redis生成的ID是单调递增有序的。由于ID中包含时间戳部分,为了避免服务器时间错误而带来的问题(例如服务器时间延后了),Redis的每个Stream类型数据都维护一个latest_generated_id属性,用于记录最后一个消息的ID。若发现当前时间戳退后(小于latest_generated_id所记录的),则采用时间戳不变而序号递增的方案来作为新消息ID(这也是序号为什么使用int64的原因,保证有足够多的的序号),从而保证ID的单调递增性质。
强烈建议使用Redis的方案生成消息ID,因为这种时间戳+序号的单调递增的ID方案,几乎可以满足你全部的需求。但ID是支持自定义的。
xrange 获取消息列表,会自动过滤已经删除的消息
xrange streamtest - +
其中-表示最小值 , + 表示最大值
或者我们可以指定消息 ID 的列表:
xrange streamtest - 1665646270814-0
xlen 消息长度
xlen streamtest
del 删除 Stream
del streamtest 删除整个 Stream
xdel可以删除指定的消息(指定ID)
(2)消费端
单消费者
虽然Stream中有消费者组的概念,但是可以在不定义消费组的情况下进行 Stream 消息的独立消费,当 Stream 没有新消息时,甚至可以阻塞等待。Redis 设计了一个单独的消费指令xread,可以将 Stream 当成普通的消息队列 (list) 来使用。使用 xread 时,我们可以完全忽略消费组 (Consumer Group) 的存在,就好比 Stream 就是一个普通的列表 (list)。
xread count 1 streams stream2 0-0
表示从 Stream 头部读取1条消息,0-0指从头开始
xread count 2 streams stream1 1665644057564-0
也可以指定从streams的消息Id开始(不包括命令中的消息id)
xread count 1 streams stream1 $
$代表从尾部读取,上面的意思就是从尾部读取最新的一条消息,此时默认不返回任何消息
应该以阻塞的方式读取尾部最新的一条消息,直到新的消息的到来
xread block 0 count 1 streams stream1 $
block后面的数字代表阻塞时间,单位毫秒,0代表一直阻塞
此时我们新开一个客户端,往stream1中写入一条消息
可以看到看到阻塞解除了,返回了新的消息内容,而且还显示了一个等待时间,这里我们等待了10.82s
一般来说客户端如果想要使用 xread 进行顺序消费,一定要记住当前消费到哪里了,也就是返回的消息 ID。下次继续调用 xread 时,将上次返回的最后一个消息 ID 作为参数传递进去,就可以继续消费后续的消息。不然很容易重复消息,基于这点单消费者基本上没啥运用场景。
消费组
创建消费组
Stream 通过xgroup create指令创建消费组 (Consumer Group),需要传递起始消息 ID 参数用来初始化last_delivered_id变量。
0-表示从头开始消费
xgroup create stream1 c1 0-0
$ 表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略
xgroup create stream1 c2 $
现在我们可以用xinfo命令来看看stream1的情况:
xinfo stream stream1
查看stream1的消费组的情况:
xinfo groups stream1
消息消费
有了消费组,自然还需要消费者,Stream提供了 xreadgroup 指令可以进行消费组的组内消费,需要提供消费组名称、消费者名称和起始消息 ID。
它同 xread 一样,也可以阻塞等待新消息。读到新消息后,对应的消息 ID 就会进入消费者的PEL(正在处理的消息) 结构里,客户端处理完毕后使用 xack 指令通知服务器,本条消息已经处理完毕,该消息 ID 就会从 PEL 中移除。
xreadgroup GROUP c1 consumer1 count 1 streams stream1 >
consumer1代表消费者的名字。
">"表示从当前消费组的 last_delivered_id 后面开始读,每当消费者读取一条消息,last_delivered_id 变量就会前进。前面我们定义cg1的时候是从头开始消费的,自然就获得stream1中第一条消息再执行一次上面的命令,自然就读取到了下条消息。我们将Stream1中的消息读取完,很自然就没有消息可读了。
然后设置阻塞等待
我们新开一个客户端,发送消息到stream1回到原来的客户端,发现阻塞解除,收到新消息
我们来观察一下观察消费组状态
如果同一个消费组有多个消费者,我们还可以通过 xinfo consumers 指令观察每个消费者的状态
xinfo consumers stream2 c1
可以看到目前c1这个消费者有 7 条待ACK的消息,空闲了2086176ms 没有读取消息。
如果我们确认一条消息
xack stream1 c1 1665647371850-0
就可以看到待确认消息变成了6条
xack允许带多个消息id,比如 同时Stream还提供了命令XPENDING 用来获消费组或消费内消费者的未处理完毕的消息。
xpending stream1 c1
具体操作细节可以参考:xpending 命令 – Redis中国用户组(CRUG)
命令XCLAIM[kleɪm]用以进行消息转移的操作,将某个消息转移到自己的Pending[ˈpendɪŋ]列表中。需要设置组、转移的目标消费者和消息ID,同时需要提供IDLE(已被读取时长),只有超过这个时长,才能被转移。
[]
具体操作细节可参考:xclaim 命令 – Redis中国用户组(CRUG)
3、在Redis中实现消息队列
(1)基于pub/sub
注意必须继承JedisPubSub这个抽象类
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPubSub;
/*
* 基于PUBSUB的消息中间件的实现
* */
@Component
public class PSVer extends JedisPubSub {
public final static String RS_PS_MQ_NS = "rpsm:";
@Autowired
private JedisPool jedisPool;
@Override
public void onMessage(String channel, String message) {
System.out.println("Accept "+channel+" message:"+message);
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println("Subscribe "+channel+" count:"+subscribedChannels);
}
public void pub(String channel, String message) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.publish(RS_PS_MQ_NS+channel,message);
System.out.println("发布消息到"+RS_PS_MQ_NS+channel+" message="+message);
} catch (Exception e) {
throw new RuntimeException("发布消息失败!");
} finally {
jedis.close();
}
}
public void sub(String... channels) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.subscribe(this,channels);
} catch (Exception e) {
throw new RuntimeException("订阅频道失败!");
} finally {
jedis.close();
}
}
}
@SpringBootTest
public class TestPSVer {
@Autowired
private PSVer psVer;
@Test
void testSub(){
psVer.sub(PSVer.RS_PS_MQ_NS+"psmq", PSVer.RS_PS_MQ_NS+"psmq2");
}
@Test
void testPub(){
psVer.pub("psmq","msgtest");
psVer.pub("psmq2","msgtest2");
}
}
(2)基于Stream
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.*;
import redis.clients.jedis.params.XReadGroupParams;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 实现消费组消费,不考虑单消费者模式
*/
@Component
public class StreamVer {
public final static String RS_STREAM_MQ_NS = "rsm:";
@Autowired
private JedisPool jedisPool;
/**
* 发布消息到Stream
* @param key
* @param message
* @return
*/
public StreamEntryID produce(String key,Map<String,String> message){
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
StreamEntryID id = jedis.xadd(RS_STREAM_MQ_NS+key, StreamEntryID.NEW_ENTRY, message);
System.out.println("发布消息到"+RS_STREAM_MQ_NS+key+" 返回消息id="+id.toString());
return id;
} catch (Exception e) {
throw new RuntimeException("发布消息失败!");
} finally {
jedis.close();
}
}
/**
* 创建消费群组,消费群组不可重复创建
* @param key
* @param groupName
* @param lastDeliveredId
*/
public void createCustomGroup(String key, String groupName, String lastDeliveredId){
Jedis jedis = null;
try {
StreamEntryID id = null;
if (lastDeliveredId==null){
lastDeliveredId = "0-0";
}
id = new StreamEntryID(lastDeliveredId);
jedis = jedisPool.getResource();
/*makeStream表示没有时是否自动创建stream,但是如果有,再自动创建会异常*/
jedis.xgroupCreate(RS_STREAM_MQ_NS+key,groupName,id,false);
System.out.println("创建消费群组成功:"+groupName);
} catch (Exception e) {
throw new RuntimeException("创建消费群组失败!",e);
} finally {
jedis.close();
}
}
/**
* 消息消费
* @param key
* @param customerName
* @param groupName
* @return
*/
public List<Map.Entry<String, List<StreamEntry>>> consume(String key, String customerName,String groupName){
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
/*消息消费时的参数*/
XReadGroupParams xReadGroupParams = new XReadGroupParams().block(0).count(1);
Map<String, StreamEntryID> streams = new HashMap<>();
streams.put(RS_STREAM_MQ_NS+key,StreamEntryID.UNRECEIVED_ENTRY);
List<Map.Entry<String, List<StreamEntry>>> result
= jedis.xreadGroup(groupName, customerName, xReadGroupParams, streams);
System.out.println(groupName+"从"+RS_STREAM_MQ_NS+key+"接受消息, 返回消息:"+result);
return result;
} catch (Exception e) {
throw new RuntimeException("消息消费失败!",e);
} finally {
jedis.close();
}
}
/**
* 消息确认
* @param key
* @param groupName
* @param msgId
*/
public void ackMsg(String key, String groupName,StreamEntryID msgId){
if (msgId==null) throw new RuntimeException("msgId为空!");
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
System.out.println(jedis.xack(key,groupName,msgId));
System.out.println(RS_STREAM_MQ_NS+key+",消费群组"+groupName+" 消息已确认");
} catch (Exception e) {
throw new RuntimeException("消息确认失败!",e);
} finally {
jedis.close();
}
}
/*
检查消费者群组是否存在,辅助方法
* */
public boolean checkGroup(String key, String groupName){
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
List<StreamGroupInfo> xinfoGroupResult = jedis.xinfoGroup(RS_STREAM_MQ_NS+key);
for(StreamGroupInfo groupinfo : xinfoGroupResult) {
if(groupName.equals(groupinfo.getName())) return true;
}
return false;
} catch (Exception e) {
throw new RuntimeException("检查消费群组失败!",e);
} finally {
jedis.close();
}
}
public final static int MQ_INFO_CONSUMER = 1;
public final static int MQ_INFO_GROUP = 2;
public final static int MQ_INFO_STREAM = 0;
/**
* 消息队列信息查看
* @param type
*/
public void MqInfo(int type,String key, String groupName){
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
if(type==MQ_INFO_CONSUMER){
List<StreamConsumersInfo> xinfoConsumersResult = jedis.xinfoConsumers(RS_STREAM_MQ_NS+key, groupName);
System.out.println(RS_STREAM_MQ_NS+key+" 消费者信息:" + xinfoConsumersResult);
for( StreamConsumersInfo consumersinfo : xinfoConsumersResult) {
System.out.println("-ConsumerInfo:" + consumersinfo.getConsumerInfo());
System.out.println("--Name:" + consumersinfo.getName());
System.out.println("--Pending:" + consumersinfo.getPending());
System.out.println("--Idle:" + consumersinfo.getIdle());
}
}else if (type==MQ_INFO_GROUP){
List<StreamGroupInfo> xinfoGroupResult = jedis.xinfoGroup(RS_STREAM_MQ_NS+key);
System.out.println(RS_STREAM_MQ_NS+key+"消费者群组信息:" + xinfoGroupResult);
for(StreamGroupInfo groupinfo : xinfoGroupResult) {
System.out.println("-GroupInfo:" + groupinfo.getGroupInfo());
System.out.println("--Name:" + groupinfo.getName());
System.out.println("--Consumers:" + groupinfo.getConsumers());
System.out.println("--Pending:" + groupinfo.getPending());
System.out.println("--LastDeliveredId:" + groupinfo.getLastDeliveredId());
}
}else{
StreamInfo xinfoStreamResult = jedis.xinfoStream(RS_STREAM_MQ_NS+key);
System.out.println(RS_STREAM_MQ_NS+key+"队列信息:" + xinfoStreamResult);
System.out.println("-StreamInfo:" + xinfoStreamResult.getStreamInfo());
System.out.println("--Length:" + xinfoStreamResult.getLength());
System.out.println("--RadixTreeKeys:" + xinfoStreamResult.getRadixTreeKeys());
System.out.println("--RadixTreeNodes():" + xinfoStreamResult.getRadixTreeNodes());
System.out.println("--Groups:" + xinfoStreamResult.getGroups());
System.out.println("--LastGeneratedId:" + xinfoStreamResult.getLastGeneratedId());
System.out.println("--FirstEntry:" + xinfoStreamResult.getFirstEntry());
System.out.println("--LastEntry:" + xinfoStreamResult.getLastEntry());
}
} catch (Exception e) {
throw new RuntimeException("消息队列信息检索失败!",e);
} finally {
jedis.close();
}
}
}
@SpringBootTest
public class TestStreamVer {
@Autowired
private StreamVer streamVer;
private final static String KEY_NAME = "testStream";
private final static String GROUP_NAME = "testgroup";
@Test
void testProduce(){
Map<String,String> message = new HashMap<>();
message.put("name","lijin");
message.put("age","18");
streamVer.produce(KEY_NAME,new HashMap<>(message));//生产
streamVer.MqInfo(StreamVer.MQ_INFO_STREAM,KEY_NAME,null); //查看
streamVer.MqInfo(StreamVer.MQ_INFO_GROUP,KEY_NAME,null);
}
@Test
void testConsumer(){
if (!streamVer.checkGroup(KEY_NAME,GROUP_NAME)){
streamVer.createCustomGroup(KEY_NAME,GROUP_NAME,null);
}
List<Map.Entry<String, List<StreamEntry>>> results = streamVer.consume(KEY_NAME,"testUser",GROUP_NAME);
streamVer.MqInfo(StreamVer.MQ_INFO_GROUP,KEY_NAME,GROUP_NAME);
streamVer.MqInfo(StreamVer.MQ_INFO_CONSUMER,KEY_NAME,GROUP_NAME);
for(Map.Entry<String, List<StreamEntry>> result:results ){
for(StreamEntry entry:result.getValue()){
streamVer.ackMsg(KEY_NAME,GROUP_NAME,entry.getID());
streamVer.MqInfo(StreamVer.MQ_INFO_GROUP,KEY_NAME,GROUP_NAME);
streamVer.MqInfo(StreamVer.MQ_INFO_CONSUMER,KEY_NAME,GROUP_NAME);
}
}
}
@Test
void testAck(){
streamVer.ackMsg(KEY_NAME,GROUP_NAME,null);
streamVer.MqInfo(StreamVer.MQ_INFO_GROUP,KEY_NAME,GROUP_NAME);
streamVer.MqInfo(StreamVer.MQ_INFO_CONSUMER,KEY_NAME,GROUP_NAME);
}
}
java封装了两个类用于处理消息及消息的元数据。
StreamEntry和StreamEntryID
(3)Redis中几种消息队列实现的总结
基于List的 LPUSH+BRPOP 的实现
足够简单,消费消息延迟几乎为零,但是需要处理空闲连接的问题。
如果线程一直阻塞在那里,Redis客户端的连接就成了闲置连接,闲置过久,服务器一般会主动断开连接,减少闲置资源占用,这个时候blpop和brpop或抛出异常,所以在编写客户端消费者的时候要小心,如果捕获到异常,还有重试。
其他缺点包括:
做消费者确认ACK麻烦,不能保证消费者消费消息后是否成功处理的问题(宕机或处理异常等),通常需要维护一个Pending列表,保证消息处理确认;不能做广播模式,如pub/sub,消息发布/订阅模型;不能重复消费,一旦消费就会被删除;不支持分组消费。
基于Sorted-Set的实现
多用来实现延迟队列,当然也可以实现有序的普通的消息队列,但是消费者无法阻塞的获取消息,只能轮询,不允许重复消息。
PUB/SUB,订阅/发布模式
优点:
典型的广播模式,一个消息可以发布到多个消费者;多信道订阅,消费者可以同时订阅多个信道,从而接收多类消息;消息即时发送,消息不用等待消费者读取,消费者会自动接收到信道发布的消息。
缺点:
消息一旦发布,不能接收。换句话就是发布时若客户端不在线,则消息丢失,不能寻回;不能保证每个消费者接收的时间是一致的;若消费者客户端出现消息积压,到一定程度,会被强制断开,导致消息意外丢失。通常发生在消息的生产远大于消费速度时;可见,Pub/Sub 模式不适合做消息存储,消息积压类的业务,而是擅长处理广播,即时通讯,即时反馈的业务。
基于Stream类型的实现
基本上已经有了一个消息中间件的雏形,可以考虑在生产过程中使用。
4、消息队列问题
从我们上面对Stream的使用表明,Stream已经具备了一个消息队列的基本要素,生产者API、消费者API,消息Broker,消息的确认机制等等,所以在使用消息中间件中产生的问题,这里一样也会遇到。
(1)Stream 消息太多怎么办?
要是消息积累太多,Stream 的链表岂不是很长,内容会不会爆掉?xdel 指令又不会删除消息,它只是给消息做了个标志位。
Redis 自然考虑到了这一点,所以它提供了一个定长 Stream 功能。在 xadd 的指令提供一个定长长度 maxlen,就可以将老的消息干掉,确保最多不超过指定长度。
(2)消息如果忘记 ACK 会怎样?
Stream 在每个消费者结构中保存了正在处理中的消息 ID 列表 PEL,如果消费者收到了消息处理完了但是没有回复 ack,就会导致 PEL 列表不断增长,如果有很多消费组的话,那么这个 PEL 占用的内存就会放大。所以消息要尽可能的快速消费并确认。
(3)PEL 如何避免消息丢失?
在客户端消费者读取 Stream 消息时,Redis 服务器将消息回复给客户端的过程中,客户端突然断开了连接,消息就丢失了。但是 PEL 里已经保存了发出去的消息 ID。待客户端重新连上之后,可以再次收到 PEL 中的消息 ID 列表。不过此时 xreadgroup 的起始消息 ID 不能为参数,而必须是任意有效的消息 ID,一般将参数设为 0-0,表示读取所有的 PEL 消息以及自last_delivered_id之后的新消息。
(4)死信问题
如果某个消息,不能被消费者处理,也就是不能被XACK,这是要长时间处于Pending列表中,即使被反复的转移给各个消费者也是如此。此时该消息的delivery counter(通过XPENDING可以查询到)就会累加,当累加到某个我们预设的临界值时,我们就认为是坏消息(也叫死信,DeadLetter,无法投递的消息),由于有了判定条件,我们将坏消息处理掉即可,删除即可。删除一个消息,使用XDEL语法,注意,这个命令并没有删除Pending中的消息,因此查看Pending,消息还会在,可以在执行执行XDEL之后,XACK这个消息标识其处理完毕。
(5)Stream 的高可用
Stream 的高可用是建立主从复制基础上的,它和其它数据结构的复制机制没有区别,也就是说在 Sentinel 和 Cluster 集群环境下 Stream 是可以支持高可用的。不过鉴于 Redis 的指令复制是异步的,在 failover 发生时,Redis 可能会丢失极小部分数据,这点 Redis 的其它数据结构也是一样的。
(6)分区 Partition
Redis 的服务器没有原生支持分区能力,如果想要使用分区,那就需要分配多个 Stream,然后在客户端使用一定的策略来生产消息到不同的 Stream。
5、Stream小结
Stream 的消费模型借鉴了Kafka 的消费分组的概念,它弥补了 Redis Pub/Sub 不能持久化消息的缺陷。但是它又不同于 kafka,Kafka 的消息可以分 partition,而 Stream 不行。如果非要分 parition 的话,得在客户端做,提供不同的 Stream 名称,对消息进行 hash 取模来选择往哪个 Stream 里塞。
关于 Redis 是否适合做消息队列,业界一直是有争论的。很多人认为,要使用消息队列,就应该采用 Kafka、RabbitMQ 这些专门面向消息队列场景的软件,而 Redis 更加适合做缓存。
Redis 是一个非常轻量级的键值数据库,部署一个 Redis 实例就是启动一个进程,部署 Redis 集群,也就是部署多个 Redis 实例。而 Kafka、RabbitMQ 部署时,涉及额外的组件,例如 Kafka 的运行就需要再部署ZooKeeper。相比 Redis 来说,Kafka 和 RabbitMQ 一般被认为是重量级的消息队列。
所以,关于是否用 Redis 做消息队列的问题,不能一概而论,我们需要考虑业务层面的数据体量,以及对性能、可靠性、可扩展性的需求。如果分布式系统中的组件消息通信量不大,那么,Redis 只需要使用有限的内存空间就能满足消息存储的需求,而且,Redis 的高性能特性能支持快速的消息读写,不失为消息队列的一个好的解决方案。
八、Redis的Key和Value的数据结构组织
1、全局哈希表
为了实现从键到值的快速访问,Redis 使用了一个哈希表来保存所有键值对。一个哈希表,其实就是一个数组,数组的每个元素称为一个哈希桶。所以,我们常说,一个哈希表是由多个哈希桶组成的,每个哈希桶中保存了键值对数据。
哈希桶中的 entry 元素中保存了key和value指针,分别指向了实际的键和值,这样一来,即使值是一个集合,也可以通过*value指针被查找到。因为这个哈希表保存了所有的键值对,所以,把它称为全局哈希表。
哈希表的最大好处很明显,就是让我们可以用 O(1) 的时间复杂度来快速查找到键值对:我们只需要计算键的哈希值,就可以知道它所对应的哈希桶位置,然后就可以访问相应的 entry 元素。
但当你往 Redis 中写入大量数据后,就可能发现操作有时候会突然变慢了。这其实是因为你忽略了一个潜在的风险点,那就是哈希表的冲突问题和 rehash 可能带来的操作阻塞。
当你往哈希表中写入更多数据时,哈希冲突是不可避免的问题。这里的哈希冲突,两个 key 的哈希值和哈希桶计算对应关系时,正好落在了同一个哈希桶中。
Redis 解决哈希冲突的方式,就是链式哈希。链式哈希也很容易理解,就是指同一个哈希桶中的多个元素用一个链表来保存,它们之间依次用指针连接。
当然如果这个数组一直不变,那么hash冲突会变很多,这个时候检索效率会大打折扣,所以Redis就需要把数组进行扩容(一般是扩大到原来的两倍),但是问题来了,扩容后每个hash桶的数据会分散到不同的位置,这里设计到元素的移动,必定会阻塞IO,所以这个ReHash过程会导致很多请求阻塞。
2、渐进式rehash
为了避免这个问题,Redis 采用了渐进式 rehash。
首先、Redis 默认使用了两个全局哈希表:哈希表 1 和哈希表 2。一开始,当你刚插入数据时,默认使用哈希表 1,此时的哈希表 2 并没有被分配空间。随着数据逐步增多,Redis 开始执行 rehash。
1、给哈希表 2 分配更大的空间,例如是当前哈希表 1 大小的两倍
2、把哈希表 1 中的数据重新映射并拷贝到哈希表 2 中
3、释放哈希表 1 的空间
在上面的第二步涉及大量的数据拷贝,如果一次性把哈希表 1 中的数据都迁移完,会造成 Redis 线程阻塞,无法服务其他请求。此时,Redis 就无法快速访问数据了。
在Redis 开始执行 rehash,Redis仍然正常处理客户端请求,但是要加入一个额外的处理:
处理第1个请求时,把哈希表 1中的第1个索引位置上的所有 entries 拷贝到哈希表 2 中
处理第2个请求时,把哈希表 1中的第2个索引位置上的所有 entries 拷贝到哈希表 2 中
如此循环,直到把所有的索引位置的数据都拷贝到哈希表 2 中。
这样就巧妙地把一次性大量拷贝的开销,分摊到了多次处理请求的过程中,避免了耗时操作,保证了数据的快速访问。
所以这里基本上也可以确保根据key找value的操作在O(1)左右。
不过这里要注意,如果Redis中有海量的key值的话,这个Rehash过程会很长很长,虽然采用渐进式Rehash,但在Rehash的过程中还是会导致请求有不小的卡顿。并且像一些统计命令也会非常卡顿:比如keys
按照Redis的配置每个实例能存储的最大的key的数量为2的32次方,即2.5亿,但是尽量把key的数量控制在千万以下,这样就可以避免Rehash导致的卡顿问题,如果数量确实比较多,建议采用分区hash存储。