Netty对接阿里云语音识别和录音识别
阿里云实时语音识别:https://help.aliyun.com/document_detail/84430.html?spm=a2c4g.324262.0.0.564f73e9O6yq25
阿里云录音识别:https://help.aliyun.com/document_detail/90727.html?spm=a2c4g.90726.0.0.662d73e9qr8DqE
语音识别的流程为:前端和后端构建websocket连接,然后传二进制音频流给后端;后端拿到音频流后,和阿里云构建websocket连接并转发音频流;阿里云收到后进行识别,再把结果返回给后端,后端再返回给前端
录音识别流程为:前端上传一段录音到阿里云oss上,返回录音的url,然后调用阿里云的录音识别拿到录音并解析,将结果返回给后端,后端再将结果返回给前端
实现
pom.xml
<dependencies>
    <!-- Netty (all-in-one artifact): https://mvnrepository.com/artifact/io.netty/netty-all -->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.76.Final</version>
    </dependency>
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>5.8.18</version>
    </dependency>
    <!-- <dependency>-->
    <!-- <groupId>com.aliyun</groupId>-->
    <!-- <artifactId>aliyun-java-sdk-core</artifactId>-->
    <!-- <version>3.7.1</version>-->
    <!-- </dependency>-->
    <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
    <!-- 1.2.83 is the 1.x security release; earlier 1.2.x versions (including 1.2.72) are
         affected by the autoType deserialization vulnerability (CVE-2022-25845). -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- Aliyun speech recognition (NLS transcriber SDK) -->
    <dependency>
        <groupId>com.alibaba.nls</groupId>
        <artifactId>nls-sdk-transcriber</artifactId>
        <version>2.2.1</version>
    </dependency>
    <!-- Swagger 3 -->
    <dependency>
        <groupId>io.springfox</groupId>
        <artifactId>springfox-boot-starter</artifactId>
        <version>3.0.0</version>
    </dependency>
    <!-- Aliyun OSS -->
    <dependency>
        <groupId>com.aliyun.oss</groupId>
        <artifactId>aliyun-sdk-oss</artifactId>
        <version>3.15.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.12.0</version>
    </dependency>
    <!-- Guava -->
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>31.0.1-jre</version>
    </dependency>
    <!-- <dependency>-->
    <!-- <groupId>org.springblade</groupId>-->
    <!-- <artifactId>blade-core-boot</artifactId>-->
    <!-- <version>3.0.1.RELEASE</version>-->
    <!-- <exclusions>-->
    <!-- <exclusion>-->
    <!-- <groupId>org.springblade</groupId>-->
    <!-- <artifactId>blade-core-cloud</artifactId>-->
    <!-- </exclusion>-->
    <!-- </exclusions>-->
    <!-- </dependency>-->
    <!-- Redis client; the transitive Lettuce is excluded and re-declared below with an explicit version -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
        <exclusions>
            <exclusion>
                <groupId>io.lettuce</groupId>
                <artifactId>lettuce-core</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- Lettuce connection pool -->
    <dependency>
        <groupId>io.lettuce</groupId>
        <artifactId>lettuce-core</artifactId>
        <version>6.1.6.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
    </dependency>
</dependencies>
ApplicationService 启动时构建阿里云的nls连接,并获取token放入redis中
@Slf4j
@Service
public class ApplicationService implements ApplicationListener<ContextRefreshedEvent> {
@Autowired
RedisTemplate<String, String> redisTemplate;
public static ApplicationService application = null;
@Value("${aliyun.nls.accessKeyId}")
private String accessKeyId;
@Value("${aliyun.nls.accessKeySecret}")
private String accessKeySecret;
@Value("${aliyun.nls.url}")
private String url;
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
//启动客户端
if (contextRefreshedEvent.getApplicationContext().getParent() == null) {
log.info("客户端启动-------------------------->");
synchronized (this) {
ApplicationService.application = this;
new NlsClientService(accessKeyId, accessKeySecret, url);
log.info("阿里云 NlsClient 初始化完毕");
AccessToken accessToken = NlsClientService.getAccessToken();
redisTemplate.opsForValue().set("nlp:token", accessToken.getToken(), accessToken.getExpireTime(), TimeUnit.SECONDS);
}
}
}
}
构建nls和获取token的具体实现
@Slf4j
public class NlsClientService {
private static NlsClient client;
private static AccessToken accessToken;
public NlsClientService(String accessKeyId, String accessKeySecret, String url) {
//创建NlsClient实例,应用全局创建一个即可,生命周期可和整个应用保持一致,默认服务地址为阿里云线上服务地址
applyAccessToken(accessKeyId, accessKeySecret);
if (url.isEmpty()) {
client = new NlsClient(accessToken.getToken());
} else {
client = new NlsClient(url, accessToken.getToken());
}
}
public static AccessToken getAccessToken() {
return accessToken;
}
public static void applyAccessToken(String accessKeyId, String accessKeySecret) {
accessToken = new AccessToken(accessKeyId, accessKeySecret);
try {
accessToken.apply();
log.info("get token: " + accessToken.getToken() + ", expire time: " + accessToken.getExpireTime());
} catch (IOException e) {
log.error("https获取accessToken失败!" + e.getMessage());
}
}
public static NlsClient getNlsClient() {
return client;
}
}
netty服务端
/**
 * Netty-based secure WebSocket server listening on {@link #WEB_SOCKET_PORT}.
 *
 * <p>The server is started when the bean is constructed and shut down when it is destroyed.
 * Clients connect over TLS to the {@code /websocket} path.
 */
@Slf4j
@Configuration
public class NettyWebSocketServer {
    public static final int WEB_SOCKET_PORT = 9000;

    // Event loop groups: one thread accepting connections, one worker per available processor.
    private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    private final EventLoopGroup workerGroup = new NioEventLoopGroup(NettyRuntime.availableProcessors());

    /**
     * Starts the WebSocket server.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for the bind to complete
     */
    @PostConstruct
    public void start() throws InterruptedException {
        run();
    }

    /**
     * Shuts down both event loop groups and waits for the shutdown to finish.
     */
    @PreDestroy
    public void destroy() {
        Future<?> bossShutdown = bossGroup.shutdownGracefully();
        Future<?> workerShutdown = workerGroup.shutdownGracefully();
        bossShutdown.syncUninterruptibly();
        workerShutdown.syncUninterruptibly();
        log.info("关闭 ws server 成功");
    }

    /**
     * Configures the channel pipeline and binds the server port, blocking until the bind completes.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for the bind to complete
     */
    public void run() throws InterruptedException {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128)
                // SO_KEEPALIVE applies to accepted connections, so it must be a child option;
                // the listening server channel does not support it.
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                // Logging handler for the boss (acceptor) channel.
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        // TLS first, upgrading ws to wss.
                        // NOTE(review): the SslContext is built for every new connection; it could
                        // probably be created once and reused - confirm against SslUtil.
                        SslContext sslCtx = SslUtil.createSSLContext();
                        pipeline.addLast(sslCtx.newHandler(socketChannel.alloc()));
                        // Fire an idle event after 10s without reads or 10s without writes
                        // (all-idle detection disabled); the business handler closes the connection.
                        pipeline.addLast(new IdleStateHandler(10, 10, 0));
                        // The WebSocket handshake is plain HTTP, so HTTP encoding/decoding is required.
                        pipeline.addLast(new HttpServerCodec());
                        // Supports writing content in chunks.
                        pipeline.addLast(new ChunkedWriteHandler());
                        // HTTP messages arrive in several parts; the aggregator merges them into a
                        // single full message (maximum content length 8192 bytes).
                        pipeline.addLast(new HttpObjectAggregator(8192));
                        // Handles the HTTP -> WebSocket upgrade (status 101) for requests to
                        // /websocket, i.e. wss://<host>:9000/websocket, and keeps the connection
                        // open; data is exchanged as WebSocket frames afterwards.
                        pipeline.addLast(new WebSocketServerProtocolHandler("/websocket"));
                        // Business handler; a fresh instance per channel.
                        pipeline.addLast(new NettyWebSocketServerHandler());
                    }
                });
        // Bind the port and block until the server is up.
        serverBootstrap.bind(WEB_SOCKET_PORT).sync();
    }
}
语音识别handler
/**
* 自定义handler
*/
@Slf4j
@Component
public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler<BinaryWebSocketFrame> {
private NlpService nlpService;
// 当web客户端连接后,触发该方法
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.nlpService = getService();
}
private NlpService getService() {
return SpringUtil.getBean(NlpService.class);
}
/**
* netty断联
*
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
SpeechTranscriber transcriber = (SpeechTranscriber) transcriberMap.get(ctx.channel());
if (Objects.nonNull(transcriber)) {
//阿里云netty10s断开,连接状态变为STATE_CLOSED
if (SpeechReqProtocol.State.STATE_CLOSED.toString() != transcriber.getState().toString()) {
transcriber.stop();
}
transcriberMap.remove(ctx.channel());
transcriber.close();
}
log.warn("{} 已经断开", ctx.channel());
}
/**
* 处理异常
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.warn("{} 已经异常断开 异常是{}", ctx.channel(), cause.getMessage());
ctx.channel().close();
}
/**
* 心跳检查
*
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
// 心跳检测超时事件
if (idleStateEvent.state() == IdleState.READER_IDLE) {
log.warn("{} 已经 10s 没有读到数据了,关闭连接", ctx.channel());
} else if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
log.warn("{} 已经 10s 没有写出数据了,关闭连接", ctx.channel());
}
ctx.channel().close();
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, BinaryWebSocketFrame msg) throws Exception {
if (msg instanceof BinaryWebSocketFrame) {
//读取音频二进制流
ByteBuf byteBuf = msg.content();
byte[] byteArray = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(byteArray);
byteBuf.release();
//调用阿里云语音翻译
nlpService.speechTranslation(ctx.channel(), byteArray);
}
}
}
音频流格式,3200B的字节数组
byte[] byteArray = {
-1, -2, -51, -4, 105, -7, 84, -9, -2, -11, -56, -12, -64, -13, 47, -13, -28, -13, 110, -12, 25, -11, 78, -10, -95, -9, -21, -7, -123, -4, 106, -2, -109, 0, 85, 3, -119, 5, -63, 6, -19, 7, -33, 9, -35, 10, 72, 11, -72, 11, -75, 11, 34, 11, 103, 10, 89, 9, 74, 7, -99, 5, 11, 4, -59, 1, -109, -1, 66, -2, -47, -4, 39, -5, -111, -6, -46, -7, 71, -7, -127, -7, 116, -7, -105, -7, 93, -6, 118, -5, -111, -4, 25, -2, -23, -1, -119, 1, 103, 3, -4, 4, 45, 6, 32, 7, -41, 7, 70, 8, -91, 8, 116, 8, -25, 7, -94, 7, -96, 6, 52, 5, -79, 3, -41, 1, -38, -1, 65, -2, 100, -4, 42, -6, 113, -8, 33, -9, -118, -11, 12, -12, 77, -13, 23, -13, 75, -13, -99, -13, 37, -12, 9, -11, 40, -10, -85, -9, 9, -7, 2, -5, 92, -3, 126, -1, -47, 1, -25, 3, -19, 5, 112, 7, -106, 8, -37, 9, -85, 10, 93, 11, -77, 11, 72, 11, 2, 11, 32, 10, -4, 8, -72, 7, -33, 5, 123, 4, -51, 2, -125, 0, 59, -2, 48, -4, -57, -6, -27, -8, 106, -9, -70, -10, 51, -10, -27, -11, -53, -11, -10, -11, -91, -10, -59, -9, -26, -8, -8, -7, -121, -5, -120, -3, 88, -1, -44, 0, -60, 2, -80, 4, 21, 6, 90, 7, 92, 8, 3, 9, -97, 9, 115, 9, -69, 8, 1, 8, 70, 7, 12, 6, 84, 4, -34, 2, 127, 1, -6, -1, -20, -3, 8, -4, -28, -6, -21, -7, 18, -7, 71, -8, 22, -8, 123, -8, -53, -8, 12, -7, -88, -7, 46, -5, -28, -4, 17, -2, -75, -1