记录一次 gRPC 流式操作
使用背景：向 Redis 队列发送消息，并从队列中消费消息（消费端使用 gRPC 的服务端流式调用实现）。
1. gRPC 协议类定义
// Queue message exchanged through MQDataService.
message AdMsgProto {
  // Message payload; the server fills this with the string taken from the queue.
  optional string msg = 1;
  // Tag forwarded to the producer together with the payload.
  optional string tag = 2;
  // Topic forwarded to the producer together with the payload.
  optional string topic = 3;
}
2. service方法定义
// Message-queue RPCs. The request/response types below match the Java server
// implementation (MqRpcService), which overrides
//   sendRedissonMsg(AdMsgProto, StreamObserver<ResultProto>)
//   receiveRedissonMsg(ReqDataProto, StreamObserver<AdMsgProto>)
service MQDataService {
  // Unary call: publish one message and get a result code back.
  rpc sendRedissonMsg(AdMsgProto) returns (ResultProto);
  // Server-streaming call: the request names the queue (ReqDataProto.name) and
  // the server pushes each queued message to the client as it arrives.
  // Note: rpc parameters must be message types; a bare scalar such as "String"
  // is not valid here.
  rpc receiveRedissonMsg(ReqDataProto) returns (stream AdMsgProto);
}
服务端写法
package com.mykkhw.mykkhw_data_mq.service.grpc;
import com.mykkhw.mykkhw_data_protocols.Base.ReqDataProto;
import com.mykkhw.mykkhw_data_protocols.Base.ResultProto;
import com.mykkhw.mykkhw_data_protocols.Base.ResultType;
import com.mykkhw.mykkhw_data_protocols.MQ.TiktokMsgProto;
import com.mykkhw.mykkhw_data_protocols.MQService.MQDataServiceGrpc;
import io.grpc.stub.StreamObserver;
import org.redisson.api.RBlockingQueue;
import org.springframework.util.StringUtils;
/**
 * gRPC service in front of the Redisson-backed message queue: one unary call to publish a
 * message and one server-streaming call that forwards queued messages to the client.
 */
public class MqRpcService extends MQDataServiceGrpc.MQDataServiceImplBase {

    /** How long a single queue poll may block before the loop re-checks for cancellation. */
    private static final long POLL_TIMEOUT_SECONDS = 1L;

    /**
     * Hands the message to {@code RpcServicePools.mqProducer} and answers with a SUCCESS result.
     *
     * @param request          message whose msg, tag and topic are passed to the producer
     * @param responseObserver receives one {@code ResultProto} and is then completed
     */
    @Override
    public void sendRedissonMsg(AdMsgProto request, StreamObserver<ResultProto> responseObserver) {
        RpcServicePools.mqProducer.sendMsg(request.getMsg(), request.getTag(), request.getTopic());
        ResultProto.Builder builder = ResultProto.newBuilder();
        builder.setCode(ResultType.SUCCESS);
        responseObserver.onNext(builder.build());
        responseObserver.onCompleted();
    }

    /**
     * Streams messages from the blocking queue named by {@code request.getName()} to the client.
     *
     * <p>The loop runs on the calling thread until that thread is interrupted or the client
     * cancels the call. The queue is polled with a short timeout instead of an unbounded
     * {@code take()} so that a cancelled call is noticed even when the queue is idle; otherwise
     * the handler thread would stay blocked forever and would consume the next message for a
     * client that is no longer listening.
     *
     * <p>Failures other than interruption are reported to the client through {@code onError}
     * rather than being swallowed and presented as a normal end of stream.
     *
     * @param request          carries the queue name
     * @param responseObserver receives one {@code AdMsgProto} per non-blank queue entry
     */
    @Override
    public void receiveRedissonMsg(ReqDataProto request, StreamObserver<AdMsgProto> responseObserver) {
        // Only the server-side observer type exposes cancellation state; fall back to null
        // (no cancellation check) if gRPC ever hands us something else.
        io.grpc.stub.ServerCallStreamObserver<AdMsgProto> serverCall =
                responseObserver instanceof io.grpc.stub.ServerCallStreamObserver
                        ? (io.grpc.stub.ServerCallStreamObserver<AdMsgProto>) responseObserver
                        : null;
        try {
            RBlockingQueue<String> queue = RpcServicePools.redisson.getBlockingQueue(request.getName());
            while (!Thread.currentThread().isInterrupted() && !isCancelled(serverCall)) {
                // Bounded wait: returns null on timeout, which hasText() below treats as "nothing to send".
                String message = queue.poll(POLL_TIMEOUT_SECONDS, java.util.concurrent.TimeUnit.SECONDS);
                if (StringUtils.hasText(message)) {
                    AdMsgProto.Builder builder = AdMsgProto.newBuilder();
                    builder.setMsg(message);
                    // Placeholder kept from the original note (further builder setup elided there).
                    ...
                    responseObserver.onNext(builder.build());
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag for the owning executor, then end the stream normally below.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            // A real failure (e.g. Redis error): surface it to the client unless the call is already gone.
            if (!isCancelled(serverCall)) {
                responseObserver.onError(io.grpc.Status.INTERNAL
                        .withDescription("Failed to consume queue " + request.getName())
                        .withCause(e)
                        .asRuntimeException());
            }
            return;
        }
        // Nothing to complete once the client has cancelled.
        if (!isCancelled(serverCall)) {
            responseObserver.onCompleted();
        }
    }

    /** Returns true when the call's cancellation state is available and the client has cancelled. */
    private static boolean isCancelled(io.grpc.stub.ServerCallStreamObserver<?> serverCall) {
        return serverCall != null && serverCall.isCancelled();
    }
}
//mq依赖
<!-- Redisson client library; supplies org.redisson.api.RBlockingQueue used by the server-side consumer. -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.15.0</version>
<scope>compile</scope>
</dependency>
客户端写法
// Message producer: builds the ad message and publishes it through the MQ gRPC service.
// (Parameters and the message-building steps are elided in this note.)
private void buildAppMSg(...param) {
...
// Send the MQ message.
// NOTE(review): the value returned by the future-stub call is discarded, so a failed send
// would go unnoticed here — confirm that fire-and-forget is intended.
ClientManager.getMqDataServiceFutureStub().sendRedissonMsg(adMsg.build());
}
/**
 * Consumer side: subscribes to the server-streaming {@code receiveRedissonMsg} call and handles
 * each pushed message in the observer callbacks.
 *
 * <p>Errors are logged at error level with the throwable attached so the stack trace and cause
 * are not lost. The request object {@code build} is prepared in code elided from this note.
 */
public static void receiveFacebookMsg() {
    try {
        log.info("facebook msg");
        // Callbacks for the server-streamed responses.
        StreamObserver<AdMsgProto> responseObserver = new StreamObserver<AdMsgProto>() {
            @Override
            public void onNext(AdMsgProto msgProto) {
                log.info("facebook 接收到消息: {}", msgProto.getMsg());
                ...
            }

            @Override
            public void onError(Throwable throwable) {
                // Pass the throwable itself as the last argument so the logger records the stack trace.
                log.error("Error occurred: {}", throwable.getMessage(), throwable);
                ...
            }

            @Override
            public void onCompleted() {
                log.info("Stream completed.");
                ...
            }
        };
        log.info("接收msg 开始");
        // NOTE(review): this looks like the async stub, in which case the call returns before any
        // message arrives and the log line below only marks that the subscription was issued — confirm.
        ClientManager.getMqDataServiceStub().receiveRedissonMsg(build, responseObserver);
        log.info("接收msg 成功");
    } catch (Exception e) {
        // Keep the cause: the original dropped the exception entirely.
        log.error("出错了", e);
    }
}
~~~
jedis和消息优化版: