springboot websocket语音识别翻译
本项目总结内容,前端通过websocket调用后端传输pcm的音频,后端通过websocket调用另一台服务器的websocket进行识别音频。最后拿到翻译后的内容在通过websocket返回给前端
基于若依分离版springboot3搭建 地址:https://www.ruoyi.vip/
websocket部分的代码 地址https://gitee.com/dromara/RuoYi-Vue-Plus
感谢若依大佬和狮子大佬
- 目录结构
- 基本配置
WebSocketProperties.java
package com.beijing.websocket.config.properties;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* WebSocket 配置项
*
* @author Robert-L
*/
@ConfigurationProperties("websocket")
@Data
public class WebSocketProperties {
private Boolean enabled;
/**
* 路径
*/
private String path;
/**
* 设置访问源地址
*/
private String allowedOrigins;
}
WebSocketConfig.java
package com.beijing.websocket.config;
import cn.hutool.core.util.StrUtil;
import com.beijing.websocket.config.properties.WebSocketProperties;
import com.beijing.websocket.handler.PlusWebSocketHandler;
import com.beijing.websocket.interceptor.PlusWebSocketInterceptor;
import com.beijing.websocket.listener.WebSocketTopicListener;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.server.HandshakeInterceptor;
/**
* WebSocket 配置
*
* @author Robert-L
*/
@AutoConfiguration
@ConditionalOnProperty(value = "websocket.enabled", havingValue = "true")
@EnableConfigurationProperties(WebSocketProperties.class)
@EnableWebSocket
public class WebSocketConfig {
@Bean
public WebSocketConfigurer webSocketConfigurer(HandshakeInterceptor handshakeInterceptor,
WebSocketHandler webSocketHandler, WebSocketProperties webSocketProperties) {
// 如果WebSocket的路径为空,则设置默认路径为 "/websocket"
if (StrUtil.isBlank(webSocketProperties.getPath())) {
webSocketProperties.setPath("/websocket");
}
// 如果允许跨域访问的地址为空,则设置为 "*",表示允许所有来源的跨域请求
if (StrUtil.isBlank(webSocketProperties.getAllowedOrigins())) {
webSocketProperties.setAllowedOrigins("*");
}
// 返回一个WebSocketConfigurer对象,用于配置WebSocket
return registry -> registry
// 添加WebSocket处理程序和拦截器到指定路径,设置允许的跨域来源
.addHandler(webSocketHandler, webSocketProperties.getPath())
.addInterceptors(handshakeInterceptor)
.setAllowedOrigins(webSocketProperties.getAllowedOrigins());
}
@Bean
public HandshakeInterceptor handshakeInterceptor() {
return new PlusWebSocketInterceptor();
}
@Bean
public WebSocketHandler webSocketHandler() {
return new PlusWebSocketHandler();
}
@Bean
public WebSocketTopicListener topicListener() {
return new WebSocketTopicListener();
}
}
WebSocketConstants.java
package com.beijing.websocket.constant;
/**
* websocket的常量配置
*
* @author Robert-L
*/
public interface WebSocketConstants {
/**
* websocketSession中的参数的key
*/
String LOGIN_USER_KEY = "loginUser";
/**
* 订阅的频道
*/
String WEB_SOCKET_TOPIC = "global:websocket";
/**
* 前端心跳检查的命令
*/
String PING = "ping";
/**
* 服务端心跳恢复的字符串
*/
String PONG = "pong";
}
WebSocketMessageDto.java
package com.beijing.websocket.dto;
import lombok.Data;
import java.io.Serial;
import java.io.Serializable;
import java.util.List;
/**
* 消息的dto
*
* @author Robert-L
*/
@Data
public class WebSocketMessageDto implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
/**
* 需要推送到的session key 列表
*/
private List<Long> sessionKeys;
/**
* 需要发送的消息
*/
private String message;
}
重点:PlusWebSocketHandler.java [修改了这个方法 handleBinaryMessage]
后端在websocket 接收音频消息的地方,进行创建websocket客户端
package com.beijing.websocket.handler;
import cn.hutool.core.util.ObjectUtil;
import com.beijing.common.core.domain.model.LoginUser;
import com.beijing.websocket.dto.WebSocketMessageDto;
import com.beijing.websocket.holder.WebSocketSessionHolder;
import com.beijing.websocket.utils.WebSocketUtils;
import com.beijing.websocket.vo.ReqSocketMessageVo;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.*;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import static com.beijing.websocket.constant.WebSocketConstants.LOGIN_USER_KEY;
/**
* WebSocketHandler 实现类
*
* @author Robert-L
*/
@Slf4j
public class PlusWebSocketHandler extends AbstractWebSocketHandler {
private final String socketWbUrl = "ws://100.200.300.1:8888/ws/v1";
/**
* 连接成功后
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) throws IOException {
LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY);
if (ObjectUtil.isNull(loginUser)) {
session.close(CloseStatus.BAD_DATA);
log.info("[connect] invalid token received. sessionId: {}", session.getId());
return;
}
WebSocketSessionHolder.addSession(loginUser.getUserId(), session);
// log.info("[connect] sessionId: {},userId:{},userType:{}", session.getId(), loginUser.getUserId(), loginUser.getUserType());
}
/**
* 处理接收到的文本消息
*
* @param session WebSocket会话
* @param message 接收到的文本消息
* @throws Exception 处理消息过程中可能抛出的异常
*/
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
// 从WebSocket会话中获取登录用户信息
LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY);
// 创建WebSocket消息DTO对象
WebSocketMessageDto webSocketMessageDto = new WebSocketMessageDto();
webSocketMessageDto.setSessionKeys(List.of(loginUser.getUserId()));
webSocketMessageDto.setMessage(message.getPayload());
WebSocketUtils.publishMessage(webSocketMessageDto);
}
/**
* 处理接收到的二进制消息
*
* @param session WebSocket会话
* @param message 接收到的二进制消息
* @throws Exception 处理消息过程中可能抛出的异常
*/
@Override
protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
// 前端传过来的音频数据
ByteBuffer pcmData = message.getPayload();
// 连接到目标 WebSocket 服务
StandardWebSocketClient client = new StandardWebSocketClient();
CompletableFuture<WebSocketSession> sessionFuture = client.execute(new AbstractWebSocketHandler() {
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
// 发送启动语音识别指令
ReqSocketMessageVo req = new ReqSocketMessageVo();
req.setHeader(new ReqSocketMessageVo.SocketHeaderVo("SpeechRecognizer", "StartRecognition"));
ReqSocketMessageVo.SocketPayload socketPayload = new ReqSocketMessageVo.SocketPayload();
socketPayload.setLang_type("zh-cmn-Hans-CN");
req.setPayload(socketPayload);
ObjectMapper objectMapper = new ObjectMapper();
String jsonCommand = objectMapper.writeValueAsString(req);
session.sendMessage(new TextMessage(jsonCommand));
// 发送 PCM 数据
session.sendMessage(new BinaryMessage(pcmData));
log.info("PCM 数据已发送到目标服务");
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
// 接收识别结果
String recognitionResult = message.getPayload();
log.info("接收到识别结果: {}", recognitionResult);
// 解析识别结果
ObjectMapper objectMapper = new ObjectMapper();
Map<String, Object> resultMap = objectMapper.readValue(recognitionResult, Map.class);
// 处理识别结果
String recognizedText = (String) resultMap.get("text");
log.info("识别后的文本: {}", recognizedText);
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
log.error("WebSocket 传输错误: {}", exception.getMessage(), exception);
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
log.info("WebSocket 连接已关闭: {}", closeStatus);
}
}, socketWbUrl);
// 等待连接完成
WebSocketSession webSocketSession = sessionFuture.get();
// 确保连接在使用完毕后关闭
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
if (webSocketSession != null && webSocketSession.isOpen()) {
webSocketSession.close();
}
} catch (IOException e) {
log.error("关闭 WebSocket 连接时发生错误: {}", e.getMessage(), e);
}
}));
}
/**
* 处理接收到的Pong消息(心跳监测)
*
* @param session WebSocket会话
* @param message 接收到的Pong消息
* @throws Exception 处理消息过程中可能抛出的异常
*/
@Override
protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception {
WebSocketUtils.sendPongMessage(session);
}
/**
* 处理WebSocket传输错误
*
* @param session WebSocket会话
* @param exception 发生的异常
* @throws Exception 处理过程中可能抛出的异常
*/
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
log.error("[transport error] sessionId: {} , exception:{}", session.getId(), exception.getMessage());
}
/**
* 在WebSocket连接关闭后执行清理操作
*
* @param session WebSocket会话
* @param status 关闭状态信息
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY);
if (ObjectUtil.isNull(loginUser)) {
log.info("[disconnect] invalid token received. sessionId: {}", session.getId());
return;
}
WebSocketSessionHolder.removeSession(loginUser.getUserId());
log.info("[disconnect] sessionId: {},userId:{}", session.getId(), loginUser.getUserId());
}
/**
* 指示处理程序是否支持接收部分消息
*
* @return 如果支持接收部分消息,则返回true;否则返回false
*/
@Override
public boolean supportsPartialMessages() {
return false;
}
}
WebSocketSessionHolder.java
package com.beijing.websocket.holder;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketSession;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* WebSocketSession 用于保存当前所有在线的会话信息
*
* @author Robert-L
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class WebSocketSessionHolder {
private static final Map<Long, WebSocketSession> USER_SESSION_MAP = new ConcurrentHashMap<>();
/**
* 将WebSocket会话添加到用户会话Map中
*
* @param sessionKey 会话键,用于检索会话
* @param session 要添加的WebSocket会话
*/
public static void addSession(Long sessionKey, WebSocketSession session) {
removeSession(sessionKey);
USER_SESSION_MAP.put(sessionKey, session);
}
/**
* 从用户会话Map中移除指定会话键对应的WebSocket会话
*
* @param sessionKey 要移除的会话键
*/
public static void removeSession(Long sessionKey) {
WebSocketSession session = USER_SESSION_MAP.remove(sessionKey);
try {
session.close(CloseStatus.BAD_DATA);
} catch (Exception ignored) {
}
}
/**
* 根据会话键从用户会话Map中获取WebSocket会话
*
* @param sessionKey 要获取的会话键
* @return 与给定会话键对应的WebSocket会话,如果不存在则返回null
*/
public static WebSocketSession getSessions(Long sessionKey) {
return USER_SESSION_MAP.get(sessionKey);
}
/**
* 获取存储在用户会话Map中所有WebSocket会话的会话键集合
*
* @return 所有WebSocket会话的会话键集合
*/
public static Set<Long> getSessionsAll() {
return USER_SESSION_MAP.keySet();
}
/**
* 检查给定的会话键是否存在于用户会话Map中
*
* @param sessionKey 要检查的会话键
* @return 如果存在对应的会话键,则返回true;否则返回false
*/
public static Boolean existSession(Long sessionKey) {
return USER_SESSION_MAP.containsKey(sessionKey);
}
}
PlusWebSocketInterceptor.java
package com.beijing.websocket.interceptor;
import com.beijing.common.core.domain.model.LoginUser;
import com.beijing.common.utils.SecurityUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import java.util.Map;
import static com.beijing.websocket.constant.WebSocketConstants.LOGIN_USER_KEY;
/**
* WebSocket握手请求的拦截器
*
* @author Robert-L
*/
@Slf4j
public class PlusWebSocketInterceptor implements HandshakeInterceptor {
/**
* 握手前
*
* @param request request
* @param response response
* @param wsHandler wsHandler
* @param attributes attributes
* @return 是否握手成功
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) {
LoginUser loginUser = SecurityUtils.getLoginUser();
attributes.put(LOGIN_USER_KEY, loginUser);
return true;
}
/**
* 握手后
*
* @param request request
* @param response response
* @param wsHandler wsHandler
* @param exception 异常
*/
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
}
}
WebSocketTopicListener.java
package com.beijing.websocket.listener;
import cn.hutool.core.collection.CollUtil;
import com.beijing.websocket.holder.WebSocketSessionHolder;
import com.beijing.websocket.utils.WebSocketUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.Ordered;
/**
* WebSocket 主题订阅监听器
*
* @author Robert-L
*/
@Slf4j
public class WebSocketTopicListener implements ApplicationRunner, Ordered {
/**
* 在Spring Boot应用程序启动时初始化WebSocket主题订阅监听器
*
* @param args 应用程序参数
* @throws Exception 初始化过程中可能抛出的异常
*/
@Override
public void run(ApplicationArguments args) throws Exception {
// 订阅WebSocket消息
WebSocketUtils.subscribeMessage((message) -> {
log.info("WebSocket主题订阅收到消息session keys={} message={}", message.getSessionKeys(), message.getMessage());
// 如果key不为空就按照key发消息 如果为空就群发
if (CollUtil.isNotEmpty(message.getSessionKeys())) {
message.getSessionKeys().forEach(key -> {
if (WebSocketSessionHolder.existSession(key)) {
WebSocketUtils.sendMessage(key, message.getMessage());
}
});
} else {
WebSocketSessionHolder.getSessionsAll().forEach(key -> {
WebSocketUtils.sendMessage(key, message.getMessage());
});
}
});
log.info("初始化WebSocket主题订阅监听器成功");
}
@Override
public int getOrder() {
return -1;
}
}
WebSocketUtils.class
package com.beijing.websocket.utils;
import cn.hutool.core.collection.CollUtil;
import com.beijing.websocket.dto.WebSocketMessageDto;
import com.beijing.websocket.holder.WebSocketSessionHolder;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.PongMessage;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import static com.beijing.websocket.constant.WebSocketConstants.WEB_SOCKET_TOPIC;
/**
* 工具类
*
* @author Robert-L
*/
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class WebSocketUtils {
// 使用本地内存来存储订阅关系
private static final ConcurrentHashMap<String, CopyOnWriteArrayList<Consumer<WebSocketMessageDto>>> subscribers = new ConcurrentHashMap<>();
/**
* 向指定的WebSocket会话发送消息
*
* @param sessionKey 要发送消息的用户id
* @param message 要发送的消息内容
*/
public static void sendMessage(Long sessionKey, String message) {
WebSocketSession session = WebSocketSessionHolder.getSessions(sessionKey);
sendMessage(session, message);
}
/**
* 订阅WebSocket消息主题,并提供一个消费者函数来处理接收到的消息
*
* @param consumer 处理WebSocket消息的消费者函数
*/
public static void subscribeMessage(Consumer<WebSocketMessageDto> consumer) {
subscribers.computeIfAbsent(WEB_SOCKET_TOPIC, k -> new CopyOnWriteArrayList<>()).add(consumer);
// RedisUtils.subscribe(WEB_SOCKET_TOPIC, WebSocketMessageDto.class, consumer);
}
/**
* 发布WebSocket订阅消息
*
* @param webSocketMessage 要发布的WebSocket消息对象
*/
public static void publishMessage(WebSocketMessageDto webSocketMessage) {
List<Long> unsentSessionKeys = new ArrayList<>();
// 当前服务内session,直接发送消息
for (Long sessionKey : webSocketMessage.getSessionKeys()) {
if (WebSocketSessionHolder.existSession(sessionKey)) {
WebSocketUtils.sendMessage(sessionKey, webSocketMessage.getMessage());
continue;
}
unsentSessionKeys.add(sessionKey);
}
// 不在当前服务内session,发布订阅消息
if (CollUtil.isNotEmpty(unsentSessionKeys)) {
WebSocketMessageDto broadcastMessage = new WebSocketMessageDto();
broadcastMessage.setMessage(webSocketMessage.getMessage());
broadcastMessage.setSessionKeys(unsentSessionKeys);
// 模拟发布消息
CopyOnWriteArrayList<Consumer<WebSocketMessageDto>> topicSubscribers = subscribers.get(WEB_SOCKET_TOPIC);
if (topicSubscribers != null) {
topicSubscribers.forEach(consumer -> consumer.accept(broadcastMessage));
}
// RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> {
// log.info(" WebSocket发送主题订阅消息topic:{} session keys:{} message:{}",
// WEB_SOCKET_TOPIC, unsentSessionKeys, webSocketMessage.getMessage());
// });
}
}
/**
* 向所有的WebSocket会话发布订阅的消息(群发)
*
* @param message 要发布的消息内容
*/
public static void publishAll(String message) {
WebSocketMessageDto broadcastMessage = new WebSocketMessageDto();
broadcastMessage.setMessage(message);
// 模拟发布消息
CopyOnWriteArrayList<Consumer<WebSocketMessageDto>> topicSubscribers = subscribers.get(WEB_SOCKET_TOPIC);
if (topicSubscribers != null) {
topicSubscribers.forEach(consumer -> consumer.accept(broadcastMessage));
}
// RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> {
// log.info("WebSocket发送主题订阅消息topic:{} message:{}", WEB_SOCKET_TOPIC, message);
// });
}
/**
* 向指定的WebSocket会话发送Pong消息
*
* @param session 要发送Pong消息的WebSocket会话
*/
public static void sendPongMessage(WebSocketSession session) {
sendMessage(session, new PongMessage());
}
/**
* 向指定的WebSocket会话发送文本消息
*
* @param session WebSocket会话
* @param message 要发送的文本消息内容
*/
public static void sendMessage(WebSocketSession session, String message) {
sendMessage(session, new TextMessage(message));
}
/**
* 向指定的WebSocket会话发送WebSocket消息对象
*
* @param session WebSocket会话
* @param message 要发送的WebSocket消息对象
*/
private synchronized static void sendMessage(WebSocketSession session, WebSocketMessage<?> message) {
if (session == null || !session.isOpen()) {
log.warn("[send] session会话已经关闭");
} else {
try {
session.sendMessage(message);
} catch (IOException e) {
log.error("[send] session({}) 发送消息({}) 异常", session, message, e);
}
}
}
}
ReqSocketMessageVo.java
package com.beijing.websocket.vo;
import lombok.AllArgsConstructor;
import lombok.Data;
/**
* @className: ReqSocketMessageVo
* @Description:
* @version: v1.0.0
* @author: Robert-L
* @date: 2025/3/12 18:19
*/
@Data
public class ReqSocketMessageVo {
/**
* header对象
*/
private SocketHeaderVo header;
/**
* payload对象
*/
private SocketPayload payload;
@Data
@AllArgsConstructor
public static class SocketHeaderVo {
private String namespace;
private String name;
}
@Data
public static class SocketPayload {
private String format = "pcm";
private Integer sample_rate = 16000;
private Integer max_sentence_silence = 800;
private Boolean enable_intermediate_result = true;
private Boolean enable_inverse_text_normalization = false;
private Boolean enable_punctuation_prediction = true;
private String lang_type;
private Float hotwords_weight = 0.8F;
private String forbidden_words_id = "";
private String hotwords_id = "";
private Boolean enable_modal_particle_filter = false;
private Boolean enable_words = false;
}
}
RespSocketMessageVo.class
package com.beijing.websocket.vo;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.util.ArrayList;
import java.util.List;
/**
* @className: ReqSocketMessageVo
* @Description:
* @version: v1.0.0
* @author: Robert-L
* @date: 2025/3/12 18:19
*/
@Data
public class RespSocketMessageVo {
/**
* header对象
*/
private SocketHeaderVo header;
/**
* payload对象
*/
private SocketPayload payload;
@Data
@AllArgsConstructor
public static class SocketHeaderVo {
private String namespace;
private String name;
/**
*
*/
private String appkey;
/**
* 状态码
*/
private String status;
/**
* 状态码说明
*/
private String status_text;
/**
* 任务全局唯一ID,请记录该值,便于排查问题。
*/
private String task_id;
/**
* 消息id
*/
private String message_id;
}
@Data
public static class SocketPayload {
private int index;
private int time;
private int begin_time;
private String speaker_id;
/**
* 识别后的结果
*/
private String result;
private int confidence;
private int volume;
private List<String> words = new ArrayList<>();
}
}
最后的最后,提供一个小demo
package com.beijing.web.controller.system;
import cn.hutool.json.JSONUtil;
import com.beijing.common.core.domain.AjaxResult;
import com.beijing.websocket.dto.WebSocketMessageDto;
import com.beijing.websocket.utils.WebSocketUtils;
import com.beijing.websocket.vo.ReqSocketMessageVo;
import com.beijing.websocket.vo.RespSocketMessageVo;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.socket.BinaryMessage;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;
import java.util.concurrent.CompletableFuture;
/**
* WebSocket 演示案例
*/
@RequiredArgsConstructor
@RestController
@RequestMapping("/demo/websocket")
@Slf4j
public class WeSocketController {
private final String socketWbUrl = "ws://100.200.300.1:8888/ws/v2";
/**
* 发布消息
*
* @param dto 发送内容
*/
@GetMapping("/send")
public AjaxResult send(WebSocketMessageDto dto) throws InterruptedException {
WebSocketUtils.publishMessage(dto);
return AjaxResult.success("操作成功");
}
/**
* 上传pcm音频
*
* @param file 音频
*/
@GetMapping("/uploadPcm")
public AjaxResult uploadPcm(@RequestParam("file") MultipartFile file) throws Exception {
if (file.isEmpty()) {
return AjaxResult.error("文件为空");
}
// 获取 PCM 文件内容
byte[] pcmData = file.getBytes();
// 连接到目标 WebSocket 服务
StandardWebSocketClient client = new StandardWebSocketClient();
CompletableFuture<WebSocketSession> sessionFuture = client.execute(new AbstractWebSocketHandler() {
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
// 发送启动语音识别指令
ReqSocketMessageVo req = new ReqSocketMessageVo();
req.setHeader(new ReqSocketMessageVo.SocketHeaderVo("SpeechRecognizer", "StartRecognition"));
ReqSocketMessageVo.SocketPayload socketPayload = new ReqSocketMessageVo.SocketPayload();
socketPayload.setLang_type("zh-cmn-Hans-CN");
req.setPayload(socketPayload);
ObjectMapper objectMapper = new ObjectMapper();
String jsonCommand = objectMapper.writeValueAsString(req);
session.sendMessage(new TextMessage(jsonCommand));
// 发送 PCM 数据
session.sendMessage(new BinaryMessage(pcmData));
log.info("PCM 数据已发送到目标服务");
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
// 接收识别结果
String recognitionResult = message.getPayload();
// log.info("接收到识别结果: {}", recognitionResult);
RespSocketMessageVo bean = JSONUtil.toBean(recognitionResult, RespSocketMessageVo.class);
if (null != bean) {
RespSocketMessageVo.SocketHeaderVo header = bean.getHeader();
RespSocketMessageVo.SocketPayload payload = bean.getPayload();
String taskId = header.getTask_id();
String namespace = header.getNamespace();
String name = header.getName();
String result = payload.getResult();
// 处理识别结果
log.info("识别后的文本: taskId:{}namespace:{}name:{}result:{}", taskId, namespace, name, result);
}
}
}, socketWbUrl);
// 等待连接完成
WebSocketSession session = sessionFuture.get();
return AjaxResult.success("操作成功");
}
}