当前位置: 首页 > article >正文

Redisson发布订阅学习

介绍

Redisson 的消息订阅功能遵循 Redis 的发布/订阅模式,该模式包括以下几个核心概念:

  • 发布者(Publisher):发送消息到特定频道的客户端。在 Redis 中,这通过 PUBLISH 命令实现。

  • 订阅者(Subscriber):监听频道并接收消息的客户端。Redis 提供了 SUBSCRIBE 命令来实现订阅功能。

  • 频道(Channel):消息传递的媒介。发布者和订阅者通过频道进行通信。

  • 消息(Message):通过频道从发布者传递到订阅者的数据。

Redisson 对 Redis 的发布/订阅机制进行了封装,提供了更易于使用的 Java API。具体实现如下:

  • 发布消息(Publish):当一个客户端想要发送消息时,它使用 Redis 的 PUBLISH 命令将消息发送到特定的频道。Redisson 提供了 RTopic 接口来实现这一功能。例如:

RTopic topic = redisson.getTopic("testTopic");
topic.publish("Hello, World!");
  • 订阅消息(Subscribe):另一端的客户端使用 Redis 的 SUBSCRIBE 命令订阅一个或多个频道。当有消息发送到这些频道时,Redis 会自动将消息推送给所有订阅者。Redisson 提供了 addListener 方法来添加消息监听器。例如:
topic.addListener(String.class, new MessageListener<String>() {
    @Override
    public void onMessage(CharSequence channel, String msg) {
        System.out.println("Received message from " + channel + ": " + msg);
    }
});
 消息处理

Redisson 为 RTopic 实现了监听器接口,允许应用程序定义如何处理接收到的消息。当消息到达时,Redisson 会调用这些监听器。例如,可以在 onMessage 方法中处理消息,或者在处理过程中捕获异常并进行适当的兜底策略。

优点
  • 解耦:发布者和订阅者之间不需要知道对方,可以独立进行扩展和修改。

  • 简单易用:Redisson 提供了简单的 API 来进行消息的发布和订阅,开发者可以轻松集成到自己的应用中。

  • 实时性:Redis 的发布/订阅机制提供了低延迟的消息传递,适合需要实时通信的应用。

  • 可扩展性:可以有多个订阅者同时订阅同一个主题,而且可以通过增加 Redis 实例来水平扩展系统。

  • 高吞吐量:Redis 作为内存数据结构存储,能够处理大量的消息。

 缺点
  • 消息丢失

    Redis 的 Pub/Sub 是即时的,如果订阅者不在线,消息会丢失。如果需要消息持久化,可以考虑使用 Redis Stream。
  • 网络断开

    如果客户端与 Redis 服务器的连接断开,需要重新订阅频道。
  • 性能问题

    如果订阅的频道过多或消息量过大,可能会影响 Redis 服务器的性能。

代码示例

添加依赖
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.15.6</version> 
</dependency>
初始化 Redisson 客户端
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

public class RedissonConfig {

    public static RedissonClient createRedissonClient() {

        Config config = new Config();
        config.useSingleServer()
                 //Redis 服务器地址
                .setAddress("redis://127.0.0.1:6379")
                //密码
                .setPassword("password") 
                .setKeepAlive(true);
        return Redisson.create(config);
    }

}
创建消息发布者
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@AllArgsConstructor
public class MessagePublisher {

    private RedissonClient redissonClient;

    public void publishMessage(String topicName, String message) {

        RTopic topic = redissonClient.getTopic(topicName);

        topic.publish(message);
    }
}
创建消息订阅者
import lombok.AllArgsConstructor;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;

@Component
@AllArgsConstructor
public class DemoMessagePublisher{

	private RedissonClient redissonClient;

	public void publishMessage(String topicName, String message) {

		RTopic topic = redissonClient.getTopic(topicName);

		topic.publish(message);
	}

}
消息绑定
import com.jlcloud.tenant.mq.listener.TenantMessageListener;
import lombok.AllArgsConstructor;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class MessageTestRunner implements CommandLineRunner {

	private DemoMessageListener messagePublisher;

	@Override
	public void run(String... args) throws Exception {
		// 订阅主题
		messagePublisher.listener("test_update");
	}
}

创建发消息
import com.chengxuyuanshitang.mq.publisher.DemoMessagePublisher;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.AllArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@AllArgsConstructor
@RequestMapping("/front/test")
@Tag(name = "TestController", description = "TestController")
public class TestController {

	private final DemoMessagePublisher mssagePublisher;

	@GetMapping("/msg/send")
	@Operation(summary = "消息发送")
	public Boolean sendMsg() {
		mssagePublisher.publishMessage("test_update", "== test a msg==");
		return Boolean.TRUE;
	}
}





http://www.kler.cn/a/506629.html

相关文章:

  • 【Web】Web API 简介
  • 镭速大文件传输视频文件预览实现原理
  • go chan底层分析
  • React封装倒计时按钮
  • 工作中redis常用的5种场景
  • CCLINKIE转ModbusTCP网关,助机器人“掀起”工业智能的“惊涛骇浪”
  • (蓝桥杯)二维数组前缀和典型例题——子矩阵求和
  • 深入理解 Entity、VO、QO、DTO 的区别及其在 MVC 架构中的应用
  • C# 根据name查找并返回winform菜单栏(MenuStrip)、工具栏(ToolStrip)中的子控件来修改属性
  • 3D高斯在自动驾驶中的应用
  • Oracle系列---【ORA-01017用户名密码无效】
  • 合合信息名片全能王上架原生鸿蒙应用市场,成为首批数字名片类应用
  • 深度学习电影推荐-CNN算法
  • 【深度学习地学应用|滑坡制图、变化检测、多目标域适应、感知学习、深度学习】跨域大尺度遥感影像滑坡制图方法:基于原型引导的领域感知渐进表示学习(四)
  • H3CNE-11-生成树协议STP
  • elasticsearch线程池配置
  • Profibus DP转Modbus TCP协议转换网关模块功能详解
  • 图形验证码是怎样保护登录安全的?
  • 【JVM-4】深入解析JVM垃圾回收算法:原理、实现与优化
  • Golang学习笔记_26——通道
  • 【C++】size_t全面解析与深入拓展
  • ‌如何有效学习PyTorch:从基础到实践的全面指南‌
  • python入门
  • root后如何隐藏环境?
  • LabVIEW驱动电机实现样品自动搜索
  • 从零开始打造AI知识库:使用爬虫自动化采集网页内容的完整教程