当前位置: 首页 > article >正文

Redis5新特性-stream

Stream队列

        Redis5.0 最大的新特性就是多出了一个数据结构 Stream,它是一个新的强大的 支持多播的可持久化的消息队列,作者声明 Redis Stream 地借鉴了 Kafka 的设计。

生产者

xadd 追加消息

xdel 删除消息,这里的删除仅仅是设置了标志位,不会实际删除消息。

xrange 获取消息列表,会自动过滤已经删除的消息

xlen 消息长度

del 删除 Stream

1. 追加消息

xadd streamtest1 * name kk age 30

xadd:新增消息固定写法

streamtest1:队列名, 可任意

*:让redis自动生成id

2. 获取消息列表

xrange streamtest1 - +

xrange streamtest1 1701655814064-0 +

-表示最小值, +表示最大值, 也可以换成具体的消息id

3. 删除消息

只给消息打上标志位, 不会真删消息

xdel streamtest1 1701655821193-0

4. 删除整个stream

del streamtest1

消费者

单消费者 xread

1. 从头开始读

xread count 1 block 0 streams streamtest 0-0

count 1: 读取数量1

block 0: 阻塞

0-0:从头开始读

2. 从指定id开始读

xread count 1 block 0 streams streamtest 1701659324617-0

3. 从结尾开始读-阻塞到有消息

xread count 1 block 0 streams streamtest $

消费群组 xgroup

1. 创建消费者群组

xgroup create streamtest cg1 0-0

  • streamtest: 队列名
  • cg1: 群组名称, 可随意指定
  • 0-0: 从头开始读

xgroup create streamtest cg2 $

  • $: 从结尾开始读, 只接受新消息, 老消息全部忽略

2. 查看队列详情

xinfo stream streamtest

3. 查看群组详情

xinfo groups streamtest

4. 群组消费

xreadgroup GROUP cg1 c1 count 1 block 0 streams streamtest >

“GROUP”属于关键字,“cg1”是消费组名称,“c1”是消费者名称,“count 1”指明了消费数量,> 号表示从当前消费组的 last_delivered_id 后面开始读, 每当消费者读取一条消息,last_delivered_id 变量就会前进

5. 查看群组消费情况

xinfo consumers streamtest cg1

可以看到目前 c1 这个消费者有7条待 ACK 的消息,空闲了 329273 ms 没有读取消息。

6. ack确认消息

xack streamtest cg1 1701659319318-0 1701659324617-0

消息确认完, pending消息7条变成5条

Redis几种队列实现

1. 基于List的LPUSH+BRPOP的实现

优点: 实现简单, 消息延迟几乎为0

缺点: 空闲连接问题, ack问题.

如果线程一直阻塞在那里,Redis 客户端的连接就成了闲置连接,闲置过久, 服务器一般会主动断开连接,减少闲置资源占用,这个时候 blpop 和 brpop 或抛 出异常,所以在编写客户端消费者的时候要小心,如果捕获到异常需要重试

2. 基于Sorted-Set 的实现

        多用来实现延迟队列,也可以实现有序的普通的消息队列,但是消费者无法阻塞的获取消息,只能轮询,不允许重复消息。

3. PUB/SUB,订阅/发布模式

优点典型的广播模式,一个消息可以发布到多个消费者;多信道订阅,消费者可 以同时订阅多个信道,从而接收多类消息;消息即时发送,消息不用等待消费者 读取,消费者会自动接收到信道发布的消息。

缺点:

  1. 消息一旦发布, 若客户端不在线, 消息就会丢失;
  2. 不能保证每个消费者接收的时间是一致的;
  3. 若消费者客户端出现消息 积压,到一定程度,会被强制断开,导致消息意外丢失。通常发生在消息的生产 远大于消费速度时.

由此可见,Pub/Sub 模式不适合做消息存储,消息积压类的业务, 而是擅长处理广播,即时通讯,即时反馈的业务

4. 基于Stream类型的实现

        已经有了一个消息中间件的雏形,可以考虑在生产过程中使用,当然真正要在生产中应用,要做的事情还很多,比如消息队列的管理和监控需要花大力气去实现,而专业消息队列都已经自带或者存在着很好的第三方方案和插件


http://www.kler.cn/a/156995.html

相关文章:

  • Springboot整合Prometheus+grafana实现系统监控
  • Oracle OCP认证考试考点详解082系列16
  • 【MySQL】数据库知识突破:数据类型全解析与详解
  • 通过脚本,发起分支合并请求和打tag
  • 设计模式-七个基本原则之一-单一职责原则 + SpringBoot案例
  • 第16章 SELECT 底层执行原理
  • 鸿蒙开发笔记
  • fbprophet 安装流程
  • 探索人工智能领域——每日20个名词详解【day7】
  • Win10安装ROS2遇到的小问题
  • centos7 设置静态ip
  • Java项目学生管理系统四编辑学生
  • 蓝桥杯物联网竞赛_STM32L071KBU6_全部工程及源码
  • 数据结构 | 查漏补缺之求叶子结点,分离链接法、最小生成树、DFS、BFS
  • 大于2T磁盘划分并挂接
  • webGL开发学科演示项目方案
  • Leetcode 80 删除排序数组中的重复项 II
  • WebSocket入门介绍及编程实战
  • 【23-24 秋学期】NNDL 作业11 LSTM
  • 使用com组件编辑word
  • Nacos与Eureka的区别
  • 【COCI2011-2012#5】 EKO / 砍树
  • Mybatis总结
  • C++11改进单例模式
  • CMMI认证有什么意义
  • GPIO的使用--操作PF09 PF10 PF08实现呼吸灯、跑马灯、警报闪烁灯