netty之Netty请求响应同步通信
前言
在开发RPC框架的时候,需要选择socket的通信方式。我们知道,一般情况下socket通信类似于QQ聊天:消息发过去之后,对方什么时候回复都可以。但是RPC框架的通信,从使用感受上更类似HTTP调用,需要在一定时间内返回结果,否则就会发生超时断开。这里我们选择netty作为socket框架,采用future方式实现请求与响应的同步通信。
/**
 * Client-side inbound handler that hands each incoming {@code Response} to the
 * pending future registered in {@code SyncWriteMap.syncKey} under the same request id.
 */
public class MyClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * Looks up the future stored under the response's request id and, when one
     * is present, gives it the response.
     *
     * @param ctx the handler context (not used here)
     * @param obj the inbound message; cast to {@code Response}
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception {
        Response response = (Response) obj;
        SyncWriteFuture pending = (SyncWriteFuture) SyncWriteMap.syncKey.get(response.getRequestId());
        if (pending == null) {
            // Nothing is registered under this id, so there is nobody to notify.
            return;
        }
        pending.setResponse(response);
    }

    /**
     * Prints the stack trace of the failure and closes the channel.
     *
     * @param ctx   the handler context whose channel is closed
     * @param cause the error that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
public class SyncWrite {
public Response writeAndSync(final Channel channel, final Request request, final long timeout) throws Exception {
if (channel == null) {
throw new NullPointerException("channel");
}
if (request == null) {
throw new NullPointerException("request");
}
if (timeout <= 0) {
throw new IllegalArgumentException("timeout <= 0");
}
String requestId = UUID.randomUUID().toString();
request.setRequestId(requestId);
WriteFuture<Response> future = new SyncWriteFuture(request.getRequestId());
SyncWriteMap.syncKey.put(request.getRequestId(), future);
Response response = doWriteAndSync(channel, request, timeout, future);
SyncWriteMap.syncKey.remove(request.getRequestId());
return response;
}
private Response doWriteAndSync(final Channel channel, final Request request, final long timeout, final WriteFuture<Response> writeFuture) throws Exception {
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
writeFuture.setWriteResult(future.isSuccess());
writeFuture.setCause(future.cause());
//失败移除
if (!writeFuture.isWriteSuccess()) {
SyncWriteMap.syncKey.remove(writeFuture.requestId());
}
}
});
Response response = writeFuture.get(timeout, TimeUnit.MILLISECONDS);
if (response == null) {
if (writeFuture.isTimeout()) {
throw new TimeoutException();
} else {
// write exception
throw new Exception(writeFuture.cause());
}
}
return response;
}
}
/**
 * Server-side inbound handler that answers every {@code Request} with a
 * {@code Response} carrying the same request id.
 */
public class MyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * Builds a response that echoes the request id and the request's result text
     * followed by a fixed suffix, then writes and flushes it.
     *
     * @param ctx the handler context used to write the response
     * @param obj the inbound message; cast to {@code Request}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object obj) {
        try {
            Request msg = (Request) obj;
            // Reply with the same id so the client can match it to its request.
            Response response = new Response();
            response.setRequestId(msg.getRequestId());
            response.setParam(msg.getResult() + " 请求成功,反馈结果请接受处理{公众号:关注明哥}。");
            ctx.writeAndFlush(response);
        } finally {
            // Release in finally so the message is released even if the cast,
            // response construction, or write throws.
            ReferenceCountUtil.release(obj);
        }
    }

    /**
     * Flushes the context once the current read batch is complete.
     *
     * @param ctx the handler context to flush
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }
}
/**
 * Demo client entry point: starts a {@code ClientSocket} on its own thread and
 * then repeatedly sends a request, printing each response.
 */
public class StartClient {

    /** Future obtained from the client; {@code null} until the client provides one. */
    private static ChannelFuture future;

    /**
     * Runs the send loop until the thread is interrupted.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        System.out.println("hi 微信公众号:关注明哥");
        ClientSocket client = new ClientSocket();
        new Thread(client).start();

        // SyncWrite holds no state, so one instance serves every iteration.
        SyncWrite syncWrite = new SyncWrite();

        while (true) {
            try {
                // The client thread needs time to produce its future; poll until it does.
                if (null == future) {
                    future = client.getFuture();
                    Thread.sleep(500);
                    continue;
                }

                // Build the request payload.
                Request request = new Request();
                request.setResult("查询{关注明哥}用户信息");

                Response response = syncWrite.writeAndSync(future.channel(), request, 1000);
                System.out.println("调用结果:" + JSON.toJSON(response));
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop instead of swallowing the
                // interruption and looping forever.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                // A failed call (e.g. timeout) is reported and the loop retries.
                e.printStackTrace();
            }
        }
    }
}
/**
 * Demo server entry point: runs a {@code ServerSocket} task on a new thread.
 */
public class StartServer {

    /**
     * Starts the server thread and prints a confirmation line.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Thread serverThread = new Thread(new ServerSocket());
        serverThread.start();
        System.out.println("server start done. {关注明哥,获取源码}");
    }
}
测试结果
启动StartServer
启动StartClient
好了,到这里netty之Netty请求响应同步通信的学习就结束了,大家一定要跟着动手操作起来。需要源码的可以私信我获取。