当前位置: 首页 > article >正文

【redis】发布订阅

Redis的发布订阅(Pub/Sub)是一种基于消息多播的通信机制,它允许消息的**发布者(Publisher)向特定频道发送消息,而订阅者(Subscriber)**通过订阅频道或模式来接收消息。

其核心特点如下:

  1. 轻量级:无需额外组件,直接通过Redis服务实现

  2. 实时性:消息即时推送,无轮询延迟

  3. 广播模式:一个消息可被多个订阅者同时接收

  4. 无状态性:不存储历史消息,订阅者只能接收订阅后的消息

发布订阅命令的使用

有关发布订阅的命令可以通过help @pubsub命令来查看。有关命令的使用可以通过help 命令来查看,例如help publish

基础命令速查表

命令作用示例
SUBSCRIBE订阅一个或多个频道SUBSCRIBE news sports
PSUBSCRIBE使用模式匹配订阅频道PSUBSCRIBE sensor.*
PUBLISH向指定频道发送消息PUBLISH news "Hello"
UNSUBSCRIBE退订指定频道UNSUBSCRIBE news
PUNSUBSCRIBE退订模式订阅PUNSUBSCRIBE sensor.*
PUBSUB CHANNELS查看活跃频道列表PUBSUB CHANNELS "sensor.*"

操作示例

# 订阅者A(终端1)
127.0.0.1:6379> subscribe notifications
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "notifications"
3) (integer) 1

# 订阅者B(终端2) 
127.0.0.1:6379> psubscribe system.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "system.*"
3) (integer) 1

# 发布消息(终端3)
127.0.0.1:6379> publish notifications "Service will be upgraded soon"
(integer) 1

127.0.0.1:6379> publish system.alert "CPU usage exceeds 90%"
(integer) 1

# 订阅者A收到:
1) "message"
2) "notifications"
3) "Service will be upgraded soon"

# 订阅者B收到: 
1) "pmessage"
2) "system.*"
3) "system.alert"
4) "CPU usage exceeds 90%"

发布订阅的使用场景与优缺点

适用场景

  1. 实时通知系统:用户在线状态更新,即时聊天消息推送

  2. 事件驱动架构:缓存失效广播,分布式配置更新

  3. 轻量级监控:服务器状态报警,业务指标异常通知

优点

  • 极低延迟(平均<1ms)

  • 支持百万级TPS消息吞吐

  • 模式匹配订阅实现灵活路由

  • 零外部依赖(仅需Redis服务)

缺点

消息不可靠性:不保证送达,离线订阅者会丢失消息

无持久化机制:重启后所有订阅关系丢失

客户端阻塞:订阅操作会占用连接线程(需异步处理)

替代方案建议:需要可靠消息时,使用Redis Streams(支持消息持久化、消费者组)或RabbitMQ/Kafka

在Java中使用RedisTemplate实现

配置RedisTemplate

package com.morris.redis.demo.pubsub;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * 对redis的键值进行序列化
 */
@Configuration
public class RedisConfig {
    
    @Bean
    public RedisTemplate<String, Object> redisTemplate(
            RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);

        // 使用 String 序列化 key
        template.setKeySerializer(new StringRedisSerializer());
        // 使用 JSON 序列化 value(需要额外依赖 jackson)
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());

        // 对于 Hash 结构同理
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());

        return template;
    }
}

实现消息发布者

package com.morris.redis.demo.pubsub;

import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**
 * 消息发布者
 */
@Service
public class MessagePublisher {

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    public void sendNotification(String channel, String message) {
        redisTemplate.convertAndSend(channel, message);
    }
}

实现消息订阅者

package com.morris.redis.demo.pubsub;

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * 消息订阅者
 */
@Component
public class MessageSubscriber implements MessageListener {

    @Resource
    private RedisTemplate redisTemplate;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        String channel = new String(message.getChannel());
        String body = (String) redisTemplate.getValueSerializer().deserialize(message.getBody());
        System.out.printf("收到频道[%s]的消息: %s\n", channel, body);
    }
}

配置订阅监听

package com.morris.redis.demo.pubsub;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

/**
 * 配置redis消息订阅监听器
 */
@Configuration
@Slf4j
public class RedisPubSubConfig {

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory, MessageSubscriber messageSubscriber) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        
        // 订阅具体频道
        container.addMessageListener(messageSubscriber, new ChannelTopic("notifications"));
        
        // 订阅模式匹配
        container.addMessageListener(messageSubscriber, new PatternTopic("system.*"));
        
        // 异常处理
        container.setErrorHandler((e) -> {
            log.error("[listen message] error ", e);
        });
        
        return container;
    }
}

使用示例

package com.morris.redis.demo.pubsub;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * 使用接口发布消息
 */
@RestController
@RequestMapping("/pubsub")
public class PubSubDemoController {

    @Resource
    private MessagePublisher publisher;

    // 发布告警
    @GetMapping("/alert")
    public String sendAlert(@RequestParam String message) {
        publisher.sendNotification("system.alert", message);
        return "警报已发送";
    }

    // 发布通知
    @GetMapping("/notify")
    public String sendNotify(@RequestParam String message) {
        publisher.sendNotification("notifications", message);
        return "通知已发送";
    }
}

http://www.kler.cn/a/586028.html

相关文章:

  • windows11 的 .gitignore 文件失效(从来没有进行 commit 以及 add 操作,只是 git init 了)
  • 科技快讯 | “垃圾短信”可以被识别了;阿里正式推出AI旗舰应用;OpenAI深夜发布全新Agent工具
  • GC 频率和触发条件
  • 31、map deque list的实现原理【中高频】
  • AdaLoRA 参数 配置:CAUSAL_LM“ 表示因果语言模型任务
  • 【数据库】10分钟学会MySQL的增删改查:数据库、表、表记录操作指南
  • 分布式IO模块:架起城轨交通物理层与控制层的信息桥梁
  • WHQL微软驱动签名认证,让企业驱动在Windows系统畅通无阻
  • TPCTF2025 -Web Writeup
  • 10.3 kubelet 中的cgroupManager解析和节点qos顶级目录创建
  • S_on@atwk的意思
  • 差分专题练习 ——基于罗勇军老师的《蓝桥杯算法入门C/C++》
  • STM32---FreeRTOS消息队列
  • 16 | 实现简洁架构的 Store 层
  • Element Ui - 编辑时表单校验信息未清空问题处理
  • 作物移栽机器人的结构设计的介绍
  • Web后端开发之Maven
  • 2025年AI搜索引擎开源项目全景指南:从核心框架到生态工具
  • ARM64 架构地址空间分配深度解析
  • 79. 单词搜索:题解