Reactive响应式编程系列:解密reactor-netty如何实现响应式
我们都说 Netty 是一款基于异步事件驱动来设计和实现的高性能IO框架,它之所以高性能,重要的原因之一是其线程模型的设计,Netty 的线程模型是基于 Reactor 设计模式的,它主要包含两个线程池:一个是 Boss 线程池,另一个是 Worker 线程池。Boss 线程池主要负责接受客户端连接请求,并将连接请求注册到 Worker 线程池中的某个线程的 Selector 上。Boss 线程池通常只有一个线程(如果建链也成为瓶颈,那么Boss线程池也可以有多个)。Worker 线程池主要负责处理客户端连接请求,并进行网络 I/O 操作。Worker 线程池的大小通常是根据 CPU 核数和业务需求来调整的。
Reactor 设计模式最早由 Doug Schmidt 和 Steve Vinoski 在 1995 年的一篇论文《Reactor: An Object Behavioral Pattern for Concurrent Handling of Multiple Event-based Requests》中提出,这是一篇关于并发编程模式的经典论文。这篇论文对基于 Reactor 设计模式的网络编程框架进行了探讨。Reactor 设计模式是一种用于处理事件驱动 I/O 操作的设计模式,它主要包含以下两个核心组件:
1. Reactor:负责监听事件和分发事件,它将事件分发给对应的 Handler 处理器。
2. Handler:负责处理具体的事件,例如读取数据、解析数据、处理业务逻辑、发送响应数据等等。
Reactor 设计模式的基本思想是:当有事件发生时,Reactor 将事件分发给对应的 Handler 处理器,由 Handler 处理器来具体处理事件。在处理器处理事件的过程中,如果继续需要进行 I/O 操作,它会将 I/O 操作交给 Reactor 处理,由 Reactor 处理器负责监听 I/O 事件并分发给对应的 Handler 处理器。它可以提供高效的事件处理和 I/O 操作,避免了使用传统的同步阻塞式 I/O 模型的性能瓶颈。
Reactor 设计模式 和 Reactive 响应式编程有一定相似之处,两者的核心思想都是事件驱动,通过异步处理来解决高并发下阻塞操作带来的资源消耗和性能下降问题,而 reacotor-netty (GitHub - reactor/reactor-netty: TCP/HTTP/UDP/QUIC client/server with Reactor over Netty)的目标就是将 Netty 框架和响应式组件库 project-reactor (GitHub - reactor/reactor-core: Non-Blocking Reactive Foundation for the JVM)打通.
为了简单起见,我们用HTTP请求和应答处理过程来窥探reactor-netty的实现细节,在 HTTP 协议中,请求和响应消息分为两个部分:消息头和消息体。消息头包含了请求或响应的元数据信息,如请求方法、响应状态码、内容类型、内容长度等。而消息体则包含了请求或响应的实际内容。而当一个 HTTP 请求或响应结束时,需要发送一个空的消息体(EmptyContent)表示请求或响应结束了。因为 HTTP 协议是基于流的,在传输过程中,HTTP 消息是分多次发送的,每次发送都是一部分数据。当发送完最后一部分数据后,需要告诉接收方,请求或响应已经结束了。这个信号就是一个空的消息体。
注意,为了展示方便和理解更容易,文中展示的源代码都经过了精简,请以GitHub上最新代码为准!!!
有了这些基础,我们看一个 reactor-netty 的官方例子,官方给了一个HTTP的client和server的例子,我们这里只分析client端的例子:
String reqStr = "Go to Zibo for barbecue";
System.out.println(Thread.currentThread().getName() + " 开始请求 " + reqStr);
HttpClient httpClient = HttpClient.create().port(8888);
httpClient.post() // Specifies that POST method will be used
.uri("/test/World") // Specifies the path
.send(ByteBufFlux.fromString(Flux.just(reqStr))) // Sends the request body
.responseContent() // Receives the response body
.aggregate()
.asString()
.subscribe(res ->
System.out.println(Thread.currentThread().getName() + " 收到应答 " + res));
从这个client端例子来看,初步看起来和Reactive有关的有两个地方,第一个是send方法入参貌似是一个Flux,另一个是订阅方法subscribe,其中我们用Lambda表达式直接把HTTP的应答结果打印出来,需要注意的是,我们特意也打印了线程的名称,这是为了突出展示Reactive的异步事件驱动特性,即发起请求的线程和接收应答的线程不是同一个线程。这里我们不急于展示server端的例子,先剖析下client端的实现原理。client端有两个阶段会有IO阻塞,一个是请求发送,一个等待请求应答,请求发送很明显在上述的 send 方法中来做,请求的应答目前看起来像是在responseContent 方法中来实现。
我们先来详细看看 send 方法,按照 Reactive 的要求,调用 send 方法不会导致请求立马发送,请求的发送时机一定是 subscribe 方法调用时!!!而为了让请求发送的异步性做的更彻底,我们需要把请求的构造和包装也做成异步,所以你会看到调用了 ByteBufFlux.fromString 方法,该方法返回值是一个 Flux 类型(即 ByteBufFlux)来作为一个请求数据缓冲区的发布者对象。我们看下 send 方法的内部实现:
final class HttpClientFinalizer extends HttpClientConnect implements HttpClient.RequestSender {
@Override
public HttpClientFinalizer send(Publisher<? extends ByteBuf> requestBody) {
Objects.requireNonNull(requestBody, "requestBody");
// 使用Lambda构造匿名内部类来封装请求发送
return send((req, out) -> out.send(requestBody));
}
@Override
public HttpClientFinalizer send(
BiFunction<? super HttpClientRequest, ? super NettyOutbound, ? extends Publisher<Void>> sender) {
Objects.requireNonNull(sender, "requestBody");
HttpClient dup = duplicate();
// 将请求发送方法保存起来
dup.configuration().body = sender;
return (HttpClientFinalizer) dup;
}
}
public final class HttpClientConfig extends ClientTransportConfig<HttpClientConfig> {
BiFunction<? super HttpClientRequest, ? super NettyOutbound, ? extends Publisher<Void>> body;
}
可以看出来最终我们用 dup.configuration().body 保存了请求发送的Lambda对象(即匿名内部类,为了阐述方便,后面直接说是Lambda对象),而dup是一个 HttpClient 类型的对象,即我们给出的 client 官方例子中的流程发起类。send 方法就这么简单?直接复制一个 HttpClient 对象并将其 HttpClientConfig 的 body 属性赋值请求发送的匿名内部类?别忘记 Reactive 的核心之一就是异步事件驱动,在这个case中建立TCP连接,发送请求、等待应答这些会阻塞的操作,都由Netty框架来驱动,reactor-netty 负责将其和 reactor (即project-reactor)组件打通。那么问题来了,send 方法入参的Lamdba对象只进行了赋值,那 “out.send(requestBody)” 什么时候被调用呢?如果是当前线程来执行,那么就失去异步的特性了,所以肯定还是 Netty 来事件驱动,这里就不得不提 ChannelOperationsHandler 这个类,由于它继承自 Netty 的 ChannelInboundHandlerAdapter,所以其实现的 channelActive 方法最终会触发 HttpClientConnect.HttpIOHandlerObserver.onStateChange 方法的调用,这意味着一有链接建立成功,就会触发HTTP请求的发送,该方法最终会触发HttpClientHandler.requestWithBody 方法的调用,而这个 HttpClientHandler 对象中就包含了这个Lambda表达式对象,所以 reactor-netty 就通过 ChannelOperationsHandler 和 Netty 的事件循环打通了。
接着我们看看 send 方法后面调用的 responseContent 方法,我们看看其实现:
final class HttpClientFinalizer extends HttpClientConnect implements HttpClient.RequestSender {
final HttpClientConfig config;
static final Function<ChannelOperations<?, ?>, Publisher<ByteBuf>> contentReceiver = ChannelOperations::receive;
@Override
public ByteBufFlux responseContent() {
ByteBufAllocator alloc = (ByteBufAllocator) config.options()
.get(ChannelOption.ALLOCATOR);
if (alloc == null) {
alloc = ByteBufAllocator.DEFAULT;
}
@SuppressWarnings("unchecked")
Mono<ChannelOperations<?, ?>> connector = (Mono<ChannelOperations<?, ?>>) connect();
return ByteBufFlux.fromInbound(connector.flatMapMany(contentReceiver), alloc);
}
}
class HttpClientConnect extends HttpClient {
@Override
protected Mono<? extends Connection> connect() {
HttpClientConfig config = configuration();
return new MonoHttpConnect(config);
}
}
public class ChannelOperations<INBOUND extends NettyInbound, OUTBOUND extends NettyOutbound>
implements NettyInbound, NettyOutbound, Connection, CoreSubscriber<Void>, ChannelOperationsId {
final FluxReceive inbound;
// 异步接收数据并返回一个缓冲区的Flux
@Override
public ByteBufFlux receive() {
// fromInbound 用来将一个数据的发布者和一块缓冲区来封装成一个ByteBufFlux
return ByteBufFlux.fromInbound(inbound, connection.channel()
.alloc());
}
}
可以看到,这里使用了 Netty 中的 ByteBufAllocator,它是用于分配 ByteBuf 对象的工具,而ByteBuf 是 Netty 中的字节缓冲区,用于存储和操作二进制数据。接着我们发现调用了 connect 方法来获取一个通道操作的 Mono,那么很显然这个 connect 方法并没有立马去创建链接,直接返回的是一个 MonoHttpConnect 对象,所以链接的建立过程肯定是在 MonoHttpConnect 中的subscribe方法来实现,感兴趣可以去看看。responseContent 方法最复杂的部分在 return 语句里,该方法最终返回的是 ByteBufFlux 类型对象,ByteBufFlux 其实就是把 Netty 中的 ByteBuf 进行了异步封装,封装成了 Flux<ByteBuf>。也就是说 responseContent 方法把建链和接收数据的缓冲区都做了异步封装并返回。之前我们说到在链接建立成功后就会发送HTTP请求,那么我们如何知道应答数据已经准备好了?还是得看 ChannelOperationsHandler,我们看其 channelRead 方法:
final class ChannelOperationsHandler extends ChannelInboundHandlerAdapter {
@Override
final public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg == null || msg == Unpooled.EMPTY_BUFFER || msg instanceof EmptyByteBuf) {
return;
}
try {
ChannelOperations<?, ?> ops = ChannelOperations.get(ctx.channel());
if (ops != null) {
// 如果是 HTTP 则 ops 是 HttpClientOperations 对象
ops.onInboundNext(ctx, msg);
}
} catch (Throwable err) {
safeRelease(msg);
ctx.close();
exceptionCaught(ctx, err);
}
}
}
该方法会从Channel中取出通道操作对象(如果是HTTP则对应的是 HttpClientOperations,它是 ChannelOperations 的子类之一)并调用其 onInboundNext 方法,该方法将 Netty 输入的 Object msg 传递到 reactor-netty 的 FluxReceive 对象的 onInboundNext 方法,从这里开始进入了响应式阶段:
final class FluxReceive extends Flux<Object> implements Subscription, Disposable {
CoreSubscriber<? super Object> receiver;
Queue<Object> receiverQueue;
final void onInboundNext(Object msg) {
if (receiverFastpath && receiver != null) {
try {
receiver.onNext(msg);
} finally {
ReferenceCountUtil.release(msg);
}
} else {
Queue<Object> q = receiverQueue;
if (q == null) {
// please note, in that case we are using non-thread safe, simple
// ArrayDeque since all modifications on this queue happens withing
// Netty Event Loop
q = new ArrayDeque<>();
receiverQueue = q;
}
q.offer(msg);
drainReceiver();
}
}
}
我们需要知道的是,在 Netty 中 HTTP 请求和响应都是由一系列的 DefaultHttpRequest 和 DefaultHttpResponse 对象组成的,其中 DefaultHttpContent 对象就是 HTTP 请求和响应的内容部分。当接收到 HTTP 请求时,Netty 会将请求头和请求体分开处理,其中请求体就是由一系列的 DefaultHttpContent 组成的。同样地,当发送 HTTP 响应时,Netty 会将响应头和响应体分开处理,其中响应体也是由一系列的 DefaultHttpContent 组成的。当 channelRead 接收到的 msg 为 DefaultHttpResponse 类型时,表明收到了应答的HEADER部分,接着当 channelRead 接收到的 msg 为 DefaultHttpContent 类型时,表明收到了应答体内容,最后当 channelRead 接收到的 msg 为 EmptyLastHttpResponse 类型时,表明该HTTP应答完整结束。有了这一基础知识后,reactor-netty 为了管理链接的生命周期,定义了 ConnectionObserver 这一接口,该接口是一个函数接口,需要被实现的是其 onStateChange 方法:
@FunctionalInterface
public interface ConnectionObserver {
/**
* React on connection state change (e.g. http request or response)
*
* @param connection the connection reference
* @param newState the new State
*/
void onStateChange(Connection connection, State newState);
}
它接收一个链接对象和一个新的状态,来承载链接状态变更,那么链接有哪些状态呢?
ConnectionObserver,其内部的State接口定义了链接的各种状态:
interface State {
/**
* Propagated when a connection has been established and is available
* 连接建立
*/
State CONNECTED = ReactorNetty.CONNECTED;
/**
* Propagated when a connection is bound to a channelOperation and ready for
* user interaction
* 已配置
*/
State CONFIGURED = ReactorNetty.CONFIGURED;
/**
* Propagated when a connection has been reused / acquired
* (keep-alive or pooling)
* 获得
*/
State ACQUIRED = ReactorNetty.ACQUIRED;
/**
* Propagated when a connection has been released but not fully closed
* (keep-alive or pooling)
* 已释放
*/
State RELEASED = ReactorNetty.RELEASED;
/**
* Propagated when a connection is being fully closed
* 断开连接
*/
State DISCONNECTING = ReactorNetty.DISCONNECTING;
}
我们再回到 channelRead 方法中 "ops.onInboundNext(ctx, msg);" 部分,前面说过 ops 其实是HttpClientOperations 类型的对象,而 HttpClientOperations.onInboundNext 方法中将通过 msg 消息类型来判断链接的生命周期中所处的状态,整体过程比较复杂,这里不直接给出源码。
接着是 aggregate 方法,从方法名来看就是做应答数据聚合使用的,它主要是从 ByteBufAllocator 取并发射数据,由于需要做成异步的,所以内部使用了Mono.defer 来实现,需要注意的时,由于该方法需要聚合数据,所以用的是 doOnNext 方法,即在数据流真正调用onNext 发射之前调用,该 doOnNext 方法将在 channelRead 方法中收到 EmptyLastHttpResponse 类型的 msg 来触发,这里暂时不给出源码。
后面是 asString 方法根据默认的字符集来把缓冲区的数据变换为字符串。
最后的 subscribe 方法我们都比较熟悉了,传入的 Lambda 表达式我们把应答结果打印出来。
以上就是整个消息发送和接收应答过程,我们可以看到,一旦 subscribe 方法调用,整个建立链接、发送请求、接收应答、聚合应答数据 都是由另一个线程(reactor-http-nio-*)来完成,相当于主线程再触发完 subscribe 方法后就可以做其他事情去了,而不用等待应答,但需要注意的是,subscribe 中的Lambda表达式(即处理HTTP应答的业务逻辑)也由 reactor-http-nio-* 线程执行,请勿处理高开销操作。