1. 说明
- 1.实现一个简单的 SSE(Server-Sent Events)接口:基于 HTTP 的单向长连接,后端可以持续向前端推送数据。
- 2.前端调用 SSE 接口后,后端向 RabbitMQ 队列发送消息,并向前端返回一个 SseEmitter 对象。
- 3.RabbitMQ 监听器消费队列中的消息后,向对应的 SseEmitter 对象写入内容。
- 4.业务逻辑结束时,调用 emitter.complete() 方法结束本次会话。
- 5.这里以问答为例,采用 RabbitMQ 的 Work Queue(工作队列)模式,逻辑比较简单,仅供参考。
2. 截图
2.1 接口调用截图
2.2 项目结构截图
3. 代码示例
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- The Spring Boot parent supplies dependency management (a BOM) for every
         Spring Boot starter and for Jackson, so the dependencies below do not
         repeat a version: they always follow the parent and cannot drift from it. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.4</version>
    </parent>

    <groupId>com.learning</groupId>
    <artifactId>springboot</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- Spring MVC, including SseEmitter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- RabbitMQ client and Spring AMQP -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!-- JSON (de)serialization of the queue payload; version managed by the parent -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

    <!-- Used to build the executable jar -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
spring:
  # RabbitMQ connection settings (bound to the spring.rabbitmq.* properties)
  rabbitmq:
    host: 192.168.2.11
    port: 5672
    username: admin
    password: admin
    virtual-host: /
package com.learning.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
 * Spring configuration that declares the RabbitMQ queue, exchange and binding
 * used by this demo, plus the shared map in which SSE emitters are kept.
 */
@Configuration
public class RabbitMQConfig {

    /** Exchange name for the work-queue setup. */
    public static final String EXCHANGE_NAME = "work_exchange";

    /** Queue name for the work-queue setup. */
    public static final String QUEUE_NAME = "work_queue";

    /**
     * Shared registry of {@link SseEmitter} instances, keyed by a String id.
     *
     * @return a new, empty concurrent map
     */
    @Bean("emitterMap")
    public ConcurrentMap<String, SseEmitter> emitterMap() {
        return new ConcurrentHashMap<>();
    }

    /**
     * Declares the queue named {@link #QUEUE_NAME}.
     *
     * @return the queue definition, created with the durable flag set
     */
    @Bean("work_queue")
    public Queue queue() {
        final boolean durable = true;
        return new Queue(QUEUE_NAME, durable);
    }

    /**
     * Declares the direct exchange named {@link #EXCHANGE_NAME}.
     *
     * @return the exchange definition, built with the durable flag set
     */
    @Bean("work_exchange")
    public Exchange exchange() {
        ExchangeBuilder directExchange = ExchangeBuilder.directExchange(EXCHANGE_NAME).durable(true);
        return directExchange.build();
    }

    /**
     * Binds the queue to the exchange under the routing key {@code work_routing_key}.
     *
     * @param queue    the bean registered as {@code work_queue}
     * @param exchange the bean registered as {@code work_exchange}
     * @return the binding definition, without extra arguments
     */
    @Bean
    public Binding binding(@Qualifier("work_queue") Queue queue, @Qualifier("work_exchange") Exchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("work_routing_key")
                .noargs();
    }
}
package com.learning.controller;
import com.learning.service.QuestionService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
/**
 * REST endpoint that opens a Server-Sent Events stream for one question.
 *
 * <p>Each request gets a random question id; the emitter is stored in the shared
 * {@code emitterMap} under that id so that whoever produces the answer can look
 * it up and write to it.
 *
 * @Author wangyouhui
 **/
@RestController
@RequestMapping("/question")
public class QuestionController {

    @Autowired
    private QuestionService questionService;

    @Autowired
    private ConcurrentMap<String, SseEmitter> emitterMap;

    /**
     * Starts answering {@code question} and returns the SSE stream on which the
     * answer fragments are delivered.
     *
     * @param question the question text sent by the client
     * @return the emitter Spring MVC keeps open until it is completed or times out
     */
    @PostMapping(value="/ask", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter ask(@RequestParam String question) {
        String questionId = UUID.randomUUID().toString();
        SseEmitter emitter = new SseEmitter();

        // Drop the registry entry whenever the stream ends, for whatever reason
        // (normal completion, timeout, client disconnect). Without this, every
        // aborted request would leave its emitter in the map forever.
        // remove(key, value) only removes this exact emitter.
        Runnable forget = () -> emitterMap.remove(questionId, emitter);
        emitter.onCompletion(forget);
        emitter.onTimeout(forget);
        emitter.onError(error -> forget.run());

        // Register before any work starts so the emitter can be found by id.
        emitterMap.put(questionId, emitter);

        // Run the question off the request thread: the emitter is handed to Spring MVC
        // right away, so events can reach the client as they are produced instead of
        // only after ask() has finished.
        CompletableFuture
                .runAsync(() -> questionService.ask(questionId, question))
                .exceptionally(failure -> {
                    // The question could not be processed: release the client and the map entry.
                    emitterMap.remove(questionId, emitter);
                    emitter.completeWithError(failure);
                    return null;
                });

        return emitter;
    }
}
package com.learning.dto;
import lombok.Data;
import java.io.Serializable;
/**
 * Payload exchanged through the queue and pushed to the client: one fragment
 * of the answer to a question.
 *
 * @Author wangyouhui
 **/
@Data
public class MessageDTO implements Serializable {

    /** Explicit version id so the serialized form does not depend on compiler-generated defaults. */
    private static final long serialVersionUID = 1L;

    /** Id of the question this fragment belongs to. */
    private String questionId;

    /** The question text. */
    private String question;

    /** The answer text carried by this fragment. */
    private String answer;

    /** Whether this is the last fragment for the question; may be {@code null}. */
    private Boolean end;
}
package com.learning.service.impl;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.learning.config.RabbitMQConfig;
import com.learning.dto.MessageDTO;
import com.learning.service.QuestionService;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.concurrent.ConcurrentMap;
/**
 * Demo question service: publishes answer fragments to the RabbitMQ work queue
 * and, as the queue listener, forwards each consumed fragment to the SSE
 * emitter registered under the fragment's question id.
 *
 * @Author wangyouhui
 **/
@Service
public class QuestionServiceImpl implements QuestionService {

    /** Routing key used when publishing; the same literal the queue binding in RabbitMQConfig uses. */
    private static final String ROUTING_KEY = "work_routing_key";

    /** Pause between two answer fragments, in milliseconds. */
    private static final long FRAGMENT_DELAY_MILLIS = 1000L;

    /** Shared JSON mapper; ObjectMapper is thread-safe once configured, so one instance is enough. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private ConcurrentMap<String, SseEmitter> emitterMap;

    /**
     * Consumes one message from the work queue and pushes it to the matching SSE emitter.
     *
     * <p>The message is acknowledged once it has been dealt with, including the cases where
     * no emitter is registered any more or the client connection is broken. A message that
     * cannot be processed at all (for example malformed JSON) is rejected <em>without</em>
     * requeueing: such a failure is permanent, and requeueing would redeliver the same
     * message in an endless loop.
     *
     * @param message the raw AMQP message whose body is the JSON form of a {@link MessageDTO}
     * @param channel the channel the message arrived on, used for the manual ack/reject
     */
    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME, ackMode = "MANUAL")
    public void receiveMessage(Message message, Channel channel) {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        boolean handled = false;
        try {
            // Parse straight from the bytes so decoding does not depend on the platform charset.
            MessageDTO messageDTO = MAPPER.readValue(message.getBody(), MessageDTO.class);
            forwardToEmitter(messageDTO);
            handled = true;
        } catch (Exception e) {
            // Listener boundary: whatever went wrong, the delivery must still be settled below.
            e.printStackTrace();
        }

        try {
            if (handled) {
                // Manual acknowledgement
                channel.basicAck(deliveryTag, false);
            } else {
                // Reject and discard; see the method comment for why this is not requeued.
                channel.basicReject(deliveryTag, false);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Sends one fragment to the emitter registered for its question and closes the
     * stream after the final fragment. Does nothing when no emitter is registered.
     */
    private void forwardToEmitter(MessageDTO messageDTO) {
        String questionId = messageDTO.getQuestionId();
        if (questionId == null) {
            // Nothing to route on.
            return;
        }
        SseEmitter sseEmitter = emitterMap.get(questionId);
        if (sseEmitter == null) {
            // No stream is registered under this id (any more); drop the fragment.
            return;
        }
        try {
            sseEmitter.send(messageDTO);
            if (Boolean.TRUE.equals(messageDTO.getEnd())) {
                sseEmitter.complete();
                emitterMap.remove(questionId);
            }
        } catch (IOException | IllegalStateException e) {
            // The stream is unusable (write failed or it was already completed):
            // forget it so later fragments for this question are dropped quietly.
            e.printStackTrace();
            emitterMap.remove(questionId);
        }
    }

    /**
     * Publishes three canned answer fragments for the question, pausing between them;
     * the last one is flagged as the end of the answer.
     *
     * @param questionId id under which the caller registered the SSE emitter
     * @param question   the question text, echoed in every fragment
     */
    @Override
    public void ask(String questionId, String question) {
        this.sendMessage(buildMessage(questionId, question, "您好,这个", false));
        pauseBetweenFragments();
        this.sendMessage(buildMessage(questionId, question, "您好,这个是答案", false));
        pauseBetweenFragments();
        this.sendMessage(buildMessage(questionId, question, "您好,这个是答案,请问是否能解决你的问题", true));
    }

    /** Creates one answer fragment. */
    private static MessageDTO buildMessage(String questionId, String question, String answer, boolean end) {
        MessageDTO fragment = new MessageDTO();
        fragment.setQuestionId(questionId);
        fragment.setQuestion(question);
        fragment.setAnswer(answer);
        fragment.setEnd(end);
        return fragment;
    }

    /** Sleeps between two fragments; on interruption the interrupt flag is restored for the caller. */
    private static void pauseBetweenFragments() {
        try {
            Thread.sleep(FRAGMENT_DELAY_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Serializes the message to JSON and publishes it to the work exchange.
     * Failures are reported on stderr and otherwise ignored (best effort).
     *
     * @param message the fragment to publish
     */
    public void sendMessage(MessageDTO message) {
        try {
            String json = MAPPER.writeValueAsString(message);
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, ROUTING_KEY, json);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
package com.learning.service;
/**
 * Service contract for submitting a question.
 *
 * @Author wangyouhui
 **/
public interface QuestionService {

    /**
     * Submits a question for processing.
     *
     * @param questionId identifier of the question
     * @param question   the question text
     */
    void ask(String questionId, String question);
}
package com.learning;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Spring Boot entry point of the demo application.
 *
 * @Author wangyouhui
 **/
@SpringBootApplication
public class Application {

    /**
     * Boots the Spring application context with this class as the primary source.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication bootstrap = new SpringApplication(Application.class);
        bootstrap.run(args);
    }
}