当前位置: 首页 > article >正文

记录一次gRpc流式操作

使用背景: 从redis队列中发送和消费消息.(使用gRpc的流式实现的消费消息)

gRpc协议类定义

message AdMsgProto{
optional string msg=1;
optional string tag=2;
optional string topic=3;
}
2. service方法定义
service MQDataService{
rpc sendRedissonMsg(AdMsgProto)returns (Code);
rpc receiveRedissonMsg(String)returns (stream AdMsgProto);
}

服务端写法

package com.mykkhw.mykkhw_data_mq.service.grpc;

import com.mykkhw.mykkhw_data_protocols.Base.ReqDataProto;
import com.mykkhw.mykkhw_data_protocols.Base.ResultProto;
import com.mykkhw.mykkhw_data_protocols.Base.ResultType;
import com.mykkhw.mykkhw_data_protocols.MQ.TiktokMsgProto;
import com.mykkhw.mykkhw_data_protocols.MQService.MQDataServiceGrpc;
import io.grpc.stub.StreamObserver;
import org.redisson.api.RBlockingQueue;
import org.springframework.util.StringUtils;

public class MqRpcService extends MQDataServiceGrpc.MQDataServiceImplBase {
    @Override
    public void sendRedissonMsg(AdMsgProto request, StreamObserver<ResultProto> responseObserver) {

        RpcServicePools.mqProducer.sendMsg(request.getMsg(), request.getTag(), request.getTopic());
        ResultProto.Builder builder = ResultProto.newBuilder();
        builder.setCode(ResultType.SUCCESS);
        responseObserver.onNext(builder.build());
        responseObserver.onCompleted();
    }

    @Override
    public void receiveRedissonMsg(ReqDataProto request, StreamObserver<AdMsgProto> responseObserver) {

        try {
            RBlockingQueue<String> queue = RpcServicePools.redisson.getBlockingQueue(request.getName());

            // 循环处理消息
            while (!Thread.currentThread().isInterrupted()) {
                // 阻塞式获取消息,没有消息时线程会等待
                String message = queue.take();

                if(StringUtils.hasText(message)){
                    AdMsgProto.Builder builder = AdMsgProto.newBuilder();
                    builder.setMsg(message);
                    ...
                    responseObserver.onNext(builder.build());
                }
            }
        } catch (Exception e) {
            Thread.currentThread().interrupt();  // 重新设置线程的中断标志
        }
        responseObserver.onCompleted();
    }
}

//mq依赖
<dependency>
   <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.15.0</version>
    <scope>compile</scope>
</dependency>

客户端写法

 // 消息生产者
    private void buildAppMSg(...param) {
		...
        // 发送mq消息
        ClientManager.getMqDataServiceFutureStub().sendRedissonMsg(adMsg.build());
    }


	//消费者流式接收
	public static void receiveFacebookMsg() {
        try {
            log.info("facebook msg");
            // 处理服务器流式响应
            StreamObserver<AdMsgProto> responseObserver = new StreamObserver<AdMsgProto>() {
                @Override
                public void onNext(AdMsgProto msgProto) {
                    log.info("facebook 接收到消息: {}", msgProto.getMsg());
                    ...
                }
                @Override
                public void onError(Throwable throwable) {
                    log.info("Error occurred: {}", throwable.getMessage());
                    ...
                }
                @Override
                public void onCompleted() {
                    log.info("Stream completed.");
                    ...
                }
            };
            log.info("接收msg 开始");
            ClientManager.getMqDataServiceStub().receiveRedissonMsg(build, responseObserver);
            log.info("接收msg 成功");
        }catch (Exception e){
            log.info("出错了");
        }
    }
    ~~~
        jedis和消息优化版:

http://www.kler.cn/news/327478.html

相关文章:

  • 正则表达式的使用示例--Everything文件检索批量重命名工具
  • 使用 Python 实现图形学的辐射度算法
  • Flask-2
  • Gpt4.0最新保姆级教程开通升级
  • 如何使用 Python 读取数据量庞大的 excel 文件
  • PostgreSQL+MybatisPlus,设置逻辑删除字段后查询出现:操作符不存在: boolean = integer 错误
  • 【mmengine】配置器(config)(进阶)继承与导出,命令行修改配置
  • 鸿蒙开发(NEXT/API 12)【硬件(常见问题)】驱动开发服务
  • 3-3 AUTOSAR RTE 对SR Port的作用
  • 51单片机的智能停车场【proteus仿真+程序+报告+原理图+演示视频】
  • Ubuntu 手动安装 ollama
  • 音视频入门基础:FLV专题(9)——Script Tag简介
  • mysql迁移postgreSql windows 工具
  • SQL SERVER 从嫌弃存储到爱上存储过程我给存储过程开发了版本控制工具和远程调试功能...
  • 基于ESP8266使用OLED显示温湿度和时间
  • Jmeter常用函数、逻辑控制器
  • 025.Oracle_DBMS_job定时任务
  • 单片机在控制和自动化任务中的应用场景广泛
  • 关于区块链的安全和隐私
  • 国外问卷调查匠哥已经不带人了,但是还可以交流
  • Windows平台如何实现RTSP|RTMP流录像?
  • STM32F1+HAL库+FreeTOTS学习14——数值信号量
  • 如何创建一个包含多个列的表?
  • PHP的guzzlehttp/guzzle库在碰到各种异常时的场景
  • 前端框架对比和选择:Vue、React 和 Angular 谁更适合你的项目?
  • Java 死锁及避免讲解和案例示范
  • 初识Linux · O(1)调度算法
  • 新品 | Teledyne FLIR IIS 推出Forge 1GigE SWIR 短波红外工业相机系列
  • 【d57】【sql】1661. 每台机器的进程平均运行时间
  • 【AI学习笔记】基于Unity+DeepSeek开发的一些BUG记录解决方案