当前位置: 首页 > article >正文

【rabbitmq】实现问答消息消费示例

目录

          • 1. 说明
          • 2. 截图
            • 2.1 接口调用截图
            • 2.2 项目结构截图
          • 3. 代码示例

1. 说明
  • 1.本文实现一个简单的SSE(Server-Sent Events)接口:它是单向的长连接,后端可以持续向前端推送数据。
  • 2.前端调用SSE接口后,后端向rabbitmq队列发送消息,并向前端返回一个sseEmitter对象。
  • 3.后端监听rabbitmq队列,消费到消息后,将内容写入对应的sseEmitter对象。
  • 4.当业务逻辑结束,调用emitter.complete()方法,结束此次会话。
  • 5.这里举一个问答的示例,采用的是work模式,逻辑比较简单,仅供参考。
2. 截图
2.1 接口调用截图

在这里插入图片描述

2.2 项目结构截图

在这里插入图片描述

3. 代码示例
  • 1.pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- The Spring Boot parent supplies dependency management, so the Boot starters and
         Jackson below deliberately carry no <version>: pinning them here would let them
         drift out of sync with the parent when it is upgraded. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.4</version>
    </parent>

    <groupId>com.learning</groupId>
    <artifactId>springboot</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- Spring MVC; provides SseEmitter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Spring AMQP / RabbitMQ client -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <!-- JSON (de)serialization of the queue payload -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

    <!-- Used to package the application as an executable jar -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
  • 2.application.yaml
spring:
  # RabbitMQ connection settings
  # NOTE(review): host and credentials are committed in plain text; presumably demo values -
  # confirm they are never used outside a local environment.
  rabbitmq:
    host: 192.168.2.11
    port: 5672
    username: admin
    password: admin
    virtual-host: /
  • 3.rabbitmq配置类
package com.learning.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Spring configuration for the demo: declares the RabbitMQ queue, exchange and binding,
 * and exposes the map in which open SSE emitters are kept.
 */
@Configuration
public class RabbitMQConfig {

    /**
     * Name of the exchange used by the work-mode demo.
     */
    public static final String EXCHANGE_NAME = "work_exchange";

    /**
     * Name of the queue used by the work-mode demo.
     */
    public static final String QUEUE_NAME = "work_queue";

    /**
     * Holds the open {@link SseEmitter} instances, keyed by a String id.
     *
     * @return a new, empty concurrent map
     */
    @Bean("emitterMap")
    public ConcurrentMap<String, SseEmitter> emitterMap() {
        return new ConcurrentHashMap<>();
    }

    /**
     * Declares the durable queue named {@link #QUEUE_NAME}.
     *
     * @return the queue definition
     */
    @Bean("work_queue")
    public Queue queue() {
        final boolean durable = true;
        return new Queue(QUEUE_NAME, durable);
    }

    /**
     * Declares the durable direct exchange named {@link #EXCHANGE_NAME}.
     *
     * @return the exchange definition
     */
    @Bean("work_exchange")
    public Exchange exchange() {
        ExchangeBuilder directExchange = ExchangeBuilder.directExchange(EXCHANGE_NAME);
        directExchange.durable(true);
        return directExchange.build();
    }

    /**
     * Binds the queue to the exchange under the routing key {@code work_routing_key},
     * without extra binding arguments.
     *
     * @param queue    the {@code work_queue} bean
     * @param exchange the {@code work_exchange} bean
     * @return the binding definition
     */
    @Bean
    public Binding binding(@Qualifier("work_queue") Queue queue,
                           @Qualifier("work_exchange") Exchange exchange) {
        final String routingKey = "work_routing_key";
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(routingKey)
                .noargs();
    }
}
  • 4.controller类
package com.learning.controller;

import com.learning.service.QuestionService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;

/**
 * REST endpoint of the question/answer demo: every call to {@code POST /question/ask}
 * opens a Server-Sent Events stream on which the answer is delivered.
 *
 * @author wangyouhui
 */
@RestController
@RequestMapping("/question")
public class QuestionController {

    @Autowired
    private QuestionService questionService;

    /** Open emitters keyed by question id. */
    @Autowired
    private ConcurrentMap<String, SseEmitter> emitterMap;

    /**
     * Registers a new emitter under a freshly generated question id, hands the question to
     * the service and returns the emitter as the SSE response.
     *
     * @param question the question text sent by the client
     * @return the emitter on which answer events for this question are written
     */
    @PostMapping(value="/ask", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter ask(@RequestParam String question) {
        String questionId = UUID.randomUUID().toString();
        SseEmitter emitter = new SseEmitter();
        // Without this callback an emitter whose client disconnected or whose request timed
        // out would stay in the map forever. The two-argument remove only deletes the entry
        // if it still maps to this very emitter, so it is safe to run more than once.
        emitter.onCompletion(() -> emitterMap.remove(questionId, emitter));
        emitterMap.put(questionId, emitter);
        // NOTE(review): ask(...) runs on the request thread, so the emitter is only returned
        // once it finishes - confirm the service does not block for long.
        questionService.ask(questionId, question);
        return emitter;
    }
}

  • 5.消息实体
package com.learning.dto;

import lombok.Data;

import java.io.Serializable;

/**
 * Payload exchanged through the queue and forwarded to the client as an SSE event.
 * Getters, setters, {@code equals}/{@code hashCode} and {@code toString} are generated
 * by Lombok's {@code @Data}.
 *
 * @author wangyouhui
 */
@Data
public class MessageDTO implements Serializable {
    /** Explicit version id so that recompiling the class does not change its serialized form. */
    private static final long serialVersionUID = 1L;

    /** Id of the question this message belongs to. */
    private String questionId;
    /** Text of the question. */
    private String question;
    /** Answer text carried by this message. */
    private String answer;
    /** End-of-answer flag; may be {@code null}. */
    private Boolean end;
}

  • 6.service实现类
package com.learning.service.impl;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.learning.config.RabbitMQConfig;
import com.learning.dto.MessageDTO;
import com.learning.service.QuestionService;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Demo question/answer service. As a producer it publishes a fixed sequence of answer
 * fragments to RabbitMQ; as a consumer of the same queue it forwards every fragment to the
 * SSE emitter registered under the fragment's question id.
 *
 * @author wangyouhui
 */
@Service
public class QuestionServiceImpl implements QuestionService {

    private static final Logger LOG = Logger.getLogger(QuestionServiceImpl.class.getName());

    /** Routing key the fragments are published with. */
    private static final String ROUTING_KEY = "work_routing_key";

    /** Pause between two consecutive answer fragments, in milliseconds. */
    private static final long FRAGMENT_DELAY_MILLIS = 1000L;

    /** Shared mapper; avoids allocating a new ObjectMapper for every message. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private ConcurrentMap<String, SseEmitter> emitterMap;

    /**
     * Consumes one message from the queue and forwards it to the matching emitter.
     * Acknowledgement is manual: a payload that cannot be parsed is rejected without
     * requeueing, because it would fail again on every redelivery. Every other message is
     * acknowledged, including one whose emitter is gone or whose client can no longer be
     * written to.
     *
     * @param message the raw AMQP message whose body is the JSON form of a {@link MessageDTO}
     * @param channel the channel the message arrived on, used to ack or reject it
     */
    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME, ackMode = "MANUAL")
    public void receiveMessage(Message message, Channel channel) {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            MessageDTO messageDTO = parse(message);
            if (messageDTO == null) {
                channel.basicReject(deliveryTag, false);
                return;
            }
            forwardToEmitter(messageDTO);
            // Manual acknowledgement
            channel.basicAck(deliveryTag, false);
        } catch (IOException e) {
            LOG.log(Level.SEVERE, "Could not ack/reject delivery " + deliveryTag, e);
        }
    }

    /** Parses the message body as JSON; returns {@code null} (after logging) if that is impossible. */
    private MessageDTO parse(Message message) {
        try {
            // Parsing the bytes directly lets Jackson detect the encoding instead of
            // decoding them with the platform default charset first.
            return OBJECT_MAPPER.readValue(message.getBody(), MessageDTO.class);
        } catch (IOException | IllegalArgumentException e) {
            LOG.log(Level.WARNING, "Discarding message with unparseable body", e);
            return null;
        }
    }

    /** Writes the message to its emitter, if one is registered, and completes it on the end flag. */
    private void forwardToEmitter(MessageDTO messageDTO) {
        String questionId = messageDTO.getQuestionId();
        if (questionId == null) {
            LOG.warning("Dropping message without questionId");
            return;
        }
        SseEmitter sseEmitter = emitterMap.get(questionId);
        if (sseEmitter == null) {
            // Nothing to write to; this also covers an end message arriving after removal.
            LOG.fine("No emitter registered for question " + questionId);
            return;
        }
        try {
            sseEmitter.send(messageDTO);
            if (Boolean.TRUE.equals(messageDTO.getEnd())) {
                sseEmitter.complete();
                emitterMap.remove(questionId, sseEmitter);
            }
        } catch (IOException | IllegalStateException e) {
            // The emitter can no longer be written to, so retrying the same message can
            // never succeed: forget the emitter instead of requeueing.
            emitterMap.remove(questionId, sseEmitter);
            LOG.log(Level.WARNING, "Could not write to emitter of question " + questionId, e);
        }
    }

    /**
     * Publishes three answer fragments for the question, pausing between them; the last
     * fragment carries the end flag.
     *
     * @param questionId id under which the caller registered the emitter
     * @param question   the question text, echoed in every fragment
     */
    @Override
    public void ask(String questionId, String question) {
        sendFragment(questionId, question, "您好,这个", false);
        pause();
        sendFragment(questionId, question, "您好,这个是答案", false);
        pause();
        sendFragment(questionId, question, "您好,这个是答案,请问是否能解决你的问题", true);
    }

    /** Builds one fragment and publishes it. */
    private void sendFragment(String questionId, String question, String answer, boolean end) {
        MessageDTO fragment = new MessageDTO();
        fragment.setQuestionId(questionId);
        fragment.setQuestion(question);
        fragment.setAnswer(answer);
        fragment.setEnd(end);
        this.sendMessage(fragment);
    }

    /** Sleeps between fragments; on interruption restores the flag and returns at once. */
    private void pause() {
        try {
            Thread.sleep(FRAGMENT_DELAY_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Serializes the message to JSON and publishes it to the demo exchange. Best effort:
     * a failure is logged and not propagated to the caller.
     *
     * @param message the message to publish
     */
    public void sendMessage(MessageDTO message){
        try {
            String json = OBJECT_MAPPER.writeValueAsString(message);
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, ROUTING_KEY, json);
        } catch (Exception e) {
            LOG.log(Level.SEVERE, "Could not publish message", e);
        }
    }
}

  • 7.service接口
package com.learning.service;

/**
 * Service contract of the question/answer demo.
 *
 * @author wangyouhui
 */
public interface QuestionService {
    /**
     * Submits a question for answering.
     *
     * @param questionId id identifying this question
     * @param question   the question text
     */
    void ask(String questionId, String question);
}

  • 8.应用类
package com.learning;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Entry point of the Spring Boot demo application.
 *
 * @author wangyouhui
 */
@SpringBootApplication
public class Application {
    /**
     * Boots the Spring context with this class as the primary configuration source.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(Application.class);
        application.run(args);
    }
}

http://www.kler.cn/news/366935.html

相关文章:

  • 【项目复现】——DDoS-SDN Detection Project
  • C语言程序设计:现代设计方法习题笔记《chapter5》下篇
  • 【HTML】之form表单元素详解
  • 强大!Spring Boot 3.3 集成 PDFBox 轻松实现电子签章功能!
  • PVE系统无损挂载ntfs格式物理磁盘
  • WASM 使用说明23事(RUST实现)
  • qml圆形图片,qml圆形头像制作
  • STM32 HAL 点灯
  • 【K8S系列】Kubernetes Pod节点CrashLoopBackOff 状态及解决方案详解【已解决】
  • github上传文件代码以及其它github代码
  • AppleVisionPro空间定位 三维空间重现-Unity3D
  • iOS调试真机出现的 “__llvm_profile_initialize“ 错误
  • 数据结构-队列
  • Vast.ai LLM 大语言模型使用手册(2)
  • 74. 搜索二维矩阵
  • 了解 - 微格式
  • 萤石设备视频接入平台EasyCVR私有化视频平台变电站如何实现远程集中监控?
  • Java后端面试题:Java基础篇
  • Spring微服务概述之spring cloud alibaba服务调用实践
  • 在平面模型上提取凹多边形的点云处理
  • Unity引擎:游戏开发的核心力量
  • python 深度学习 项目调试 图像分割 segment-anything
  • 微信小程序 - 动画(Animation)执行过程 / 实现过程 / 实现方式
  • RabbitMQ 发布确认高级部分
  • 语音交互:重塑人机对话的未来
  • 【Nas】X-Doc:jellyfin“该客户端与媒体不兼容,服务器未发送兼容的媒体格式”问题解决方案