当前位置: 首页 > article >正文

gRPC源码解读 传输层数据处理流程

         本篇文章主要介绍gRPC Client传输层的处理流程,如有疑问,欢迎指教。

gRPC版本: 1.54.0-dev

      gRPC基于http2传输,传输层主要处理http2相关的内容。RFC7540制定了http2协议规范,因此,这部分代码的逻辑绝大部分是按照协议规范实现的。如初始化http2连接、维持心跳、读取/发送Http2 Frame,流量控制等等。

        具体实现上采取读写分离,由两个go协程分别负责frame读取和写入, 简单说就是建立个TCP链接,然后起两个协程分别负责读写。此外,为了提升网络传输性能,gRPC-Go还实现了BDP(Bandwidth Delay Product)采样以及流控窗口自动扩容等等。       

        在具体说明之前,先介绍两个重要的对象,loopyWriter,controlBuffer。

        loopyWriter 简称loopy,顾名思义,这个是循环写的东西。它内部维护一个controlBuffer用于接收各种控制信息,包括从读端接收到的各种控制Frame(Setting、WindowUpdate、GoAway等)、以及待发送的Data、Header Frame。此外还有一些用于维护内部状态的信息。写端循环读取controlBuffer处理,没有消息就阻塞等待。


type loopyWriter struct {
	side      side        // client or server 
	cbuf      *controlBuffer // 控制信息缓存
	sendQuota uint32 // 链路发送流控窗口额度 每次发送DataFrame前会检查是否有额度发送
	oiws      uint32 // stream 发送流控窗口额度 

	// 对于client侧, 这里的stream已经发送了header frame
	// 对于server侧, 这里的stream已经收到了header frame
	estdStreams map[uint32]*outStream // 当前连接上已建立且尚未清除的stream

    // activeStreams是个stream链表,每个stream都有发送额度并且有数据待发送,数据存在在stream自身的item列表中,如果是server侧,该列表可能还包含trailers数据(header frame)
	activeStreams *outStreamList
	framer        *framer
	hBuf          *bytes.Buffer  // The buffer for HPACK encoding.
	hEnc          *hpack.Encoder // HPACK encoder.
	bdpEst        *bdpEstimator  // dbp 估算器 用于动态窗口更新
	draining      bool          

	// Side-specific handlers
	ssGoAwayHandler func(*goAway) (bool, error)
}

controlBuffer 作用上相当于一个消息队列,生产者将消息追加到list链表中,同时检查是否有消费者在等待,如有则通过ch唤醒。

// control buffer 类似一个消息队列 主要用写端使用,业务发送消息也是先追加到这里
type controlBuffer struct {
	ch              chan struct{} 
	done            <-chan struct{}
	mu              sync.Mutex
	consumerWaiting bool      // 如写端没有消息,则阻塞等待通知 
	list            *itemList // 消息列表
	err             error

    // 统计有多少个需要响应的frame,如超过一定数量,则暂停读端写入
	transportResponseFrames int
	trfChan                 atomic.Value // chan struct{}
}

        下面两张流程图详细描述了读端和写端的处理流程。看上去流程复杂,其实就是处理不同的HTTP2 Frame而已,对于读端来说,最核心的就是处理Data Frame,收到data frame后会将之发送到对应stream的接收缓存等待读取。

        对于写端而言,核心部分就是发送DataFrame和HeaderFrame,此外,为了减少系统调用,提高发送效率,如果有多个frame,会采取批量发送的方式发送。如果单次发送数量少于一定字节数,还会让出一轮CPU时间片等待更多数据。

图一 http2Client reader 读端

 

图2 loopyWriter​​​​ 写端

         

下面看具体的代码,先从newHTTP2Client 新建http2client开始:

// 基于给定地址构建http2的客户端,如成功,返回的http2client可以读写消息了。
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (_ *http2Client, err error) {
    // 默认http,如传递证书相关参数,使用https
	scheme := "http"
	ctx, cancel := context.WithCancel(ctx)
	defer func() {
		if err != nil {
			cancel()
		}
	}()


	// tls相关 
	connectCtx = icredentials.NewClientHandshakeInfoContext(connectCtx, credentials.ClientHandshakeInfo{Attributes: addr.Attributes})

	// 创建tcp连接
	conn, err := dial(connectCtx, opts.Dialer, addr, opts.UseProxy, opts.UserAgent)
	if err != nil {
		if opts.FailOnNonTempDialError {
			return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
		}
		return nil, connectionErrorf(true, err, "transport: Error while dialing: %v", err)
	}

	// Any further errors will close the underlying connection
	defer func(conn net.Conn) {
		if err != nil {
			conn.Close()
		}
	}(conn)

	ctxMonitorDone := grpcsync.NewEvent()
	newClientCtx, newClientDone := context.WithCancel(connectCtx)
	defer func() {
		newClientDone()         // Awaken the goroutine below if connectCtx hasn't expired.
		<-ctxMonitorDone.Done() // Wait for the goroutine below to exit.
	}()
	go func(conn net.Conn) {
		defer ctxMonitorDone.Fire() // Signal this goroutine has exited.
		<-newClientCtx.Done()       // Block until connectCtx expires or the defer above executes.
		if err := connectCtx.Err(); err != nil {
			// connectCtx expired before exiting the function.  Hard close the connection.
			if logger.V(logLevel) {
				logger.Infof("newClientTransport: aborting due to connectCtx: %v", err)
			}
			conn.Close()
		}
	}(conn)

	kp := opts.KeepaliveParams
	if kp.Time == 0 { // 默认长连接
		kp.Time = defaultClientKeepaliveTime
	}
	if kp.Timeout == 0 { // 超时默认20秒
		kp.Timeout = defaultClientKeepaliveTimeout
	}
	keepaliveEnabled := false
	if kp.Time != infinity {
		// 当数据包发出去后的等待时间超过用户设置的时间时,判定连接超时
		if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {
			return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
		}
		keepaliveEnabled = true
	}
	// 安全相关
	var (
		isSecure bool
		authInfo credentials.AuthInfo
	)
	transportCreds := opts.TransportCredentials
	perRPCCreds := opts.PerRPCCredentials
	if b := opts.CredsBundle; b != nil {
		if t := b.TransportCredentials(); t != nil {
			transportCreds = t
		}
		if t := b.PerRPCCredentials(); t != nil {
			perRPCCreds = append(perRPCCreds, t)
		}
	}
	if transportCreds != nil {
		conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.ServerName, conn)
		if err != nil {
			return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
		}
		for _, cd := range perRPCCreds {
			if cd.RequireTransportSecurity() {
				if ci, ok := authInfo.(interface {
					GetCommonAuthInfo() credentials.CommonAuthInfo
				}); ok {
					secLevel := ci.GetCommonAuthInfo().SecurityLevel
					if secLevel != credentials.InvalidSecurityLevel && secLevel < credentials.PrivacyAndIntegrity {
						return nil, connectionErrorf(true, nil, "transport: cannot send secure credentials on an insecure connection")
					}
				}
			}
		}
		isSecure = true
		if transportCreds.Info().SecurityProtocol == "tls" {
			scheme = "https"
		}
	}

	// 默认开启动态流控窗口 除非指定有效的流控窗口参数 包括stream level or conn level
	dynamicWindow := true
	icwz := int32(initialWindowSize) // initial window size 65535
	if opts.InitialConnWindowSize >= defaultWindowSize {
		icwz = opts.InitialConnWindowSize
		dynamicWindow = false
	}

	writeBufSize := opts.WriteBufferSize
	readBufSize := opts.ReadBufferSize
	maxHeaderListSize := defaultClientMaxHeaderListSize
	if opts.MaxHeaderListSize != nil {
		maxHeaderListSize = *opts.MaxHeaderListSize
	}
	t := &http2Client{
		ctx:                   ctx,
		ctxDone:               ctx.Done(), // Cache Done chan.
		cancel:                cancel,
		userAgent:             opts.UserAgent,
		registeredCompressors: grpcutil.RegisteredCompressors(),
		address:               addr, // 对端地址
		conn:                  conn, // 底层TCP连接
		remoteAddr:            conn.RemoteAddr(),
		localAddr:             conn.LocalAddr(),
		authInfo:              authInfo,
		readerDone:            make(chan struct{}),
		writerDone:            make(chan struct{}),
		goAway:                make(chan struct{}),
		// Frame 读写
		framer: newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
		// 输入流量窗口控制 用于控制对端发送速度
		fc:     &trInFlow{limit: uint32(icwz)},
		scheme: scheme,
		// 当前client上的stream
		activeStreams: make(map[uint32]*Stream),

        // 安全相关 
		isSecure:      isSecure,
		perRPCCreds:   perRPCCreds,
        // 保活参数
		kp:            kp,
		statsHandlers: opts.StatsHandlers,
        
 
		// 本地stream 初始化流控窗口大小
		initialWindowSize: initialWindowSize,
		// stream ID ,默认从1开始,每次新建stream都会自动+2。 http2 client streamID为奇数,server侧streamID为偶数。
		nextID: 1,
		// 最大流并发数
		maxConcurrentStreams: defaultMaxStreamsClient,
		// 可用stream额度
		streamQuota:           defaultMaxStreamsClient,
		streamsQuotaAvailable: make(chan struct{}, 1),

		// metric相关
		czData:           new(channelzData),
		keepaliveEnabled: keepaliveEnabled,
		bufferPool:       newBufferPool(),
		onClose:          onClose,
	}
	t.ctx = peer.NewContext(t.ctx, t.getPeer())
	if md, ok := addr.Metadata.(*metadata.MD); ok {
		t.md = *md
	} else if md := imetadata.Get(addr); md != nil {
		t.md = md
	}

	// control frame buffer
	t.controlBuf = newControlBuffer(t.ctxDone)

	// 和上面的conn window一样,如果设置了stream flow window,则也不能使用动态窗口机制
	if opts.InitialWindowSize >= defaultWindowSize {
		t.initialWindowSize = opts.InitialWindowSize
		dynamicWindow = false
	}

	// 动态窗口默认开启 这初始化bdp采样器
	if dynamicWindow {
		t.bdpEst = &bdpEstimator{
			bdp:               initialWindowSize,
			updateFlowControl: t.updateFlowControl,
		}
	}
	// 数据统计相关
	for _, sh := range t.statsHandlers {
		t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{
			RemoteAddr: t.remoteAddr,
			LocalAddr:  t.localAddr,
		})
		connBegin := &stats.ConnBegin{
			Client: true,
		}
		sh.HandleConn(t.ctx, connBegin)
	}

	// metrics
	t.channelzID, err = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
	if err != nil {
		return nil, err
	}

	// 保活 根据配置时间定时发送 ping frame
	if t.keepaliveEnabled {
		t.kpDormancyCond = sync.NewCond(&t.mu)
		go t.keepalive()
	}

	readerErrCh := make(chan error, 1)

	// 读端 reader
	go t.reader(readerErrCh)
	defer func() {
		if err == nil {
			// 如果 server preface 读取异常则关闭连接
			err = <-readerErrCh
		}
		if err != nil {
			t.Close(err)
		}
	}()

	// 发送 http/2 connection preface, http2协议规定client和server必须发送connection preface以作为最终的协议确认,对于client侧,包含一个固定字符串以及一个或多个setting frame,对于server侧,则为一个或多个setting frame。
	n, err := t.conn.Write(clientPreface)
	if err != nil {
		err = connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
		return nil, err
	}
	if n != len(clientPreface) {
		err = connectionErrorf(true, nil, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
		return nil, err
	}
	var ss []http2.Setting

	// 和默认值不同则发一个setting frame通知对端
	if t.initialWindowSize != defaultWindowSize {
		ss = append(ss, http2.Setting{
			ID:  http2.SettingInitialWindowSize,
			Val: uint32(t.initialWindowSize),
		})
	}
    // 如有自定义参数则发一个setting frame通知对端
	if opts.MaxHeaderListSize != nil {
		ss = append(ss, http2.Setting{
			ID:  http2.SettingMaxHeaderListSize,
			Val: *opts.MaxHeaderListSize,
		})
	}
	// 继续发送 connection preface,即便设置为空也会发送一个空的setting,这是http2 connection preface要求
	err = t.framer.fr.WriteSettings(ss...)
	if err != nil {
		err = connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
		return nil, err
	}

	// 通知对端,conn流控窗口需要增大, 这个只能通过window-update帧来通知而不是通过setting帧,因为setting帧是用来修改stream initial window size
	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
		if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
			err = connectionErrorf(true, err, "transport: failed to write window update: %v", err)
			return nil, err
		}
	}

	// metric 统计
	t.connectionID = atomic.AddUint64(&clientConnectionCounter, 1)

	// 清空发送缓存
	if err := t.framer.writer.Flush(); err != nil {
		return nil, err
	}

	// 写端,循环处理写事件,如心跳、控制消息、业务数据等
	go func() {
		t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
		err := t.loopy.run()
		if logger.V(logLevel) {
			logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err)
		}
		// Do not close the transport.  Let reader goroutine handle it since
		// there might be data in the buffers.
		// 关闭套接字
		t.conn.Close()
		// 通知controlBuf准备结束
		t.controlBuf.finish()
		// 关闭写信号
		close(t.writerDone)
	}()
	return t, nil
}

newHTTP2Client 主要就是新建TCP套接字,然后按照http2协议规范初始化http2连接,然后起2个go 协程分别负责套接字的读写。

下面看看读端的代码:


// 验证server侧 connection preface,然后开始循环读数据
func (t *http2Client) reader(errCh chan<- error) {
    // 退出前关闭读 此时套接字还能继续写
	defer close(t.readerDone)

	// 读取 server 侧connection preface(由SettingFrame构成)
	if err := t.readServerPreface(); err != nil {
		errCh <- err
		return
	}
	close(errCh)

	// 更新读取时间 keepalive保活协程会用到
	if t.keepaliveEnabled {
		atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
	}

	// 循环读取消息
	for {
        // 检查写端是否有大量响应消息需要发送,如有,则等待
		t.controlBuf.throttle()
        // 接收http2 frame
		frame, err := t.framer.fr.ReadFrame()
		if t.keepaliveEnabled {
			// 更新读取时间 keepalive 保活协程会用到
			atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
		}
		if err != nil {
            // 如果是stream相关错误,则关闭对应stream,否则关闭整个链接
			if se, ok := err.(http2.StreamError); ok {
				t.mu.Lock()
				s := t.activeStreams[se.StreamID]
				t.mu.Unlock()
				if s != nil {
					code := http2ErrConvTab[se.Code]
					errorDetail := t.framer.fr.ErrorDetail()
					var msg string
					if errorDetail != nil {
						msg = errorDetail.Error()
					} else {
						msg = "received invalid frame"
					}
					// 关掉对应的stream
					t.closeStream(s, status.Error(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false)
				}
				continue
			} else {
				// 关闭整个连接
				t.Close(connectionErrorf(true, err, "error reading from server: %v", err))
				return
			}
		}
        // 根据frame 类型调用各自处理函数
		switch frame := frame.(type) {
		// Header帧
		case *http2.MetaHeadersFrame:
			t.operateHeaders(frame)
		// Data帧
		case *http2.DataFrame:
			t.handleData(frame)
		// RstStream帧
		case *http2.RSTStreamFrame:
			t.handleRSTStream(frame)
		// Setting帧
		case *http2.SettingsFrame:
			t.handleSettings(frame, false)
		// Ping帧
		case *http2.PingFrame:
			t.handlePing(frame)
		// 链接断开帧
		case *http2.GoAwayFrame:
			t.handleGoAway(frame)
		// 窗口更新帧
		case *http2.WindowUpdateFrame:
			t.handleWindowUpdate(frame)
		default:
			if logger.V(logLevel) {
				logger.Errorf("transport: http2Client.reader got unhandled frame type %v.", frame)
			}
		}
	}
}

读端代码逻辑简单,就是验证过 connection preface  之后,循环读取frame处理。preface中文意思是开场白、序言。http2 协议规定,在正式进行数据交换前,client和server必须先进行connection preface以进行最终的协议确认,原文是这样:

In HTTP/2, each endpoint is required to send a connection preface as a final confirmation of the protocol in use and to establish the initial settings for the HTTP/2 connection. The client and server each send a different connection preface.

在HTTP/2中,每个端点都需要发送一个连接序言,作为使用协议的最终确认,并为HTTP/2连接建立初始设置。客户端和服务器各自发送一个不同的连接序言。

下面在看写端的代码,也是一个循环:

// 从controlBuf读取消息并处理,包括更新loopy自身状态或者发送http2 frame。loopy 将所有需要发送数据的stream放到一个active链表中,active链表中的每个stream必须满足两个条件,1,有数据发送。2,不受stream 流控限制。在运行循环的每次迭代中,除了处理传入的控制帧之外,循环调用processData,处理activeStreams链表中的stream上的发送消息队列,每次发送一个data frame或者一个data frame加上一个header frame,如还有消息,则继续加入activeStreams中等待下次处理
func (l *loopyWriter) run() (err error) {
	// 退出之前清空下发送缓存
	defer l.framer.writer.Flush()
	for {
		// 阻塞读消息(消息包括setting、header、data、ping、goaway frame)
		it, err := l.cbuf.get(true)
		if err != nil {
			return err
		}
		// 消息处理
		if err = l.handle(it); err != nil {
			return err
		}
		// 发送data frame
		if _, err = l.processData(); err != nil {
			return err
		}
		// 是否让出CPU,只会让一次
		gosched := true
	hasdata:
		for {
			// 非阻塞读消息
			it, err := l.cbuf.get(false)
			if err != nil {
				return err
			}
			if it != nil {
				// 存在消息则循环处理
				if err = l.handle(it); err != nil {
					return err
				}
				if _, err = l.processData(); err != nil {
					return err
				}
				continue hasdata
			}

			isEmpty, err := l.processData()
			if err != nil {
				return err
			}
			if !isEmpty { // 还有data待处理 则继续
				continue hasdata
			}
			if gosched {
				gosched = false
				// 批量写
				if l.framer.writer.offset < minBatchSize {
					runtime.Gosched()
					continue hasdata
				}
			}
			l.framer.writer.Flush()
			break hasdata
		}
	}
}

写端逻辑也很简洁,就是循环读取controlBuf,轮训处理有消息发送的stream。当有stream要发送数据时,则将data写入到controlBuf待处理。

        以上就是gRPC传输层处理流程介绍。对于上层来说,建立好了http2Client就可以收发消息了,至于流控等是无需关心的。

        整个流程除了业务数据的接收和发送之外,比较值得注意的是流量控制这一块的处理,关于gRPC流程控制,已经有一篇极好的文章详细介绍了。这里我稍稍补充下关于BDP和流控窗口临时增加这一部分。

图3 BDP

 

        BDP(Bandwidth Delay Product 带宽延迟积) 用来衡量网络链路中可以发送多少比特数(或字节数)。它给出了发送者在任意时间在未接收到接收端确认数据之前最多可以发送的数据量。如果想最大化利用网络传输效能,接收端的接收窗口必须要大于bdp。因为如果小于bdp,则无法充分利用网络链路传输效能。        

        gRPC通过发送bdpping以及收到bdpping的响应来计算RTT,并统计在此期间收到的数据量sample来估算bdp,以此来调整流控控制窗口从而提升网络传输性能。实现细节以及理论细节可以参考贴出的参考资料。

        流控窗口临时增加是指当业务程序要求读取的数据超过当前流控窗口时,正常情况下,发送端会分多个frame多次发送。为了提升发送性能,接收端此时会发送一个windowUpdate窗口指示发送端可以发送更多数据从而绕过流控窗口的限制。这个优化在高延迟网络下可以提升10倍以上的性能。

参考资料:

1. gRPC 流量控制详解 - 掘金

2.   gRPC性能优化(BDP & 流控窗口临时增加) 

3. 再谈 gRPC 的 Trailers 设计 

4. grpc/PROTOCOL-HTTP2.md at master · grpc/grpc · GitHub 

5. http2 接收窗口自动调整

6. RFC 7540 - Hypertext Transfer Protocol Version 2 (HTTP/2)


http://www.kler.cn/a/9834.html

相关文章:

  • windows 11编译安装ffmpeg(包含ffplay)
  • 2024年11月10日系统架构设计师考试题目回顾
  • 模型压缩相关技术概念澄清(量化/剪枝/知识蒸馏)
  • Android 开发指南:初学者入门
  • c语言数据结构与算法--简单实现队列的入队和出队
  • Caused by: org.apache.flink.api.common.io.ParseException: Row too short:
  • 【spring】通过抽象类与ApplicationContext编写扩展性强的业务逻辑
  • 使用国密SSL证书,实现SSL/TLS传输层国密改造
  • 【你听说了吗】GPT-5据说已经学完了世界上现存所有的视频
  • 电脑自动录屏软件哪个好用 电脑自动录屏怎么设置
  • 剑指 Offer 49. 丑数
  • 【计算机组成原理 - 第二章】系统总线(完结)
  • css 导航栏效果
  • ICPC SWERC 2020 K - Unique Activities(SAM记录子串第一次结束位置 or SAM + hash)
  • ​分析新特征背后的内在逻辑,才能把握未来一段时期的科技发展新方向
  • Python 进阶指南(编程轻松进阶):十二、使用 Git 组织您的代码项目
  • springboot项目配置文件不允许出现明文密码的解决方法(jasypt使用方法)
  • ChatGPT 指令大全
  • 力扣119杨辉三角 II:代码实现 + 方法总结(数学规律法 记忆法/备忘录)
  • 低静态电流-汽车电池反向保护系统的方法
  • 第八章 Vite4+Vue3+Vtkjs 完整demo演示
  • 刘二大人《Pytorch深度学习实践》第十一讲卷积神经网络(高级篇)
  • 工厂模式白话 - 3种都有哦
  • C语言——变参函数
  • 为什么Java8不使用CMS作为默认垃圾收集器
  • 死锁的检测和案例