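Future and ChannelHandler in Netty
Netty's Future extends the JDK's Future. The JDK Future is limited: the only way to obtain the result is to block in get() (or to poll isDone()). Netty's Future adds addListener(), which delivers the result asynchronously through a callback, and isSuccess(), which reports whether the task succeeded or failed.
- JDK Future
  - get() : block until the task finishes and return its result
  - isDone() : whether the task has finished
  - cancel() : cancel the task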
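    // Needs java.util.concurrent.*; `log` is assumed to be an SLF4J logger (e.g. from Lombok's @Slf4j).
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(2);
        Future<Integer> future = service.submit(() -> {
            log.debug("running...");
            Thread.sleep(2000);
            return 2;
        });
        // get() blocks the calling thread until the task has finished.
        Integer i = future.get();
        log.debug("i = " + i);
        // Without shutdown() the pool's non-daemon threads keep the JVM alive.
        service.shutdown();
    }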
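The example above only exercises get(). The other two methods in the list, isDone() and cancel(), can be sketched with the JDK alone. The class name `JdkFutureSketch` and the method `cancelDemo` are made up for this illustration.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative sketch; the class and method names are made up for this example.
class JdkFutureSketch {
    static String cancelDemo() {
        ExecutorService service = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = service.submit(() -> {
                Thread.sleep(5000); // long enough that the task is still running below
                return 2;
            });
            boolean doneBefore = future.isDone();    // non-blocking status check
            boolean cancelled = future.cancel(true); // true = interrupt the worker thread
            boolean doneAfter = future.isDone();     // a cancelled future counts as done
            boolean getThrew = false;
            try {
                future.get();
            } catch (CancellationException e) {
                getThrew = true;                     // get() on a cancelled future throws
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
            return "doneBefore=" + doneBefore + " cancelled=" + cancelled
                    + " doneAfter=" + doneAfter + " getThrew=" + getThrew;
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(cancelDemo());
    }
}
```

Even here the caller has to ask the future for its state; nothing calls back when the task ends, which is the gap Netty's addListener() fills.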
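- Netty Future
  - isSuccess() : whether the task completed successfully
  - sync() : block until the task finishes; rethrows the cause if the task failed
  - getNow() : return the result without blocking, or null if the task has not finished yet
  - await() : block until the task finishes; does not throw if the task failed, so check isSuccess() afterwards
  - addListener() : register a callback that runs when the task finishes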
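    // Future and GenericFutureListener here are the io.netty.util.concurrent types, not java.util.concurrent.
    public static void main(String[] args) {
        // Netty's thread pool is the EventLoopGroup; each EventLoop in it runs on exactly one thread.
        NioEventLoopGroup group = new NioEventLoopGroup(2);
        EventLoop eventLoop = group.next();
        Future<String> future = eventLoop.submit(() -> {
            Thread.sleep(2000);
            return "hehe";
        });
        // The task is normally still sleeping here, so getNow() returns null and isSuccess() returns false.
        String now = future.getNow();
        System.out.println("now = " + now);
        boolean success = future.isSuccess();
        System.out.println("success = " + success);
        future.addListener(new GenericFutureListener<Future<? super String>>() {
            @Override
            public void operationComplete(Future<? super String> future) throws Exception {
                // Runs once the task has finished, so the result is available without blocking.
                Object now1 = future.getNow();
                System.out.println("now1 = " + now1);
                boolean success = future.isSuccess();
                System.out.println("success = " + success);
                // Stop the event loop threads so the JVM can exit.
                group.shutdownGracefully();
            }
        });
    }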
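- Netty Promise
  Promise extends Netty's Future. It is a writable container for a result: any thread can complete it explicitly with setSuccess() or setFailure() instead of waiting for a submitted task to return.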
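    public static void main(String[] args) throws ExecutionException, InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup(2);
        EventLoop eventLoop = group.next();
        // Create the promise by hand; it is just a container for the result.
        DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
        new Thread(() -> {
            System.out.println("start computing");
            try {
                int i = 1 / 0; // deliberate failure; remove this line to see the success path
                Thread.sleep(1000);
                promise.setSuccess(1000);
            } catch (Exception e) {
                // Catch Exception, not only InterruptedException: otherwise the ArithmeticException
                // escapes, the promise is never completed and get() blocks forever.
                e.printStackTrace();
                promise.setFailure(e);
            }
        }).start();
        try {
            // Blocks until the promise is completed; throws ExecutionException if it failed.
            Integer result = promise.get();
            System.out.println("result = " + result);
        } finally {
            group.shutdownGracefully();
        }
    }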
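To make the difference between await(), sync(), getNow() and addListener() concrete, here is a toy promise written in plain Java. It is a minimal sketch that mimics the behaviour described above; it is not Netty's DefaultPromise, and the name `MiniPromise` is made up. One deliberate simplification: Netty's sync() rethrows the original cause, while this sketch wraps it in an IllegalStateException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model only: NOT Netty's DefaultPromise, just similar observable behaviour.
class MiniPromise<V> {
    private final List<Consumer<MiniPromise<V>>> listeners = new ArrayList<>();
    private boolean done;
    private V result;
    private Throwable cause;

    public synchronized boolean isSuccess() { return done && cause == null; }

    // Non-blocking: null while the promise is incomplete, or if it failed.
    public synchronized V getNow() { return result; }

    public synchronized Throwable cause() { return cause; }

    public void setSuccess(V value) { complete(value, null); }

    public void setFailure(Throwable t) { complete(null, t); }

    // Runs the listener on completion, or immediately if already complete.
    public void addListener(Consumer<MiniPromise<V>> listener) {
        synchronized (this) {
            if (!done) {
                listeners.add(listener);
                return;
            }
        }
        listener.accept(this);
    }

    // Blocks until completion; a failed promise does not make this throw.
    public synchronized MiniPromise<V> await() throws InterruptedException {
        while (!done) {
            wait();
        }
        return this;
    }

    // Blocks until completion, then throws if the promise failed.
    public MiniPromise<V> sync() throws InterruptedException {
        await();
        Throwable t = cause();
        if (t != null) {
            throw new IllegalStateException(t); // simplification: Netty rethrows t itself
        }
        return this;
    }

    private void complete(V value, Throwable t) {
        List<Consumer<MiniPromise<V>>> toNotify;
        synchronized (this) {
            if (done) {
                throw new IllegalStateException("already complete");
            }
            done = true;
            result = value;
            cause = t;
            toNotify = new ArrayList<>(listeners);
            listeners.clear();
            notifyAll(); // wake threads blocked in await()
        }
        for (Consumer<MiniPromise<V>> l : toNotify) {
            l.accept(this); // listeners run on the completing thread
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MiniPromise<Integer> promise = new MiniPromise<>();
        promise.addListener(p -> System.out.println("listener: success = " + p.isSuccess()));
        new Thread(() -> promise.setFailure(new ArithmeticException("/ by zero"))).start();
        promise.await(); // returns normally even though the promise failed
        System.out.println("after await: success = " + promise.isSuccess());
        try {
            promise.sync(); // same wait, but the failure surfaces as an exception
        } catch (IllegalStateException e) {
            System.out.println("sync threw, cause = " + e.getCause());
        }
    }
}
```

The listener runs on whichever thread completes the promise, so the relative order of the listener line and the "after await" line is not fixed.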
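ChannelHandler
Netty has two kinds of handler:
- ChannelInboundHandler : inbound; handles data being read. Inbound handlers run in the order they were added to the pipeline.
- ChannelOutboundHandler : outbound; handles data being written. Outbound handlers run in the reverse of the order they were added.
channel.write() : starts at the tail of the pipeline and walks backward through the outbound handlers.
ctx.write() : starts at the current handler and walks backward, so only outbound handlers added before the current handler run.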
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup(2))
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        // Netty adds two handlers of its own, head and tail. The pipeline is a
                        // doubly linked list: head <-> handle01 <-> ... <-> handle05 <-> tail
                        pipeline.addLast("handle01", new ChannelInboundHandlerAdapter() {
                            // Inbound handlers usually care about channelRead
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.debug("msg:{}", msg);
                                ByteBuf byteBuf = (ByteBuf) msg;
                                String s = byteBuf.toString(Charset.defaultCharset());
                                // The ByteBuf is not forwarded, so release it here to avoid a leak.
                                byteBuf.release();
                                // super.channelRead() calls ctx.fireChannelRead(), which hands the
                                // converted value to the next inbound handler.
                                super.channelRead(ctx, s);
                            }
                        });
                        pipeline.addLast("handle02", new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.debug("msg222:{}", msg);
                                User user = new User();
                                user.setName((String) msg);
                                super.channelRead(ctx, user);
                            }
                        });
                        pipeline.addLast("handle03", new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.debug("msg333:{}", msg);
                                super.channelRead(ctx, msg);
                                // ch.writeAndFlush() starts at the tail, so handle05 and then handle04 run.
                                // ctx.writeAndFlush() would search backward from handle03 and reach neither.
                                ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server".getBytes()));
                            }
                        });
                        // Outbound handlers run from back to front, and only when something is written.
                        pipeline.addLast("handle04", new ChannelOutboundHandlerAdapter() {
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.debug("msg444:{}", msg);
                                super.write(ctx, msg, promise);
                            }
                        });
                        pipeline.addLast("handle05", new ChannelOutboundHandlerAdapter() {
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.debug("msg555:{}", msg);
                                super.write(ctx, msg, promise);
                            }
                        });
                    }
                }).bind(new InetSocketAddress(8080));
    }
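The ordering rules can be checked without starting a server. The following is a toy model in plain Java, using the five-handler layout of the server example above. It contains no Netty types, and the class `PipelineOrderSketch` and its methods are made up for this illustration. It also assumes every handler forwards the event, as the handlers above do via super.channelRead() and super.write().

```java
import java.util.ArrayList;
import java.util.List;

// Toy model in plain Java: no Netty types, handlers are just names in an array.
class PipelineOrderSketch {
    // Same layout as the server example: three inbound handlers, then two outbound.
    static final String[] NAMES = {"handle01", "handle02", "handle03", "handle04", "handle05"};
    static final boolean[] OUTBOUND = {false, false, false, true, true};

    // Inbound event: travels head -> tail and visits inbound handlers only.
    static List<String> read() {
        List<String> visited = new ArrayList<>();
        for (int i = 0; i < NAMES.length; i++) {
            if (!OUTBOUND[i]) {
                visited.add(NAMES[i]);
            }
        }
        return visited;
    }

    // Outbound event: walks toward the head from the slot before `start`,
    // visiting outbound handlers only.
    private static List<String> writeBefore(int start) {
        List<String> visited = new ArrayList<>();
        for (int i = start - 1; i >= 0; i--) {
            if (OUTBOUND[i]) {
                visited.add(NAMES[i]);
            }
        }
        return visited;
    }

    // Models channel.write(): the walk starts at the tail.
    static List<String> channelWrite() {
        return writeBefore(NAMES.length);
    }

    // Models ctx.write() called inside the named handler: the walk starts there.
    static List<String> ctxWrite(String handlerName) {
        for (int i = 0; i < NAMES.length; i++) {
            if (NAMES[i].equals(handlerName)) {
                return writeBefore(i);
            }
        }
        throw new IllegalArgumentException("unknown handler: " + handlerName);
    }

    public static void main(String[] args) {
        System.out.println("read:               " + read());               // [handle01, handle02, handle03]
        System.out.println("channel.write:      " + channelWrite());       // [handle05, handle04]
        System.out.println("ctx.write@handle03: " + ctxWrite("handle03")); // []
    }
}
```

The empty result for ctx.write in handle03 is the practical consequence: with this layout, an outbound handler must be added before handle03 for ctx.write() to reach it.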
EmbeddedChannel: simulating channel execution
    public static void main(String[] args) {
        ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                System.out.println("msg = " + msg);
                super.channelRead(ctx, msg);
            }
        };
        ChannelOutboundHandlerAdapter h2 = new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                System.out.println(4444);
                // Forward the write; otherwise the message stops here and its promise is never completed.
                super.write(ctx, msg, promise);
            }
        };
        EmbeddedChannel channel = new EmbeddedChannel(h1, h2);
        // Simulate inbound data (runs h1):
        // channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hehe".getBytes()));
        // Simulate outbound data (runs h2):
        channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hehe".getBytes()));
    }