当前位置: 首页 > article >正文

即时通讯增加kafka渠道

此次给im服务增加kafka渠道,刚好最近有对SpringCloudStream进行了解,刚好用来练练手

增加kafka渠道

  • pom.xml

引入stream相关依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

由于涉及到SpringCloud,可以交由spring-cloud-dependencies统一管理

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>2.7.18</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>2021.0.8</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
  • application.yml

在yml中对stream相关项进行配置

server:
  port: 18080
cus:
  ws:
    exclude-receiver-info-flag: true
    receiver-excludes-himself-flag: true
    #更改渠道为stream
    communication-type: stream
  cloud:
    function:
      #允许stream访问的bean
      definition: listener
    stream:
      kafka:
        binder:
          #kafka链接信息
          brokers: ${KAFKA_BROKERS:127.0.0.1:9092}
          #允许自动创建topic
          auto-create-topics: true
      bindings:
        #消费者bean-in-index
        listener-in-0:
          #主题
          destination: TEST_TOPIC


  • RedisSendExecutor

kafka生产者

package com.example.im.infra.executor.send.stream;

import com.example.im.infra.constant.ImConstants;
import com.example.im.infra.executor.send.AbstractBaseSendExecutor;
import com.example.im.infra.executor.send.dto.MessageInfo;
import com.example.im.infra.executor.send.dto.ScopeOfSendingEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Component;

/**
 * @author PC
 * 消息队列执行
 */
@Component
public class StreamSendExecutor extends AbstractBaseSendExecutor {
    private final static Logger logger = LoggerFactory.getLogger(StreamSendExecutor.class);

    private final StreamBridge streamBridge;

    @Autowired
    public StreamSendExecutor(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    @Override
    public String getCommunicationType() {
        return ImConstants.CommunicationType.STREAM;
    }

    @Override
    public void sendToUser(String sendUserName, String message) {
        MessageInfo messageInfo = new MessageInfo();
        messageInfo.setSendUserName(sendUserName);
        messageInfo.setMessage(message);
        messageInfo.setScopeOfSending(ScopeOfSendingEnum.USER);
        logger.debug("send to user stream websocket, topic is " + "TEST_TOPIC");
        streamBridge.send("TEST_TOPIC", messageInfo);
    }

    @Override
    public void sendToAll(String sendUserName, String message) {
        MessageInfo messageInfo = new MessageInfo();
        messageInfo.setSendUserName(sendUserName);
        messageInfo.setMessage(message);
        messageInfo.setScopeOfSending(ScopeOfSendingEnum.ALL);
        logger.debug("send to user stream websocket, topic is " + "TEST_TOPIC");
        streamBridge.send("TEST_TOPIC", messageInfo);
    }
}
  • StreamMessageListener

kafka消费者

package com.example.im.infra.executor.send.stream;

import com.example.im.infra.executor.send.DefaultSendExecutor;
import com.example.im.infra.executor.send.dto.MessageInfo;
import com.example.im.infra.util.JsonUtils;
import com.fasterxml.jackson.core.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.nio.charset.StandardCharsets;
import java.util.function.Function;

/**
 * @author PC
 * 消息队列监听
 */
@Component
public class StreamMessageListener {
    private final static Logger logger = LoggerFactory.getLogger(StreamSendExecutor.class);

    private DefaultSendExecutor defaultSendExecutor;

    @Autowired
    public void setDefaultSendExecutor(DefaultSendExecutor defaultSendExecutor) {
        this.defaultSendExecutor = defaultSendExecutor;
    }

    @Bean
    public Function<Flux<Message<byte[]>>, Mono<Void>> listener() {
        return messageInfoFlux -> messageInfoFlux.map(message -> {
            String messageJson = new String(message.getPayload(), StandardCharsets.UTF_8);
            MessageInfo messageInfo = JsonUtils.toObjectByTypeReference(messageJson, new TypeReference<MessageInfo>() {
            });
            switch (messageInfo.getScopeOfSending()) {
                case USER:
                    defaultSendExecutor.sendToUser(messageInfo.getSendUserName(), messageInfo.getMessage());
                    break;
                case ALL:
                    defaultSendExecutor.sendToAll(messageInfo.getSendUserName(), messageInfo.getMessage());
                    break;
                default:
                    //一般来说不会出现该情况,除非用户覆盖了ScopeOfSending,后续可以开个扩展发送范围的口子
                    logger.warn("invalid sending range:" + messageInfo.getScopeOfSending().getScopeCode());
                    break;
            }
            return messageInfo;
        }).then();
    }
}

测试

test2向test1发送消息,成功接收

直接在消息队列中发送消息,test1也接收到了消息

参考资料

[1].SpringCloudStream中文文档

[2].im项目


http://www.kler.cn/a/354448.html

相关文章:

  • Android基于回调的事件处理
  • 使用vue-pdf预览pdf和解决pdf电子签章显示问题
  • 【Maui】动态菜单实现(绑定数据视图)
  • AR 眼镜之-拍照/录像动效切换-实现方案
  • 网络-ping包分析
  • Unity3D使用GaussianSplatting加载高斯泼溅模型
  • 基于workbox实现PWA预缓存能力
  • 11.9K Star!强大的 Web 爬虫工具 FireCrawl:为 AI 训练与数据提取提供全面支持
  • 【Linux】解读信号的本质&相关函数及指令的介绍
  • UI自动化测试 —— web端元素获取元素等待实践!
  • 国产游戏技术:迈向全球引领者的征途
  • 2.计算机网络_IP地址
  • React 探秘(一):fiber 架构
  • 自动驾驶系列—探索自动驾驶持续部署(CD)技术与最佳实践
  • UE5 猎户座漂浮小岛 01 资源 玩家角色
  • 从2.x到3.x:Spring Boot升级实战踩坑记录!
  • Go语言中的时间比较与时区处理
  • 利用Microsoft Entra Application Proxy在无公网IP条件下安全访问内网计算机
  • 代码训练营 day34|LeetCode 134,LeetCode 135,LeetCode 860,LeetCode 406
  • 根据Vue对比来深入学习React 下 props 组件传值 插槽 样式操作 hooks 高阶组件 性能优化
  • 如何使用Python合并Excel文件中的多个Sheet
  • 【ARM 嵌入式 编译系列 10.9 -- Clang 编译器】
  • Redis中String类型常见的应用场景
  • Android中的IntentService及其作用。
  • 编辑器、节点树、基础设置
  • LDR6500协议芯片:诱骗取电协议,OTG数据同时实现功能芯片