Redis自学之路—高级特性(实现消息队列)(七)
目录
简介
Redis的Key和Value的数据结构组织
全局哈希表
渐进式rehash
发布和订阅
操作命令
publish 发布消息
subscribe 订阅消息
psubscribe订阅频道
unsubscribe 取消订阅一个或多个频道
punsubscribe 取消订阅一个或多个模式
查询订阅情况-查看活跃的频道
查询订阅情况-查看频道订阅数
使用场景
缺点
Redis Stream
Stream总述
常用操作命令
生产端
xadd 追加消息
xrange 获取消息列表,会自动过滤已经删除的消息
xlen 消息长度
del 删除Stream
xdel 删除指定的消息(指定ID)
消费者
单消费者
xread从Streams开头进行消费
指定从streams的消息ID开始(不包括命令中的消息id)
从尾部读取最新的一条消息
以阻塞的方式读取最新一条消息
消费组
xgroup 创建消费组 0-表示从头开始消费
xgroup 创建消费组 $表示从尾部开始消费,只接受新消息,当前Stream消息会全部忽略
xinfo查看指定Stream的详细信息
xinfo 查看消费组Group的情况
xinfo 查看Consumer信息
消息消费
xreadgroup消费组的组内消费
xreadgroup 设置阻塞等待进行消费组内消费者消费消息
xack 消息确认
消息不消息的原因
xpending 获取消费组内消费者的未处理完毕的消息
xpending 查询详细信息
xclaim用于转移未确认(pending)消息所有权
查看已消费消息的方法
Redis中几种消息队列实现总结
1.基于List的 LPUSH+BRPOP 的实现
优点
缺点
2.基于Sorted-Set的实现
优点
缺点
3.PUB/SUB,订阅/发布模式
优点
缺点
基于Stream类型的实现
消息队列问题
Stream消息太多怎么办?
消息如果忘记ACK会怎样?
PEL如何避免消息丢失?
死信问题
Stream的高可用
分区 Partition
简介
本文章主要了解Redis的高级特性与应用场景,很多朋友们都认为Redis只是单纯的负责项目中的缓存,以及在项目中做登录功能,数据缓存功能,其实都被误导了,Redis也是支持消息队列的,支持发布和订阅、支持生产端、消费端。目前第三方消息队列所能处理的事,Redis也是可以的,只不过Redis有它自己的优点和弊端,接下来主要来了解一下Redis的消息队列。
Redis的Key和Value的数据结构组织
全局哈希表
为了实现从键到值的快速访问,Redis 使用了一个哈希表来保存所有键值对。一个哈希表,其实就是一个数组,数组的每个元素称为一个哈希桶。一个哈希表是由多个哈希桶组成的,每个哈希桶中保存了键值对数据。
哈希桶中的 entry 元素中保存了key和value指针,分别指向了实际的键和值,这样一来,即使值是一个集合,也可以通过*value指针被查找到。因为这个哈希表保存了所有的键值对,所以,把它称为全局哈希表。
哈希表的最大好处很明显,就是可以用 O(1) 的时间复杂度来快速查找到键值对:只需要计算键的哈希值,就可以知道它所对应的哈希桶位置,然后就可以访问相应的 entry 元素。
但当往 Redis 中写入大量数据后,就可能发现操作有时候会突然变慢了。这其实是因为忽略了一个潜在 的风险点,那就是哈希表的冲突问题和 rehash 可能带来的操作阻塞。
当往哈希表中写入更多数据时,哈希冲突是不可避免的问题。这里的哈希冲突,两个 key 的哈希值和哈希桶计算对应关系时,正好落在了同一个哈希桶中。
Redis 解决哈希冲突的方式,就是链式哈希。链式哈希也很容易理解,就是指同一个哈希桶中的多个元素用一个链表来保存,它们之间依次用指针连接。
当然如果这个数组一直不变,那么hash冲突会变很多,这个时候检索效率会大打折扣,所以Redis就需要把数组进行扩容(一般是扩大到原来的两倍),但是问题来了,扩容后每个hash桶的数据会分散到不同的位置,这里设计到元素的移动,必定会阻塞IO,所以这个ReHash过程会导致很多请求阻塞。
渐进式rehash
为了避免这个问题,Redis 采用了渐进式 rehash。
首先、Redis 默认使用了两个全局哈希表:哈希表 1 和哈希表 2。一开始,当你刚插入数据时,默认使用哈希表 1,此时的哈希表 2 并没有被分配空间。随着数据逐步增多,Redis 开始执行 rehash。
1、给哈希表 2 分配更大的空间,例如是当前哈希表 1 大小的两倍
2、把哈希表 1 中的数据重新映射并拷贝到哈希表 2 中
3、释放哈希表 1 的空间
在上面的第二步涉及大量的数据拷贝,如果一次性把哈希表 1 中的数据都迁移完,会造成 Redis 线程阻塞,无法服务其他请求。此时,Redis 就无法快速访问数据了。
在Redis 开始执行 rehash,Redis仍然正常处理客户端请求,但是要加入一个额外的处理:
- 处理第1个请求时,把哈希表 1中的第1个索引位置上的所有 entries 拷贝到哈希表 2 中
- 处理第2个请求时,把哈希表 1中的第2个索引位置上的所有 entries 拷贝到哈希表 2 中
- 如此循环,直到把所有的索引位置的数据都拷贝到哈希表 2 中。
这样就巧妙地把一次性大量拷贝的开销,分摊到了多次处理请求的过程中,避免了耗时操作,保证了数据的快速访问。
所以这里基本上也可以确保根据key找value的操作在O(1)左右。
不过这里要注意,如果Redis中有海量的key值的话,这个Rehash过程会很长很长,虽然采用渐进式Rehash,但在Rehash的过程中还是会导致请求有不小的卡顿。并且像一些统计命令也会非常卡顿:比如keys
按照Redis的配置每个实例能存储的最大的key的数量为2的32次方,即2.5亿,但是尽量把key的数量控制在千万以下,这样就可以避免Rehash导致的卡顿问题,如果数量确实比较多,建议采用分区hash存储。
发布和订阅
Redis提供了基于“发布/订阅(pub/sub)”模式的消息机制,此模式下,消息发布者和订阅者不进行直接的通信,发布者客户端向指定的频道(channel)发布消息,订阅该频道的每个客户端都可以收到该消息.
操作命令
Redis主要提供了发布消息、订阅频道、取消订阅以及按照模式订阅和取消订阅等命令。
publish 发布消息
publish channel message
返回值是接收到信息的订阅者数量,如果是0说明没有订阅者,这条消息就丢了(再启动订阅者也不会收到)。
subscribe 订阅消息
subscribe channel [channel ...]
订阅者可以订阅一个或多个频道,如果此时另一个客户端发布一条消息,当前订阅者客户端会收到消息。
客户端在执行订阅命令之后进入了订阅状态(类似于监听),
只能接收subscribe、psubscribe,unsubscribe、 punsubscribe的四个命令。
psubscribe订阅频道
PSUBSCRIBE pattern [pattern ...]
通过模式订阅频道。模式是一个类似正则表达式的概念,允许客户端订阅匹配特定模式的所有频道。
示例:如果客户端执行 PSUBSCRIBE news.* 命令,它将订阅所有以 “news.” 开头的频道,并接收这些频道上发布的所有消息。
unsubscribe 取消订阅一个或多个频道
UNSUBSCRIBE channel [channel ...]
取消订阅一个或多个频道。
示例:如果客户端之前订阅了 “news.it” 频道,并执行 UNSUBSCRIBE news.it 命令,它将停止接收该频道上发布的消息。
punsubscribe 取消订阅一个或多个模式
UNSUBSCRIBE channel [channel ...]
取消订阅一个或多个模式。
示例:如果客户端之前订阅了 “news.it” 频道,并执行 UNSUBSCRIBE news.it 命令,它将停止接收该频道上发布的消息。
查询订阅情况-查看活跃的频道
pubsub channels [pattern]
Pubsub 命令用于查看订阅与发布系统状态,包括活跃的频道(是指当前频道至少有一个订阅者),其中[pattern]是可以指定具体的模式,类似于通配符。
查询订阅情况-查看频道订阅数
pubsub numsub channel
使用场景
需要消息解耦又并不关注消息可靠性的地方都可以使用发布订阅模式。
缺点
- 会出现消息丢失,无法找回情况
- 无法实现消息堆积和回溯
例如:PubSub 的生产者传递过来一个消息,Redis会直接找到相应的消费者传递过去。如果一个消费者都没有,那么消息直接丢弃。如果开始有三个消费者,一个消费者突然挂掉了,生产者会继续发送消息,另外两个消费者可以持续收到消息。但是挂掉的消费者重新连上的时候,这断连期间生产者发送的消息,对于这个消费者来说就是彻底丢失了。
所以和很多专业的消息队列系统(例如Kafka、RocketMQ)相比,Redis 的发布订阅很粗糙,但胜在足够简单,如果当前场景可以容忍的这些缺点,也不失为一个不错的选择。
正是因为 PubSub 有这些缺点,它的应用场景其实是非常狭窄的。
从Redis5.0 新增了 Stream 数据结构。
Redis Stream
Redis5.0最大的新特性就是多出了一个数据结构Stream,它是一个新的强大的支持多播的可持久化的消息队列,Redis的作者声明Redis Stream借鉴了Kafka的设计。
Stream总述
Redis Stream 的结构如上图所示,每一个Stream都有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容。消息是持久化的,Redis 重启后,内容还在。
具体操作:
1.每个Stream都有唯一的名称,它就是Redis的key,首次使用xadd指令追加消息时自动创建。
- 消息ID的形式是timestampInMillis-sequence。
- 例如1537856881582-3,它表示当前的消息在毫米时间戳1537856881582时产生,并且是该毫秒内产生的第5条消息。
- 消息ID可以由服务器自动生成(*代表默认自动),也可以由客户端自己指定,但是形式必须是整数-整数,而且必须是后面加入的消息的ID要大于前面的消息ID。
- 消息内容就是键值对,跟hash结构的键值对一样。
2.每个Stream都可以挂多个消费组,每个消费组会有个游标last_delivered_id在Stream数组之上往前移动,表示当前消费组已经消费到哪条消息了。
每个消费组都有一个Stream内唯一的名称,消费组不会自动创建,它需要单独的指令xgroup create进行创建,需要指定从Stream的某个消息ID开始消费,这个ID用来初始化last_delivered_id变量。
3.每个消费组(Consumer Group)的状态都是独立的,相互不受影响。同一份Stream内部的消息会被每个消费组都消费到。
4.同一个消费组(Consumer Group)可以挂接多个消费者(Consumer),这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标last_delivered_id往前移动,每个消费者有一个组内唯一名称。
多个消费者可以达到流量分摊的目的,为大业务流量的场景做负载和分流。
5.消费者(Consumer)内部会有个状态变量pending_ids,它记录了当前已经被客户端读取,但是还没有ack的消息。
如果客户端没有ack,这个变量里面的消息ID会越来越多,一旦某个消息被ack,它就开始减少。这个pending_ids变量在Redis官方被称为PEL,也就是Pending Entries List,这是一个核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理。
常用操作命令
生产端
xadd 追加消息
xadd第一次对于一个stream使用可以生成一个stream的结构
xadd key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]
- key:流的键名
- ID:新条目的ID。如果指定为*,Redis会自动生成一个唯一的ID。
- field value:一个或多个键值对,表示添加到流中的条目数据。
- [NOMKSTREAM]:如果流不存在