当前位置: 首页 > article >正文

netty之Netty请求响应同步通信

前言


实现开发RPC框架的时候,需要选择socket的通信方式。而我们知道一般情况下socket通信类似与qq聊天,发过去消息,什么时候回复都可以。但是我们RPC框架通信,从感觉上类似http调用,需要在一定时间内返回,否则就会发生超时断开。这里我们选择netty作为我们的socket框架,采用future方式进行通信。
在这里插入图片描述

public class MyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception {
        Response msg = (Response) obj;
        String requestId = msg.getRequestId();
        SyncWriteFuture future = (SyncWriteFuture) SyncWriteMap.syncKey.get(requestId);
        if (future != null) {
            future.setResponse(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

}

public class SyncWrite {

    public Response writeAndSync(final Channel channel, final Request request, final long timeout) throws Exception {

        if (channel == null) {
            throw new NullPointerException("channel");
        }
        if (request == null) {
            throw new NullPointerException("request");
        }
        if (timeout <= 0) {
            throw new IllegalArgumentException("timeout <= 0");
        }

        String requestId = UUID.randomUUID().toString();
        request.setRequestId(requestId);

        WriteFuture<Response> future = new SyncWriteFuture(request.getRequestId());
        SyncWriteMap.syncKey.put(request.getRequestId(), future);

        Response response = doWriteAndSync(channel, request, timeout, future);

        SyncWriteMap.syncKey.remove(request.getRequestId());
        return response;
    }

    private Response doWriteAndSync(final Channel channel, final Request request, final long timeout, final WriteFuture<Response> writeFuture) throws Exception {

        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) throws Exception {
                writeFuture.setWriteResult(future.isSuccess());
                writeFuture.setCause(future.cause());
                //失败移除
                if (!writeFuture.isWriteSuccess()) {
                    SyncWriteMap.syncKey.remove(writeFuture.requestId());
                }
            }
        });

        Response response = writeFuture.get(timeout, TimeUnit.MILLISECONDS);
        if (response == null) {
            if (writeFuture.isTimeout()) {
                throw new TimeoutException();
            } else {
                // write exception
                throw new Exception(writeFuture.cause());
            }
        }
        return response;
    }

}

public class MyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object obj) {
        Request msg = (Request) obj;
        //反馈
        Response request = new Response();
        request.setRequestId(msg.getRequestId());
        request.setParam(msg.getResult() + " 请求成功,反馈结果请接受处理{公众号:关注明哥}。");
        ctx.writeAndFlush(request);
        //释放
        ReferenceCountUtil.release(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

}

public class StartClient {

    private static ChannelFuture future;

    public static void main(String[] args) {
        System.out.println("hi 微信公众号:关注明哥");
        ClientSocket client = new ClientSocket();
        new Thread(client).start();

        while (true) {
            try {
                //获取future,线程有等待处理时间
                if (null == future) {
                    future = client.getFuture();
                    Thread.sleep(500);
                    continue;
                }
                //构建发送参数
                Request request = new Request();
                request.setResult("查询{关注明哥}用户信息");
                SyncWrite s = new SyncWrite();
                Response response = s.writeAndSync(future.channel(), request, 1000);
                System.out.println("调用结果:" + JSON.toJSON(response));
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

}


public class StartServer {

    public static void main(String[] args) {
        new Thread(new ServerSocket()).start();
        System.out.println("server start done. {关注明哥,获取源码}");
    }

}

测试结果
启动StartServer
在这里插入图片描述
启动StartClient
在这里插入图片描述
好了到这里就结束了netty之Netty请求响应同步通信的学习,大家一定要跟着动手操作起来。需要的源码的 可si我获取;


http://www.kler.cn/news/331828.html

相关文章:

  • mybatis-plus使用总结
  • YOLO11改进|注意力机制篇|引入注意力与卷积混合的ACmix
  • C语言 | Leetcode C语言题解之第455题分发饼干
  • 云原生数据库 PolarDB
  • 【AIGC】ChatGPT开发者必备:如何获取 OpenAI 的 API Key
  • 异常场景分析
  • 读数据湖仓06数据集成
  • 深度学习----------------------------编码器、解码器架构
  • 如何让服务器自动封禁低质量ip
  • 程序猿成长之路之设计模式篇——设计模式简介
  • C++——定义个一个结构体变量(包括年、月、日),编写程序,要求输入年、月、日,程序计算并输出该日在本年中是第几天。(提示:需要考虑闰年)
  • 酒店新科技,飞睿智能毫米波雷达人体存在感应器,智能照明创新节能新风尚
  • 掌握 C# 中的委托与事件机制
  • 微信小程序攻略:如何验证Token是否即将失效并自动刷新
  • 70.【C语言】动态内存管理(重点)(3)
  • 【Echarts】折线图和柱状图如何从后端动态获取数据?
  • C++实现单例模式
  • 面试速通宝典——9
  • CORE MVC 过滤器 (筛选器)《2》 TypeFilter、ServiceFilter
  • 科技展厅方案新视角:布局优化促进深度互动体验?