gRPC源码解读 传输层数据处理流程
本篇文章主要介绍gRPC Client传输层的处理流程,如有疑问,欢迎指教。
gRPC版本: 1.54.0-dev
gRPC基于http2传输,传输层主要处理http2相关的内容。RFC7540制定了http2协议规范,因此,这部分代码的逻辑绝大部分是按照协议规范实现的。如初始化http2连接、维持心跳、读取/发送Http2 Frame,流量控制等等。
具体实现上采取读写分离,由两个go协程分别负责frame读取和写入, 简单说就是建立个TCP链接,然后起两个协程分别负责读写。此外,为了提升网络传输性能,gRPC-Go还实现了BDP(Bandwidth Delay Product)采样以及流控窗口自动扩容等等。
在具体说明之前,先介绍两个重要的对象,loopyWriter,controlBuffer。
loopyWriter 简称loopy,顾名思义,这个是循环写的东西。它内部维护一个controlBuffer用于接收各种控制信息,包括从读端接收到的各种控制Frame(Setting、WindowUpdate、GoAway等)、以及待发送的Data、Header Frame。此外还有一些用于维护内部状态的信息。写端循环读取controlBuffer处理,没有消息就阻塞等待。
type loopyWriter struct {
side side // client or server
cbuf *controlBuffer // 控制信息缓存
sendQuota uint32 // 链路发送流控窗口额度 每次发送DataFrame前会检查是否有额度发送
oiws uint32 // stream 发送流控窗口额度
// 对于client侧, 这里的stream已经发送了header frame
// 对于server侧, 这里的stream已经收到了header frame
estdStreams map[uint32]*outStream // 当前连接上已建立且尚未清除的stream
// activeStreams是个stream链表,每个stream都有发送额度并且有数据待发送,数据存在在stream自身的item列表中,如果是server侧,该列表可能还包含trailers数据(header frame)
activeStreams *outStreamList
framer *framer
hBuf *bytes.Buffer // The buffer for HPACK encoding.
hEnc *hpack.Encoder // HPACK encoder.
bdpEst *bdpEstimator // dbp 估算器 用于动态窗口更新
draining bool
// Side-specific handlers
ssGoAwayHandler func(*goAway) (bool, error)
}
controlBuffer 作用上相当于一个消息队列,生产者将消息追加到list链表中,同时检查是否有消费者在等待,如有则通过ch唤醒。
// control buffer 类似一个消息队列 主要用写端使用,业务发送消息也是先追加到这里
type controlBuffer struct {
ch chan struct{}
done <-chan struct{}
mu sync.Mutex
consumerWaiting bool // 如写端没有消息,则阻塞等待通知
list *itemList // 消息列表
err error
// 统计有多少个需要响应的frame,如超过一定数量,则暂停读端写入
transportResponseFrames int
trfChan atomic.Value // chan struct{}
}
下面两张流程图详细描述了读端和写端的处理流程。看上去流程复杂,其实就是处理不同的HTTP2 Frame而已,对于读端来说,最核心的就是处理Data Frame,收到data frame后会将之发送到对应stream的接收缓存等待读取。
对于写端而言,核心部分就是发送DataFrame和HeaderFrame,此外,为了减少系统调用,提高发送效率,如果有多个frame,会采取批量发送的方式发送。如果单次发送数量少于一定字节数,还会让出一轮CPU时间片等待更多数据。
下面看具体的代码,先从newHTTP2Client 新建http2client开始:
// 基于给定地址构建http2的客户端,如成功,返回的http2client可以读写消息了。
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (_ *http2Client, err error) {
// 默认http,如传递证书相关参数,使用https
scheme := "http"
ctx, cancel := context.WithCancel(ctx)
defer func() {
if err != nil {
cancel()
}
}()
// tls相关
connectCtx = icredentials.NewClientHandshakeInfoContext(connectCtx, credentials.ClientHandshakeInfo{Attributes: addr.Attributes})
// 创建tcp连接
conn, err := dial(connectCtx, opts.Dialer, addr, opts.UseProxy, opts.UserAgent)
if err != nil {
if opts.FailOnNonTempDialError {
return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
}
return nil, connectionErrorf(true, err, "transport: Error while dialing: %v", err)
}
// Any further errors will close the underlying connection
defer func(conn net.Conn) {
if err != nil {
conn.Close()
}
}(conn)
ctxMonitorDone := grpcsync.NewEvent()
newClientCtx, newClientDone := context.WithCancel(connectCtx)
defer func() {
newClientDone() // Awaken the goroutine below if connectCtx hasn't expired.
<-ctxMonitorDone.Done() // Wait for the goroutine below to exit.
}()
go func(conn net.Conn) {
defer ctxMonitorDone.Fire() // Signal this goroutine has exited.
<-newClientCtx.Done() // Block until connectCtx expires or the defer above executes.
if err := connectCtx.Err(); err != nil {
// connectCtx expired before exiting the function. Hard close the connection.
if logger.V(logLevel) {
logger.Infof("newClientTransport: aborting due to connectCtx: %v", err)
}
conn.Close()
}
}(conn)
kp := opts.KeepaliveParams
if kp.Time == 0 { // 默认长连接
kp.Time = defaultClientKeepaliveTime
}
if kp.Timeout == 0 { // 超时默认20秒
kp.Timeout = defaultClientKeepaliveTimeout
}
keepaliveEnabled := false
if kp.Time != infinity {
// 当数据包发出去后的等待时间超过用户设置的时间时,判定连接超时
if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {
return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
}
keepaliveEnabled = true
}
// 安全相关
var (
isSecure bool
authInfo credentials.AuthInfo
)
transportCreds := opts.TransportCredentials
perRPCCreds := opts.PerRPCCredentials
if b := opts.CredsBundle; b != nil {
if t := b.TransportCredentials(); t != nil {
transportCreds = t
}
if t := b.PerRPCCredentials(); t != nil {
perRPCCreds = append(perRPCCreds, t)
}
}
if transportCreds != nil {
conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.ServerName, conn)
if err != nil {
return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
}
for _, cd := range perRPCCreds {
if cd.RequireTransportSecurity() {
if ci, ok := authInfo.(interface {
GetCommonAuthInfo() credentials.CommonAuthInfo
}); ok {
secLevel := ci.GetCommonAuthInfo().SecurityLevel
if secLevel != credentials.InvalidSecurityLevel && secLevel < credentials.PrivacyAndIntegrity {
return nil, connectionErrorf(true, nil, "transport: cannot send secure credentials on an insecure connection")
}
}
}
}
isSecure = true
if transportCreds.Info().SecurityProtocol == "tls" {
scheme = "https"
}
}
// 默认开启动态流控窗口 除非指定有效的流控窗口参数 包括stream level or conn level
dynamicWindow := true
icwz := int32(initialWindowSize) // initial window size 65535
if opts.InitialConnWindowSize >= defaultWindowSize {
icwz = opts.InitialConnWindowSize
dynamicWindow = false
}
writeBufSize := opts.WriteBufferSize
readBufSize := opts.ReadBufferSize
maxHeaderListSize := defaultClientMaxHeaderListSize
if opts.MaxHeaderListSize != nil {
maxHeaderListSize = *opts.MaxHeaderListSize
}
t := &http2Client{
ctx: ctx,
ctxDone: ctx.Done(), // Cache Done chan.
cancel: cancel,
userAgent: opts.UserAgent,
registeredCompressors: grpcutil.RegisteredCompressors(),
address: addr, // 对端地址
conn: conn, // 底层TCP连接
remoteAddr: conn.RemoteAddr(),
localAddr: conn.LocalAddr(),
authInfo: authInfo,
readerDone: make(chan struct{}),
writerDone: make(chan struct{}),
goAway: make(chan struct{}),
// Frame 读写
framer: newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
// 输入流量窗口控制 用于控制对端发送速度
fc: &trInFlow{limit: uint32(icwz)},
scheme: scheme,
// 当前client上的stream
activeStreams: make(map[uint32]*Stream),
// 安全相关
isSecure: isSecure,
perRPCCreds: perRPCCreds,
// 保活参数
kp: kp,
statsHandlers: opts.StatsHandlers,
// 本地stream 初始化流控窗口大小
initialWindowSize: initialWindowSize,
// stream ID ,默认从1开始,每次新建stream都会自动+2。 http2 client streamID为奇数,server侧streamID为偶数。
nextID: 1,
// 最大流并发数
maxConcurrentStreams: defaultMaxStreamsClient,
// 可用stream额度
streamQuota: defaultMaxStreamsClient,
streamsQuotaAvailable: make(chan struct{}, 1),
// metric相关
czData: new(channelzData),
keepaliveEnabled: keepaliveEnabled,
bufferPool: newBufferPool(),
onClose: onClose,
}
t.ctx = peer.NewContext(t.ctx, t.getPeer())
if md, ok := addr.Metadata.(*metadata.MD); ok {
t.md = *md
} else if md := imetadata.Get(addr); md != nil {
t.md = md
}
// control frame buffer
t.controlBuf = newControlBuffer(t.ctxDone)
// 和上面的conn window一样,如果设置了stream flow window,则也不能使用动态窗口机制
if opts.InitialWindowSize >= defaultWindowSize {
t.initialWindowSize = opts.InitialWindowSize
dynamicWindow = false
}
// 动态窗口默认开启 这初始化bdp采样器
if dynamicWindow {
t.bdpEst = &bdpEstimator{
bdp: initialWindowSize,
updateFlowControl: t.updateFlowControl,
}
}
// 数据统计相关
for _, sh := range t.statsHandlers {
t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
})
connBegin := &stats.ConnBegin{
Client: true,
}
sh.HandleConn(t.ctx, connBegin)
}
// metrics
t.channelzID, err = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
if err != nil {
return nil, err
}
// 保活 根据配置时间定时发送 ping frame
if t.keepaliveEnabled {
t.kpDormancyCond = sync.NewCond(&t.mu)
go t.keepalive()
}
readerErrCh := make(chan error, 1)
// 读端 reader
go t.reader(readerErrCh)
defer func() {
if err == nil {
// 如果 server preface 读取异常则关闭连接
err = <-readerErrCh
}
if err != nil {
t.Close(err)
}
}()
// 发送 http/2 connection preface, http2协议规定client和server必须发送connection preface以作为最终的协议确认,对于client侧,包含一个固定字符串以及一个或多个setting frame,对于server侧,则为一个或多个setting frame。
n, err := t.conn.Write(clientPreface)
if err != nil {
err = connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
return nil, err
}
if n != len(clientPreface) {
err = connectionErrorf(true, nil, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
return nil, err
}
var ss []http2.Setting
// 和默认值不同则发一个setting frame通知对端
if t.initialWindowSize != defaultWindowSize {
ss = append(ss, http2.Setting{
ID: http2.SettingInitialWindowSize,
Val: uint32(t.initialWindowSize),
})
}
// 如有自定义参数则发一个setting frame通知对端
if opts.MaxHeaderListSize != nil {
ss = append(ss, http2.Setting{
ID: http2.SettingMaxHeaderListSize,
Val: *opts.MaxHeaderListSize,
})
}
// 继续发送 connection preface,即便设置为空也会发送一个空的setting,这是http2 connection preface要求
err = t.framer.fr.WriteSettings(ss...)
if err != nil {
err = connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
return nil, err
}
// 通知对端,conn流控窗口需要增大, 这个只能通过window-update帧来通知而不是通过setting帧,因为setting帧是用来修改stream initial window size
if delta := uint32(icwz - defaultWindowSize); delta > 0 {
if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
err = connectionErrorf(true, err, "transport: failed to write window update: %v", err)
return nil, err
}
}
// metric 统计
t.connectionID = atomic.AddUint64(&clientConnectionCounter, 1)
// 清空发送缓存
if err := t.framer.writer.Flush(); err != nil {
return nil, err
}
// 写端,循环处理写事件,如心跳、控制消息、业务数据等
go func() {
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
err := t.loopy.run()
if logger.V(logLevel) {
logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err)
}
// Do not close the transport. Let reader goroutine handle it since
// there might be data in the buffers.
// 关闭套接字
t.conn.Close()
// 通知controlBuf准备结束
t.controlBuf.finish()
// 关闭写信号
close(t.writerDone)
}()
return t, nil
}
newHTTP2Client 主要就是新建TCP套接字,然后按照http2协议规范初始化http2连接,然后起2个go 协程分别负责套接字的读写。
下面看看读端的代码:
// 验证server侧 connection preface,然后开始循环读数据
func (t *http2Client) reader(errCh chan<- error) {
// 退出前关闭读 此时套接字还能继续写
defer close(t.readerDone)
// 读取 server 侧connection preface(由SettingFrame构成)
if err := t.readServerPreface(); err != nil {
errCh <- err
return
}
close(errCh)
// 更新读取时间 keepalive保活协程会用到
if t.keepaliveEnabled {
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
}
// 循环读取消息
for {
// 检查写端是否有大量响应消息需要发送,如有,则等待
t.controlBuf.throttle()
// 接收http2 frame
frame, err := t.framer.fr.ReadFrame()
if t.keepaliveEnabled {
// 更新读取时间 keepalive 保活协程会用到
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
}
if err != nil {
// 如果是stream相关错误,则关闭对应stream,否则关闭整个链接
if se, ok := err.(http2.StreamError); ok {
t.mu.Lock()
s := t.activeStreams[se.StreamID]
t.mu.Unlock()
if s != nil {
code := http2ErrConvTab[se.Code]
errorDetail := t.framer.fr.ErrorDetail()
var msg string
if errorDetail != nil {
msg = errorDetail.Error()
} else {
msg = "received invalid frame"
}
// 关掉对应的stream
t.closeStream(s, status.Error(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false)
}
continue
} else {
// 关闭整个连接
t.Close(connectionErrorf(true, err, "error reading from server: %v", err))
return
}
}
// 根据frame 类型调用各自处理函数
switch frame := frame.(type) {
// Header帧
case *http2.MetaHeadersFrame:
t.operateHeaders(frame)
// Data帧
case *http2.DataFrame:
t.handleData(frame)
// RstStream帧
case *http2.RSTStreamFrame:
t.handleRSTStream(frame)
// Setting帧
case *http2.SettingsFrame:
t.handleSettings(frame, false)
// Ping帧
case *http2.PingFrame:
t.handlePing(frame)
// 链接断开帧
case *http2.GoAwayFrame:
t.handleGoAway(frame)
// 窗口更新帧
case *http2.WindowUpdateFrame:
t.handleWindowUpdate(frame)
default:
if logger.V(logLevel) {
logger.Errorf("transport: http2Client.reader got unhandled frame type %v.", frame)
}
}
}
}
读端代码逻辑简单,就是验证过 connection preface 之后,循环读取frame处理。preface中文意思是开场白、序言。http2 协议规定,在正式进行数据交换前,client和server必须先进行connection preface以进行最终的协议确认,原文是这样:
In HTTP/2, each endpoint is required to send a connection preface as a final confirmation of the protocol in use and to establish the initial settings for the HTTP/2 connection. The client and server each send a different connection preface.
在HTTP/2中,每个端点都需要发送一个连接序言,作为使用协议的最终确认,并为HTTP/2连接建立初始设置。客户端和服务器各自发送一个不同的连接序言。
下面在看写端的代码,也是一个循环:
// 从controlBuf读取消息并处理,包括更新loopy自身状态或者发送http2 frame。loopy 将所有需要发送数据的stream放到一个active链表中,active链表中的每个stream必须满足两个条件,1,有数据发送。2,不受stream 流控限制。在运行循环的每次迭代中,除了处理传入的控制帧之外,循环调用processData,处理activeStreams链表中的stream上的发送消息队列,每次发送一个data frame或者一个data frame加上一个header frame,如还有消息,则继续加入activeStreams中等待下次处理
func (l *loopyWriter) run() (err error) {
// 退出之前清空下发送缓存
defer l.framer.writer.Flush()
for {
// 阻塞读消息(消息包括setting、header、data、ping、goaway frame)
it, err := l.cbuf.get(true)
if err != nil {
return err
}
// 消息处理
if err = l.handle(it); err != nil {
return err
}
// 发送data frame
if _, err = l.processData(); err != nil {
return err
}
// 是否让出CPU,只会让一次
gosched := true
hasdata:
for {
// 非阻塞读消息
it, err := l.cbuf.get(false)
if err != nil {
return err
}
if it != nil {
// 存在消息则循环处理
if err = l.handle(it); err != nil {
return err
}
if _, err = l.processData(); err != nil {
return err
}
continue hasdata
}
isEmpty, err := l.processData()
if err != nil {
return err
}
if !isEmpty { // 还有data待处理 则继续
continue hasdata
}
if gosched {
gosched = false
// 批量写
if l.framer.writer.offset < minBatchSize {
runtime.Gosched()
continue hasdata
}
}
l.framer.writer.Flush()
break hasdata
}
}
}
写端逻辑也很简洁,就是循环读取controlBuf,轮训处理有消息发送的stream。当有stream要发送数据时,则将data写入到controlBuf待处理。
以上就是gRPC传输层处理流程介绍。对于上层来说,建立好了http2Client就可以收发消息了,至于流控等是无需关心的。
整个流程除了业务数据的接收和发送之外,比较值得注意的是流量控制这一块的处理,关于gRPC流程控制,已经有一篇极好的文章详细介绍了。这里我稍稍补充下关于BDP和流控窗口临时增加这一部分。
BDP(Bandwidth Delay Product 带宽延迟积) 用来衡量网络链路中可以发送多少比特数(或字节数)。它给出了发送者在任意时间在未接收到接收端确认数据之前最多可以发送的数据量。如果想最大化利用网络传输效能,接收端的接收窗口必须要大于bdp。因为如果小于bdp,则无法充分利用网络链路传输效能。
gRPC通过发送bdpping以及收到bdpping的响应来计算RTT,并统计在此期间收到的数据量sample来估算bdp,以此来调整流控控制窗口从而提升网络传输性能。实现细节以及理论细节可以参考贴出的参考资料。
流控窗口临时增加是指当业务程序要求读取的数据超过当前流控窗口时,正常情况下,发送端会分多个frame多次发送。为了提升发送性能,接收端此时会发送一个windowUpdate窗口指示发送端可以发送更多数据从而绕过流控窗口的限制。这个优化在高延迟网络下可以提升10倍以上的性能。
参考资料:
1. gRPC 流量控制详解 - 掘金
2. gRPC性能优化(BDP & 流控窗口临时增加)
3. 再谈 gRPC 的 Trailers 设计
4. grpc/PROTOCOL-HTTP2.md at master · grpc/grpc · GitHub
5. http2 接收窗口自动调整
6. RFC 7540 - Hypertext Transfer Protocol Version 2 (HTTP/2)