【GeeRPC】项目总结:使用 Golang 实现 RPC 框架
文章目录
- 项目总结:使用 Golang 实现 RPC 框架
- 谈谈 RPC 框架
- 什么是 RPC 框架
- 实现一个 RPC 框架需要什么?
- 项目总结·文章结构安排
- Part1:消息编码
- 编解码器的实现
- 通信过程
- Part2:服务端
- Accept:阻塞地等待连接请求并开启 goroutine 进行处理
- ServeConn:对连接进行处理直到连接挂断
- serveCodec:通过解码器读取 Request,进行处理,并将结果编码回发 Response
- request 结构
- readRequest:通过 GobCodec 从字节流构造 request
- findService:根据服务和方法获取服务名和方法名
- 回到 readRequest
- 回到 serveCodec
- handleRequest:处理一次具体的 RPC 调用请求
- sendResponse:向 client 发送具体的响应
- Register:将服务注册到 Server 当中,服务当中包含方法
- 回顾 Service
- 回到 Register
- ServeHTTP:处理 HTTP 请求
- HandleHTTP
- Part3:注册中心与服务发现
- GeeRegistry:GeeRPC 的注册中心
- GeeRegistryDiscovery:GeeRPC 的服务发现模块
- MultiServersDiscovery:基本的服务发现模块
- 回到 GeeRegistryDiscovery
- Part4:高性能客户端
- Call 方法:客户端基于负载均衡策略选择单个服务实例执行 RPC 调用
- XDial:一个较为通用的 RPC 客户端连接函数,支持多种协议
- Client:底层的客户端实例
- Call 实例:承载一次 RPC 调用
- 回到 Client,实现 Client 的接收功能 receive
- 同步进度,回到 XDial 和 call 方法
- Client 的 Call 方法:发起 RPC 调用并通过 Context 引入超时机制
- Broadcast 方法:客户端向所有可以用的 Server 进行 RPC 调用广播
- Part5:从 main 函数出发完整地体验一次 GeeRPC 的使用
项目总结:使用 Golang 实现 RPC 框架
二月中旬我参考 Geektutu 的 GeeRPC 教程动手实现了一个 RPC 框架。现在我来对 GeeRPC 项目进行总结,采用的方式与总结 Gee 以及 Zinx 不同,使用一个 GeeRPC 框架需要服务器(Server)、客户端(Client)和注册中心(Registry)三部分,我将首先对每个部分进行剖析,之后从真正使用一个 RPC 远程调用出发,对 GeeRPC 的工作流程进行分析。
学习 GeeRPC 的八篇文章链接如下:
- 【GeeRPC】7天用 Go 从零实现 RPC 框架 GeeRPC
- 【GeeRPC】Day1:服务端与消息编码
- 【GeeRPC】Day2:支持并发与异步的客户端
- 【GeeRPC】Day3:服务注册(Service Register)
- 【GeeRPC】Day4:超时处理(timeout)
- 【GeeRPC】Day5:支持 HTTP 协议
- 【GeeRPC】Day6:负载均衡
- 【GeeRPC】Day7:服务发现与注册中心
谈谈 RPC 框架
什么是 RPC 框架
RPC(Remote Procedure Call,远程过程调用)框架是一种用于实现分布式系统中跨网络调用远程服务的工具。它允许程序像调用本地函数一样调用远程服务器上的函数,隐藏了底层网络通信的复杂性。
RPC 框架的核心组件如下:
- 客户端(Client):发起远程调用的进程;
- 服务端(Server):提供远程调用的进程;
- 存根(Stub):客户端和服务端各有一个存根,分别负责调用请求打包(序列化)和接收响应(反序列化);
- 通信协议(Protocol):定义数据传输的格式和规则;
RPC 的工作流程:
- 客户端调用:客户端通过本地存根发起远程调用;
- 序列号:存根将调用信息(方法名、参数等)序列化后发给服务端;
- 网络传输:序列化后的数据通过网络传输到服务端;
- 反序列化:服务端存根接收并反序列化数据;
- 执行调用:服务端执行相应的方法;
- 返回结果:服务端将结果序列化后返回给客户端;
- 客户端接收:客户端存根接收并反序列化结果,返回给调用者。
RPC 的优点:
- 透明性:调用远程服务就像调用本地函数一样简单;
- 高效性:优化网络通信,提升性能。最直观的表现是 RPC 将采用更加高效的数据编码方式,提高数据在网络当中的传输效率,这一点有别于基于 HTTP 的纯文本传输进行通信的方法;
- 跨语言支持:许多 RPC 框架支持多种编程语言。
RPC 的缺点:
- 复杂性:处理网络故障、超时等问题增加了系统的复杂性;
- 耦合性:服务端和客户端接口紧密耦合,接口改变将会影响双方。
实现一个 RPC 框架需要什么?
首先,需要先确定通信双方所选择的通信协议(比如 TCP、HTTP/2、WebSocket 等)。之后,需要约定好通信双方的编码格式(比如 XML、JSON、Protobuf 等)。
然后,我们还需要解决一系列服务的可用性问题,比如处理连接超时、支持异步请求和并发等。
在分布式场景下,可能会有很多服务实例,客户端并不关心这些实例的地址和部署位置,只关心自己能否接收到 RPC 调用期待的结果。为了实现上述需求,自然引出了注册中心和负载均衡。客户端和服务端只需要感知注册中心,服务端将其所能够提供的服务注册到注册中心,客户端从注册中心拉取可以调用的服务即可。注册中心应该实现服务的动态添加、删除、使用心跳确保服务可用等功能。
项目总结·文章结构安排
与 Gee 或 Zinx 不同,GeeRPC 是一个接近千行的较大的项目,如果直接采用“从 main 函数”开始的方法对整个项目进行分析非常的困难,因为 main 当中表面上一个很简单的函数调用背后可能隐藏着很庞杂的实现细节。因此我打算从 RPC 框架的必要组件入手,从最简单的编解码方法开始,逐步分析客户端、服务端以及注册中心,最后再从 main 函数出发总览全局,透彻地理解这整个项目。
Part1:消息编码
编解码器的实现
一个典型的 RPC 调用如下:
err = client.Call("Arith.Multiply", args, &reply)
上述语句的行为是,客户端 client 调用 Call,Call 的参数是方法名、方法参数及保存返回值的引用(&reply
),Call 调用的返回值是 err。
根据 RPC 调用的特点,GeeRPC 将消息抽象为两部分,即 Header 和 Body。Header 结构的定义如下:
type Header struct {
ServiceMethod string // format: "Service.Method"
Seq uint64 // sequence number chosen by client
Error string
}
- ServiceMethod 保存调用的方法名;
- Seq 保存请求的序列号,可以被认为是某个请求的 ID,用于区分不同的请求;
- Error 是错误信息,在客户端发送的消息当中置为空,如果服务端发生错误,将把 Error 置在消息的 Header 当中保存。
Header 是 Message 的一部分,设定好 Header 之后,我们进一步考虑如何对 Message 进行编码。GeeRPC 将编解码方法抽象为一个 Codec 接口,并定义了一些方法:
type Codec interface {
io.Closer
ReadHeader(*Header) error
ReadBody(interface{}) error
Write(*Header, interface{}) error
}
可以看到,Codec 接口内嵌了 io.Closer
的方法,用于关闭资源。此外,Codec 还实现了:
- ReadHeader:参数就是 Header 类型的指针,用于解码出一条 Message 中的 Header;
- ReadBody:参数是空接口类型,用于解码 Body;
- Write:参数是 Header 的指针和空接口类型,当然对应的就是 Header 和 Body 这两个部分。Write 方法的作用是对 Message 进行编码,即:将 Header 和 Body 编码。
GeeRPC 框架中采用 gob
的格式对 Message 进行编码,gob
是一种用于 Go 对象序列化和反序列化的编码格式。gob
是 Go 语言特有的二进制编码格式,专门为 Go 的数据结构设计,能够高效地编码和解码 Go 的数据类型。
在对 codec 进行初始化时,会新建一个实现了 Codec 接口的 GobCodec 来对数据进行编解码:
type NewCodecFunc func(closer io.ReadWriteCloser) Codec
type Type string
const (
GobType Type = "application/gob"
JsonType Type = "application/json"
)
var NewCodecFuncMap map[Type]NewCodecFunc
func init() {
NewCodecFuncMap = make(map[Type]NewCodecFunc)
NewCodecFuncMap[GobType] = NewGobCodec // NewGobCodec 还没定义, 将在 gob.go 定义
}
GobCodec 的实现如下:
type GobCodec struct {
conn io.ReadWriteCloser
buf *bufio.Writer
dec *gob.Decoder
enc *gob.Encoder
}
// 👇 确保 GobCodec 实现了 Codec 接口
var _ Codec = (*GobCodec)(nil)
// 👇 GobCodec 的工厂函数
func NewGobCodec(conn io.ReadWriteCloser) Codec {
buf := bufio.NewWriter(conn)
return &GobCodec{
conn: conn,
buf: buf,
dec: gob.NewDecoder(conn),
enc: gob.NewEncoder(buf),
}
}
GobCodec 的成员包括:
- conn:保存连接实例;
- buf:缓冲区,一般设置缓冲区的作用是提升性能;
- enc:gob 的编码器;
- dec:gob 的解码器。
GobCodec 实现了的 Codec 的方法如下:
func (c *GobCodec) ReadHeader(h *Header) error {
return c.dec.Decode(h)
}
func (c *GobCodec) ReadBody(body interface{}) error {
return c.dec.Decode(body)
}
func (c *GobCodec) Write(h *Header, body interface{}) (err error) {
defer func() {
_ = c.buf.Flush()
if err != nil {
_ = c.Close()
}
}()
if err = c.enc.Encode(h); err != nil {
log.Println("rpc: gob error encoding header:", err)
return
}
if err = c.enc.Encode(body); err != nil {
log.Println("rpc: gob error encoding body:", err)
return
}
return
}
func (c *GobCodec) Close() error {
return c.conn.Close()
}
可以看到,GobCodec 方法的实现还是非常简单的,但是需要明确的一点就是,ReadHeader 和 ReadBody 是从当前连接的字节流(stream)当中读取字节并进行解码,得到的结果将会保存到 Header 指针或空接口类型当中。
通信过程
对于 GeeRPC 而言,通信双方唯一需要协商的内容就是消息的编解码方式,这部分信息将由 Option 来承载:
const MagicNumber = 0x3bef5c
// Option 用于客户端和服务端之间的协议协商
type Option struct {
MagicNumber int // MagicNumber marks this's a geerpc request
CodecType codec.Type // client may choose different Codec to encode body
ConnectTimeout time.Duration
HandleTimeout time.Duration
}
// DefaultOption 是默认的协议选项, 使用 Gob 编码
var DefaultOption = &Option{
MagicNumber: MagicNumber,
CodecType: codec.GobType,
ConnectTimeout: time.Second * 10,
}
当然,Option 当中还承载了“连接超时”和“处理超时”的信息,我们将在后续客户端与服务端的超时处理部分进行总结。还是先回到消息编码的部分,为了实现上的简单,GeeRPC 客户端固定采用 JSON 格式对 Option 进行编码,而后续的 Head 和 Body 则采用 CodecType 指定的格式进行编码。也就是说,报文的发送格式为:
| Option{MagicNumber: xxx, CodecType: xxx} | Header{ServiceMethod ...} | Body interface{} |
| <------ 固定 JSON 编码 ------> | <------- 编码方式由 CodeType 决定 ------->|
在某个连接所发送的字节流当中,Option 被固定在最开始的位置,Header 和 Body 可以有多个,其形式可能是:
| Option | Header1 | Body1 | Header2 | Body2 | ...
至此,我们便完成了 GeeRPC 消息编码部分的总结,后续在 Server 和 Client 中用到 Codec 的时候,我将对消息编码部分的内容进行再次回顾。
Part2:服务端
我们已经处理完了通信的过程,包括在客户端和服务端之间协商并确定通信的方式,以及编解码器的具体实现,现在我们来回顾服务端的实现。
总得来说,对于一个提供服务的 Server,它最基本的功能就是,根据收到的 Request,进行相应的业务处理,再将业务处理的结构以及一些额外的信息(比如序号、服务可用状态等)构造成 Response 原路发送回请求服务的客户端。
我们再细化一下,在一个 RPC 框架下,Server 接收到的消息是服务端和客户端已经协商好的编码形式的字节流,Server 需要先将 Header 和 Body 解析出来。我们刚才已经回顾过,Header 当中包含 ServiceMethod,因此我们可以从 Header 得知 Client 请求调用的是哪个方法。之后,Server 端从注册中心找到对应的服务并调用,得到业务处理的结果,构造成 Response 通过 Codec 再写回到字节流当中,完成一次 RPC 调用。
Server 在实现上只具有一个成员,那就是保存注册服务的 map:
// Server represents an RPC Server.
type Server struct {
serviceMap sync.Map // serviceMap 存储注册的服务, 键为服务名, 值为 *service
}
Server 的方法非常的多,包括:
Register(rcvr interface{}) error
findService(serviceMethod string) (svc *service, mtype *methodType, err error)
ServeConn(conn io.ReadWriteCloser)
serveCodec(cc codec.Codec, opt *Option)
readRequestHeader(cc codec.Codec) (*codec.Header, error)
readRequest(cc codec.Codec) (*request, error)
sendResponse(cc codec.Codec, h *codec.Header, body interface{}, sending *sync.Mutex)
handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup, timeout time.Duration)
Accept(lis net.Listener)
ServeHTTP(w http.ResponseWriter, req *http.Request)
HandleHTTP()
其中导出的方法包括:
Register(rcvr interface{}) error
ServeConn(conn io.ReadWriteCloser)
Accept(lis net.Listener)
ServeHTTP(w http.ResponseWriter, req *http.Request)
HandleHTTP()
私有的方法包括:
findService(serviceMethod string) (svc *service, mtype *methodType, err error)
serveCodec(cc codec.Codec, opt *Option)
readRequestHeader(cc codec.Codec) (*codec.Header, error)
readRequest(cc codec.Codec) (*request, error)
sendResponse(cc codec.Codec, h *codec.Header, body interface{}, sending *sync.Mutex)
handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup, timeout time.Duration)
启动一个服务器之前需要先建立一个服务器实例,Server 的工厂函数非常简单:
// NewServer returns a new Server.
func NewServer() *Server {
return &Server{}
}
现在我们对 Server 的每一个方法进行剖析:
Accept:阻塞地等待连接请求并开启 goroutine 进行处理
之后通过 Accept 方法接收一个 Listener 对象,表示 Server 可以在这个地址对请求进行监听,Accept 方法的实现是:
// Accept accepts connections on the listener and serves requests
// for each incoming connection.
func (server *Server) Accept(lis net.Listener) {
for {
conn, err := lis.Accept()
if err != nil {
log.Println("rpc server: accept error:", err)
return
}
go server.ServeConn(conn)
}
}
在 for loop 当中,Accept 阻塞地接收连接请求,并通过 ServeConn 方法对得到的 conn 进行进一步的处理。
ServeConn:对连接进行处理直到连接挂断
在 Accept 中启动一个 ServeConn 方法的 goroutine,参数就是 conn,用于处理从 conn 得到的字节流:
// ServeConn runs the server on a single connection.
// ServeConn blocks, serving the connection until the client hangs up.
// ServeConn: 处理单个连接
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
defer func() { _ = conn.Close() }()
var opt Option
if err := json.NewDecoder(conn).Decode(&opt); err != nil {
log.Println("rpc server: options error: ", err)
return
}
if opt.MagicNumber != MagicNumber {
log.Printf("rpc server: invalid magic number %x", opt.MagicNumber)
return
}
f := codec.NewCodecFuncMap[opt.CodecType]
if f == nil {
log.Printf("rpc server: invalid codec type %s", opt.CodecType)
return
}
server.serveCodec(f(conn), &opt)
}
需要再次强调的是,conn 发送过来的是字节流,而不是结构化的数据,因此在理解 ServeConn 的行为时,我们应该转换思路。
首先,ServeConn defer 了一个关闭连接的 func,它的作用是在 err 发生的时候将连接关闭。需要注意的是 ServeConn 在连接正常的情况下是不会 return 的,因为最后一条语句所使用的 serveCodec 方法使用了 for loop 阻塞。
之后,ServeConn 中建立了一个 Option 类型的 opt,用于保存从字节流当中读取到的数据。首先使用 json 的 Decoder 读取 opt,原因在于我们的约定当中使用 JSON 保存 Option,而使用 gob 保存 Header 和 Body。
读取完 Option 之后,先比对读到的 MagicNumber 是否相同,这也可以视为一种加密的手段。如果 MagicNumber 相同,便从 opt 当中读取 CodecType(默认就是 gob),通过 codec 当中的 NewCodecFuncMap 建立解码相应编码字节流的解码器 f。需要注意 f 的类型是:func(closer io.ReadWriteCloser) Codec
。
最后调用 serveCodec 方法,从字节流中继续解码 Header 和 Body。
serveCodec:通过解码器读取 Request,进行处理,并将结果编码回发 Response
serveCodec 可以说是 Server 的关键中间组件,它的作用是将 Request 解码与 Response 编码联通在一起。
首先,serveCodec 的第一个参数是 Codec 接口类型,而它的调用者 ServeConn 传入的参数是 f(conn)
,f 保存的是 GobCodec 的工厂函数,因此 f(conn)
将会新建一个 GobCodec 类型的编解码器实例。
serveCodec 当中首先初始化了一个 sending 和 wg,二者的作用都是并发控制。sending 是一个 Mutex 的指针,而 wg 是 WaitGroup 的指针。
之后,开启一个 for loop,通过 readRequest 读取 Header 和 Body,并构造 request 实例。在进一步剖析 serveCodec 之前,我们先来仔细研究一下 readRequest 方法和 request 结果。
request 结构
request 结构的定义如下:
// request stores all information of a call
// request 存储一个请求的所有信息.
type request struct {
h *codec.Header // header of request
argv, replyv reflect.Value // argv and replyv of request
mtype *methodType // 方法
svc *service // 服务信息
}
显然,request 是用来保存一次 client RPC 调用的所有信息的结构体,包括 Header、RPC 的参数及返回值类型、方法和服务信息。request 结构不具有方法。
readRequest:通过 GobCodec 从字节流构造 request
readRequest 方法的实现如下:
func (server *Server) readRequest(cc codec.Codec) (*request, error) {
h, err := server.readRequestHeader(cc)
if err != nil {
return nil, err
}
req := &request{h: h}
req.svc, req.mtype, err = server.findService(h.ServiceMethod)
if err != nil {
return req, err
}
req.argv = req.mtype.newArgv()
req.replyv = req.mtype.newReplyv()
// make sure that argvi is a pointer, ReadBody need a pointer as parameter
argvi := req.argv.Interface()
if req.argv.Type().Kind() != reflect.Ptr {
argvi = req.argv.Addr().Interface()
}
if err = cc.ReadBody(argvi); err != nil {
log.Println("rpc server: read argv err:", err)
}
return req, nil
}
其中又包含着 readRequestHeader 方法,readRequestHeader 方法的实现如下:
func (server *Server) readRequestHeader(cc codec.Codec) (*codec.Header, error) {
var h codec.Header
if err := cc.ReadHeader(&h); err != nil {
if err != io.EOF && err != io.ErrUnexpectedEOF {
log.Println("rpc server: read header error:", err)
}
return nil, err
}
return &h, nil
}
readRequestHeader 方法的作用就是通过 Codec 解码出字节流当中 Header。
在读取到 Header 之后,在 readRequest 当中会根据 Header 对 request 进行构造,从 Header 当中可以读取到 Client 此次 RPC 调用所需要的方法名,我们需要通过方法名到 findService 方法当中去寻找具体的方法。
findService 方法较为复杂,它设计到服务 service 的设计。
findService:根据服务和方法获取服务名和方法名
findService 的设计如下:
// findService: 根据服务(service)和方法(method)获取服务名和方法名
func (server *Server) findService(serviceMethod string) (svc *service, mtype *methodType, err error) {
dot := strings.LastIndex(serviceMethod, ".")
if dot < 0 {
err = errors.New("rpc server: service/method request ill-formed: " + serviceMethod)
return
}
serviceName, methodName := serviceMethod[:dot], serviceMethod[dot+1:]
// 👆 根据 serviceMethod 解析并获取服务名和方法名
svci, ok := server.serviceMap.Load(serviceName) // 从 serviceMap 中查找服务
if !ok {
err = errors.New("rpc server: can't find service " + serviceName)
return
}
svc = svci.(*service)
mtype = svc.method[methodName] // 从 svc.method 中查找方法
if mtype == nil {
err = errors.New("rpc server: can't find method " + methodName)
}
return
}
它的参数是 Header 当中保存的 serviceMethod,返回的参数是 service 类型的指针、methodType 类型的指针以及错误信息。
service 结构保存的是一个服务实例:
type service struct {
name string // 服务的名称
typ reflect.Type // 服务的类型
rcvr reflect.Value // 服务的接收者(即服务实例)
method map[string]*methodType // 服务的方法集合, 键为方法名, 值为 methodType
}
而 methodType 保存的是服务类型,其中包括方法参数的类型、返回值的类型以及调用次数:
type methodType struct {
method reflect.Method
ArgType reflect.Type
ReplyType reflect.Type
numCalls uint64
}
回到 findService 函数本身,我们来研究一下 findService 函数体当中的行为。首先,findService 通过 strings.LastIndex
对 serviceMethod 进行分割,即:将 service 和 Method 通过 .
进行分割,得到服务名和方法名。
根据服务名,到 Server 的 serviceMap 成员当中查找服务:
svci, ok := server.serviceMap.Load(serviceName) // 从 serviceMap 中查找服务
查找到服务之后,再从服务当中查找方法:
svc = svci.(*service)
mtype = svc.method[methodName] // 从 svc.method 中查找方法
研究到这里,其实我对 serviceMap 以及服务和方法的查找仍然有些模糊,在后面的服务注册环节应该会研究到此中细节。在此我们先认定,通过 findService 方法我们可以获得服务和方法的句柄。
回到 readRequest
饶了一个圈,我们回到了 readRequest。我们先进行进度同步:readRequest 的作用是读取字节流并从中构造 request 结构,request 表示的就是 client 所进行的一次 RPC 调用传入的服务 + 方法名、参数以及保存返回值的引用。现在我们通过 findService 方法得到了 client 想要使用的方法的服务与方法句柄:
req.svc, req.mtype, err = server.findService(h.ServiceMethod)
将 svc 和 mtype 一并保存到 request 类型的 req 当中。接下来我们通过 golang 的反射机制构造本次 RPC 调用对应方法所需要的参数值类型和返回值类型,具体要用到的是 methodType 的 newArgv 方法和 newReplyv 方法:
func (m *methodType) newArgv() reflect.Value {
var argv reflect.Value
// arg may be a pointer type or a value type
if m.ArgType.Kind() == reflect.Ptr {
argv = reflect.New(m.ArgType.Elem())
} else {
argv = reflect.New(m.ArgType).Elem()
}
return argv
}
func (m *methodType) newReplyv() reflect.Value {
// reply must be a pointer type
replyv := reflect.New(m.ReplyType.Elem())
switch m.ReplyType.Elem().Kind() {
case reflect.Map:
replyv.Elem().Set(reflect.MakeMap(m.ReplyType.Elem()))
case reflect.Slice:
replyv.Elem().Set(reflect.MakeSlice(m.ReplyType.Elem(), 0, 0))
}
return replyv
}
将结果保存到 req 实例:
req.argv = req.mtype.newArgv()
req.replyv = req.mtype.newReplyv()
我们需要确保参数类型是指针,因为我们需要从字节流的 Body 当中读取参数,使用 Codec 解码的结果应该保存到指针当中:
// make sure that argvi is a pointer, ReadBody need a pointer as parameter
argvi := req.argv.Interface()
if req.argv.Type().Kind() != reflect.Ptr {
argvi = req.argv.Addr().Interface()
}
最后通过 Codec 的 ReadBody 方法将读取到的 argvi(传入的参数)保存到 req 当中,这一步是自动完成的,因为 argvi 引用的是 req.argv
的地址。
至此 readRequest 完成,将 req 返回,req 是一个 request 类型的指针。
回到 serveCodec
我们再次同步进度。在 serveCodec 当中,我们通过 req, err := server.readRequest(cc)
从字节流构造了 request 对象。之后进行错误处理,如果错误不为空,那么通过 sendResponse 方法发送一个非法请求的提示给客户端,sendResponse 方法将在后面的 handleRequest 进行分析。
如果 readRequest 在对 request 进行构造时没有出错,就将 WaitGroup 通过 Add 加一(WaitGroup 的作用就是用于 goroutine 的管理),然后开启一个 goroutine,调用 handleRequest 方法来处理 request 对象,即:根据 request 进行具体的业务处理。
由于 serveCodec 使用了 for loop,因此它将阻塞地等待来自 conn 的字节流当中的消息。当有错误出现导致 for loop 中断时,WaitGroup 的作用体现了出来,它将通过 wg.Wait()
等待其下辖的所有 goroutine 执行完毕,才会结束。
现在我们来对 handleRequest 方法进行研究,看一下 GeeRPC 如何处理来自 Client 的 RPC 调用请求。
handleRequest:处理一次具体的 RPC 调用请求
handleRequest 方法的实现如下:
func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup, timeout time.Duration) {
defer wg.Done()
called := make(chan struct{}) // 用于通知请求处理完成
sent := make(chan struct{}) // 用于通知响应已发送
go func() {
err := req.svc.call(req.mtype, req.argv, req.replyv) // 调用 req.svc.call 执行远程方法
called <- struct{}{} // 向 called 通道发送信号
if err != nil { // 如果方法返回错误, 设置错误信息并发送错误响应
req.h.Error = err.Error()
server.sendResponse(cc, req.h, invalidRequest, sending)
sent <- struct{}{}
return
}
server.sendResponse(cc, req.h, req.replyv.Interface(), sending) // 如果方法执行成功, 发送正常响应
sent <- struct{}{} // 向 sent 通道发送信号, 表示响应已发送
}()
// 👇 监听请求处理的超时和完成
if timeout == 0 { // 如果 timeout 为 0, 表示不启用超时机制, 直接等待请求完成
<-called
<-sent
return
}
select {
case <-time.After(timeout):
req.h.Error = fmt.Sprintf("rpc server: request handle timeout: expect within %s", timeout)
server.sendResponse(cc, req.h, invalidRequest, sending)
case <-called:
<-sent
}
}
首先,handleRequest 的参数包括 Codec 编解码器、request 指针 req、sending 锁、wg WaitGroup 以及 time。handleRequest 首先 defer 一个 wg.Done()
它等价于 wg.Add(-1)
,即通过 WaitGroup 当前的线程执行完毕。
之后 handleRequest 建立了两个 struct{}
类型的 channel,用于 goroutine 间通信:
called := make(chan struct{}) // 用于通知请求处理完成
sent := make(chan struct{}) // 用于通知响应已发送
随后开启一个 goroutine:
go func() {
err := req.svc.call(req.mtype, req.argv, req.replyv) // 调用 req.svc.call 执行远程方法
called <- struct{}{} // 向 called 通道发送信号
if err != nil { // 如果方法返回错误, 设置错误信息并发送错误响应
req.h.Error = err.Error()
server.sendResponse(cc, req.h, invalidRequest, sending)
sent <- struct{}{}
return
}
server.sendResponse(cc, req.h, req.replyv.Interface(), sending) // 如果方法执行成功, 发送正常响应
sent <- struct{}{} // 向 sent 通道发送信号, 表示响应已发送
}()
在这个 goroutine 当中直接根据 service 对 method 进行调用,并传入参数及返回值引用。未导出的 call 方法相当关键,它是 service 类型的方法,可以说 err := req.svc.call(req.mtype, req.argv. req.replyv)
就是一次具体的 RPC 调用。call 方法的实现如下:
func (s *service) call(m *methodType, argv, replyv reflect.Value) error {
/*
s *service: 表示当前服务的实例
m *methodType: 表示要调用的方法的信息
argv reflect.Value: 表示方法的参数值
replyv reflect.Value: 表示方法的返回值(通过指针的方式保存返回值)
返回值: error
*/
atomic.AddUint64(&m.numCalls, 1) // 原子性地增加调用次数, 保证并发安全
f := m.method.Func // 获取方法的反射函数
returnValues := f.Call([]reflect.Value{s.rcvr, argv, replyv}) // 通过反射调用方法
if errInter := returnValues[0].Interface(); errInter != nil { // 处理返回值
return errInter.(error)
}
return nil
}
执行 RPC 调用之后,通过 called 发送信号通知外部的父 goroutine 当前请求已完成。无论是否出错,都用过 sendResponse 方法将 response 回发给 client。随后通过 sent 发送信号通知外部的父 goroutine 响应已发送。
最后要进行的就是超时处理,如果 timeout == 0
,那么不进行超时处理,阻塞地等待 called 和 sent 信号的到来便退出。否则通过 select + time.After 进行超时处理。
sendResponse:向 client 发送具体的响应
sendResponse 的实现比较简单:
func (server *Server) sendResponse(cc codec.Codec, h *codec.Header, body interface{}, sending *sync.Mutex) {
sending.Lock()
defer sending.Unlock()
if err := cc.Write(h, body); err != nil {
log.Println("rpc server: write response error:", err)
}
}
它的作用就是将传入的 Header 以及 Body 通过 Codec 编码回写到发送给 Client 的字节流。
至此,我们认真解析了服务端从字节流当中解析数据,并执行一次 RPC 调用,再将 Response 发回给 Client 的过程。所涉及的方法包括:
- Accept:阻塞地等待连接请求并开启 goroutine 进行处理
- ServeConn:对连接进行处理直到连接挂断
- serveCodec:通过解码器读取 Request,进行处理,并将结果编码回发 Response
- readRequest:通过 GobCodec 从字节流构造 request
- readRequestHeader:从字节流中解析 Header
- findService:根据服务和方法获取服务名和方法名
- handleRequest:处理一次具体的 RPC 调用请求
- sendResponse:向 client 发送具体的响应
Server 还剩下:
- Register;
- ServeHTTP;
- HandleHTTP;
三个方法没有深入研究。
Register:将服务注册到 Server 当中,服务当中包含方法
我们可以通过这样的方式来将服务注册到 Server 当中:
type Foo int
func (f Foo) Sum(args Args, reply *int) error {
*reply = args.Num1 + args.Num2
return nil
}
func (f Foo) Sleep(args Args, reply *int) error {
time.Sleep(time.Second * time.Duration(args.Num1))
*reply = args.Num1 + args.Num2
return nil
}
func Xxx() {
var foo Foo
// ... start a server
_ = server.Register(&foo)
}
Register 的实现如下:
// Register publishes in the server the set of methods
func (server *Server) Register(rcvr interface{}) error {
// Register 注册一个服务到服务端
s := newService(rcvr) // 创建服务实例
// 👇 使用 sync.Map 的 LoadOrStore 方法存储服务, 如果服务已经存在, 则返回错误
if _, dup := server.serviceMap.LoadOrStore(s.name, s); dup {
return errors.New("rpc: service already defined: " + s.name)
}
return nil
}
不难看出,Register 方法当中新建了一个服务实例,通过 newService 工厂函数来实现。Register 的参数是一个空接口类型,根据刚才的例子我们已经知道,传入的其实是一个具有若干种方法的结构,这个结构被称为 Service,它所具有的方法称作 Method,二者合并起来构成serveMethod
或service.Method
。
回顾 Service
Service 结构的定义我们之前已经提到,现在不妨回顾一下:
type service struct {
name string // 服务的名称
typ reflect.Type // 服务的类型
rcvr reflect.Value // 服务的接收者(即服务实例)
method map[string]*methodType // 服务的方法集合, 键为方法名, 值为 methodType
}
// newService 创建了一个新的服务实例
func newService(rcvr interface{}) *service {
s := new(service)
s.rcvr = reflect.ValueOf(rcvr) // 获取服务实例的值
s.name = reflect.Indirect(s.rcvr).Type().Name() // 获取服务实例的类型名称
s.typ = reflect.TypeOf(rcvr)
if !ast.IsExported(s.name) {
log.Fatalf("rpc server: %s is not a valid service name", s.name)
}
s.registerMethods() // 调用 registerMethods 方法注册对应服务的方法, registerMethods 在下面实现
return s // 返回一个 service 实例
}
通过 registerMethods 方法,可以将 service 具备的 Method 进行注册,并保存到这个 service 实例的 method 当中,method 成员是一个 map,它的 key 是 string,即方法名,value 是 methodType 的指针,可以理解为一种函数指针。registerMethods 方法及其相关结构与方法如下:
type methodType struct {
method reflect.Method
ArgType reflect.Type
ReplyType reflect.Type
numCalls uint64
}
// registerMethods 注册了服务的方法
func (s *service) registerMethods() {
s.method = make(map[string]*methodType)
for i := 0; i < s.typ.NumMethod(); i++ { // 遍历服务的所有方法
method := s.typ.Method(i)
mType := method.Type
// 👇 检查方法的签名是否符合 RPC 方法的规范
// 1. 方法必须有三个入参和一个输出参数
if mType.NumIn() != 3 || mType.NumOut() != 1 {
continue
}
// 2. 输出参数的类型必须是 error 类型
if mType.Out(0) != reflect.TypeOf((*error)(nil)).Elem() {
continue
}
argType, replyType := mType.In(1), mType.In(2)
// 3. 参数和返回值的类型必须是导出的或内置类型
if !isExportedOrBuiltinType(argType) || !isExportedOrBuiltinType(replyType) {
continue
}
// 经过检查, 符合规定的方法注册到 s.method 当中
s.method[method.Name] = &methodType{
method: method,
ArgType: argType,
ReplyType: replyType,
}
// 日志: 输出注册的方法名
log.Printf("rpc server: register %s.%s\n", s.name, method.Name)
}
}
// isExportedOrBuiltinType 的作用是检查类型是否是导出的或内置的类型
func isExportedOrBuiltinType(t reflect.Type) bool {
return ast.IsExported(t.Name()) || t.PkgPath() == ""
}
回到 Register
创建服务实例并保存了该服务当中的方法之后,即可将这个服务加入到 Server 的 serviceMap 成员当中了,后续通过服务名就可以索引到具体的服务:
if _, dup := server.serviceMap.LoadOrStore(s.name, s); dup {
return errors.New("rpc: service already defined: " + s.name)
}
ServeHTTP:处理 HTTP 请求
ServeHTTP 的实现如下:
// ServeHTTP implements a http.Handler that answers RPC requests.
// ServeHTTP 方法实现了 http.Handler 接口, 用于处理 HTTP 请求
func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method != "CONNECT" {
// 首先检查请求方法是否为 CONNECT, 如果不是, 返回 405 Method Not Allowed 错误, 并提示客户端必须使用 CONNECT 方法
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusMethodNotAllowed)
_, _ = io.WriteString(w, "405 must CONNECT\n")
return
}
// 如果请求方法是 CONNECT, 则通过 http.Hijacker 接口劫持连接, 获取底层的 net.Conn 对象, 劫持连接后, 服务器可以直接
// 控制底层的 TCP 连接, 而不需要 HTTP 协议进行通信, conn 应该就是劫持到的 TCP 连接
conn, _, err := w.(http.Hijacker).Hijack()
if err != nil {
log.Print("rpc hijacking ", req.RemoteAddr, ": ", err.Error())
return
}
// 劫持连接后, 服务端向客户端发送一个简单的 HTTP 响应, 表示连接已建立
_, _ = io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n")
// 最后调用 server.ServeConn(conn) 方法, 开始处理 RPC 请求
server.ServeConn(conn)
}
我的理解是 ServeHTTP 方法的作用就是处理有 HTTP 请求到来的情况,不难看出 ServeHTTP 实际上是实现了 http.Handler 的接口,这样 Server 就可以直接通过 http.ListenAndServe 对 HTTP 请求进行监听了。
HandleHTTP
// HandleHTTP registers an HTTP handler for RPC messages on rpcPath.
// It is still necessary to invoke http.Serve(), typically in a go statement.
// HandleHTTP 方法用于处理 HTTP 处理器. 它将 defaultDebugPath 路径与 server 关联起来, 使得客户端访问该路径时, 会调用 server
// 的 ServeHTTP 方法
defaultDebugPath = "/debug/geerpc" // 默认的调试信息路径, 用于注册 HTTP 处理器
func (server *Server) HandleHTTP() {
http.Handle(defaultRPCPath, server)
http.Handle(defaultDebugPath, debugHTTP{server})
log.Println("rpc server debug path:", defaultDebugPath)
}
HandleHTTP 方法将 defaultDebugPath(一个常量)与 server 关联了起来,使得客户端访问 defaultDebugPath 时,调用 Server。
至此,我们已经完成了 GeeRPC 服务端的剖析,现在我们来回顾一下 demo 中给出的支持并发与异步的高性能客户端。
Part3:注册中心与服务发现
在 Geektutu 给出的教程当中,注册中心(Registry)和服务发现(Discovery)两个模块被安排在了 Day7,但由于我们并非按照教程的顺序对 GeeRPC 进行学习,而是从学习完 GeeRPC 项目之后复盘整个项目的角度出发,因此在这篇文章中我首先对注册中心与服务发现进行回顾,它连接了服务端与客户端,随后再对客户端进行剖析。
遵循与分析服务端时一样的套路,在分析注册中心与服务发现模块之前,我们需要先搞清楚为什么需要这两个模块,这两个模块在 GeeRPC 框架当中扮演着什么角色?
如上图所示基于注册中心,客户端和服务端不需要对彼此感知,而只需要对注册中心感知,客户端即可发起一次服务端提供的远程调用。
在分布式场景下,提供相同服务的多个不同服务实例可能分布在不同的物理位置上,此时就需要一个服务发现模块来维护不同服务实例的状态,并负责远程调用的资源调度。
GeeRegistry:GeeRPC 的注册中心
GeeRPC 实现的注册中心 GeeRegistry 是一个相对简单的注册中心,大体上它的功能就是保存当前可用的服务器列表,并通过心跳(Heartbeat)机制确认服务是否可用。为了实现上的简单,GeeRegistry 通过 HTTP 协议提供服务,且所有信息都通过 HTTP Header 承载。使用 HTTP 的 GET 方法可以获取所有可用的服务列表,使用 HTTP 的 POST 方法可以添加服务实例或发送心跳。
在深入源码之前我们进一步分析一下,在实现 Gee 时我们已经知道,为了使 GeeRegistry 可以通过 HTTP 协议提供服务,在 golang 的语境下,GeeRegistry 应该实现 HTTP 的 Handler 接口,这样才能够通过 http.Serve 处理 HTTP Request,而实现 Handler 接口,其实就是要为 GeeRegistry 构造好一个 ServeHTTP 方法,它的参数是 ResponseWriter 和 Request 指针。所以在我们的构想当中,GeeRegistry 应该是具备一个 ServeHTTP 方法的,当然也确实具备。在 ServeHTTP 方法当中,应该分上面提到的两种情况分别处理 GET 和 POST 两种方法。
现在让我们来深入源码,首先研究一下 GeeRegistry 结构及其工厂函数:
type GeeRegistry struct {
timeout time.Duration
mu sync.Mutex
servers map[string]*ServerItem
}
type ServerItem struct {
Addr string
start time.Time
}
const (
defaultPath = "/_geerpc_/registry"
defaultTimeout = time.Minute * 5
)
func New(timeout time.Duration) *GeeRegistry {
return &GeeRegistry{
servers: make(map[string]*ServerItem),
timeout: timeout,
}
}
var DefaultGeeRegistry = New(defaultTimeout)
GeeRegistry 的成员包含一个 timeout,一个互斥锁和一个保存服务器信息的 map。timeout 成员的作用就是记录心跳的周期。互斥锁 mu 确保同一时间只有一个协程在对注册中心进行修改。servers 的 key 是 string,value 是 *ServerItem
,ServerItem 包含地址 Addr 和服务开启时间 start(用于计算当前服务是否过期)。默认的过期时间是 5 分钟。
我们刚才已经说过,GeeRegistry 是通过 HTTP 协议来工作的,因此它实际上没有对外暴露的接口,而应该实现 ServeHTTP 方法,并通过 http.Handle 进行注册。这样 GeeRegistry 就可以基于 HTTP 协议提供服务。具体的实现为:
func HandleHTTP() {
DefaultGeeRegistry.HandleHTTP(defaultPath)
}
func (r *GeeRegistry) HandleHTTP(registryPath string) {
http.Handle(registryPath, r)
log.Println("rpc registry path:", registryPath)
}
func (r *GeeRegistry) ServeHTTP(w http.ResponseWriter, req *http.Request) {
switch req.Method {
case "GET": // 返回所有可用的服务列表, 通过自定义字段 X-Geerpc-Servers 承载
w.Header().Set("X-Geerpc-Servers", strings.Join(r.aliveServers(), ","))
case "POST": // 添加服务实例或发送心跳,通过自定义字段 X-Geerpc-Server 承载
addr := req.Header.Get("X-Geerpc-Server")
if addr == "" {
w.WriteHeader(http.StatusInternalServerError)
return
}
r.putServer(addr)
default:
w.WriteHeader(http.StatusMethodNotAllowed)
}
}
当我们在 main 函数当中启动服务之前,需要先启动注册中心,直接调用 HandleHTTP 函数即可,它封装了对 DefaultGeeRegistry 的 HandleHTTP 方法的调用。http.Handle 将 handler 与给定的 registryPath 这个 URL 相绑定,通过指定的 HTTP Method 对 URL 进行访问即可调用对应的方法,而这个 handler 正是 GeeRegistry 本身。当客户端向 GeeRegistry 发起 Request 时,GeeRegistry 通过 ServeHTTP 当中提供的方法回发 Response,完成服务。
现在我们来研究一下 GeeRegistry 的 ServeHTTP 方法如何实现。正如我们刚才所提到的,GeeRegistry 通过 HTTP 协议提供服务,GET 请求将会得到当前可用的服务列表,而 POST 请求则更新了 GeeRegistry 的服务列表,此处的更新有两种情况,一种是新的服务器注册,另一种是现有的服务器发过来的一次心跳。
我们已经提到,GeeRegistry 通过 HTTP Header 承载所有有用的信息。
对于 GET 方法,GeeRegistry 设置一个 X-Geerpc-Servers
头部字段,并将该字段的值设置为当前存活的 Servers 列表。当 HTTPClient 通过 GET 方法请求 GeeRegistry 服务时,就可以通过得到的 Response 当中的 Header 得到当前 GeeRPC 服务端可用的 Servers 列表。
我们刚才已经提到,HTTPClient 可以用过发送一个 POST 方法的 Request 将服务器信息发送给 Registry。当 GeeRegistry 接收到 Method 为 POST 的 Request 时,不需要回发 Response,而是解析这个 Request 当中的 X-Geerpc-Server
字段,从中得到对应的 Server 信息,并通过 putServer 方法将 Server 加入到 GeeRegistry 当中。对于已经存在且存活的 Server,putServer 则会更新 Server 的 start 字段,即更新 Server 的开始时间。putServer 方法的实现如下:
func (r *GeeRegistry) putServer(addr string) {
r.mu.Lock()
defer r.mu.Unlock()
s := r.servers[addr]
if s == nil { // 如果当前服务没有注册, 那么将它加入到注册中心的服务列表当中
r.servers[addr] = &ServerItem{Addr: addr, start: time.Now()}
} else { // 如果当前服务已经注册, 那么更新它的注册时间
s.start = time.Now()
}
}
可以看到,Server 的地址 addr 实际上就是一个字符串。通过 Heartbeat 和 sendHeartbeat 两个方法实现 GeeRegistry 的心跳机制,Heartbeat 方法是 GeeRegistry 对外暴露的接口,一个新创建的 Server 实例可以通过 Heartbeat 方法将自己注册到 GeeRegistry 当中。Heartbeat 和 sendHeartbeat 方法的实现如下:
func Heartbeat(registry, addr string, duration time.Duration) {
if duration == 0 {
// make sure there is enough time to send heartbeat before the service is removed from registry
duration = defaultTimeout - time.Duration(1)*time.Minute
}
var err error
err = sendHeartbeat(registry, addr)
go func() {
t := time.NewTimer(duration)
for err == nil {
<-t.C
err = sendHeartbeat(registry, addr)
}
}()
}
func sendHeartbeat(registry, addr string) error {
log.Println(addr, "send heart beat to registry", registry)
httpClient := &http.Client{}
req, _ := http.NewRequest("POST", registry, nil)
req.Header.Set("X-Geerpc-Server", addr)
if _, err := httpClient.Do(req); err != nil {
log.Println("rpc server: heart beat err:", err)
return err
}
return nil
}
sendHeartbeat 方法的实现很好理解,在这个方法中将会创建一个匿名的 HTTPClient,并向 GeeRegistry 提供服务的 URL 发送一个 Request,Request 使用 POST 方法,通知 GeeRegistry 需要改变 Server list 的状态。
Heartbeat 当中有一个可以深挖的细节,那就是以下片段:
// ... ... ...
err = sendHeartbeat(registry, addr)
go func() {
t := time.NewTimer(duration)
for err == nil {
<-t.C
err = sendHeartbeat(registry, addr)
}
}()
// ... ... ...
我们首先要明确 duration 字段的作用,它的值比 Server 的默认过期时间 defaultTimeout
少一分钟。这个代码片段的作用就是每隔 (defaultTimeout - 1) * time.Minute
的时间就向 GeeRegistry 发送一次 Heartbeat,避免服务器超时。在 goroutine 当中,for loop 的终止条件是 sendHeartbeat 出现错误,当服务器出错时,停止发送心跳。
我们已经研究了如何通过 POST 方法的 HTTP Request 向 GeeRegistry 注册一个 Server,并通过心跳机制确保 Server 存活。现在我们回顾一下在何时使用 GET 方法通过 HTTP Request 从 GeeRegistry 获取 Server list。实际上,GET 方法应该在服务发现模块当中使用,服务发现模块的作用就是保存若干个当前可用的服务实例,并在客户端请求 RPC 调用时通过内置的调度机制选择服务实例实现服务调用。
根据以上的分析,我们接下来展开聊一聊 GeeRPC 的服务发现模块。
GeeRegistryDiscovery:GeeRPC 的服务发现模块
实际上分析到这一步,我们不难看出在 GeeRPC 的设计中,注册中心 Registry 和服务发现模块 Discovery 是互补的。
Registry 的功能可以概括为:
- 基于心跳机制维护一个当前存活的服务器列表;
- 接收新的服务器注册;
- 处理来自发现模块的拉取服务器列表的请求(即发现模块向 GeeRegistry 发起 GET 方法的 HTTP Request,得到当前存活的服务器列表);
而 Discovery 的功能可以概括为:
- 维护一个服务器列表(可以是自动的也可以是手动的,自动维护就是通过向 GeeRegistry 发起请求来完成);
- 当来自 client 的一次 RPC 请求到来时,根据某个调度算法,确定最终由哪个服务器来执行本次请求。
GeeRPC 的服务发现模块 GeeRegistryDiscovery 的定义如下:
type GeeRegistryDiscovery struct {
*MultiServersDiscovery
registry string
timeout time.Duration
lastUpdate time.Time
}
const defaultUpdateTimeout = time.Second * 10
func NewGeeRegistryDiscovery(registerAddr string, timeout time.Duration) *GeeRegistryDiscovery {
if timeout == 0 {
timeout = defaultUpdateTimeout
}
d := &GeeRegistryDiscovery{
MultiServersDiscovery: NewMultiServerDiscovery(make([]string, 0)),
registry: registerAddr,
timeout: timeout,
}
return d
}
它内嵌了一个 MultiServersDiscovery,这是一个仅支持手动对服务列表进行维护的发现中心,嵌入它可以复用很多它的方法。
MultiServersDiscovery:基本的服务发现模块
MultiServersDiscovery 的定义和方法如下:
type SelectMode int
const (
RandomSelect SelectMode = iota // select randomly
RoundRobinSelect // select using Robbin Algorithm
)
type Discovery interface { // Discovery 是一个接口类型, 包含了服务发现所需要的最基本的接口
Refresh() error // Refresh 从注册中心更新服务列表
Update(servers []string) error // Update 手动更新服务列表
Get(mode SelectMode) (string, error) // Get 根据负载均衡策略, 选择一个服务实例
GetAll() ([]string, error) // GetAll 返回所有服务实例
}
// MultiServersDiscovery is a discovery for multi servers without a registry center
// user provides the server addresses explicitly instead
// MultiServersDiscovery 是一个具体的服务发现实现, 用于管理多个服务器地址
type MultiServersDiscovery struct {
r *rand.Rand // generate random number
mu sync.RWMutex // protect following
servers []string
index int // record the selected position for robbin algorithm
}
// NewMultiServerDiscovery creates a MultiServerDiscovery instance
// 构造函数, 用于创建 MultiServerDiscovery 实例
func NewMultiServerDiscovery(servers []string) *MultiServersDiscovery {
d := &MultiServersDiscovery{
servers: servers,
r: rand.New(rand.NewSource(time.Now().UnixNano())),
}
d.index = d.r.Intn(math.MaxInt32 - 1)
return d
}
// 这行代码确保 MultiServersDiscovery 实现了 Discovery 接口
var _ Discovery = (*MultiServersDiscovery)(nil)
// Refresh doesn't make sense for MultiServersDiscovery, so ignore it
func (d *MultiServersDiscovery) Refresh() error {
return nil
}
// Update the servers of discovery dynamically
// Update 方法用于手动更新服务器列表
func (d *MultiServersDiscovery) Update(servers []string) error {
d.mu.Lock()
defer d.mu.Unlock()
d.servers = servers
return nil
}
// Get a server according to mode
// Get 方法根据指定的选择模式返回一个服务器地址
func (d *MultiServersDiscovery) Get(mode SelectMode) (string, error) {
d.mu.Lock()
defer d.mu.Unlock()
n := len(d.servers)
if n == 0 {
return "", errors.New("rpc discovery: no available servers")
}
switch mode {
case RandomSelect:
return d.servers[d.r.Intn(n)], nil
case RoundRobinSelect:
s := d.servers[d.index%n]
d.index = (d.index + 1) % n
return s, nil
default:
return "", errors.New("rpc discovery: not supported select mode")
}
}
// GetAll returns all servers in discovery
// GetAll 方法返回所有服务器地址的副本
func (d *MultiServersDiscovery) GetAll() ([]string, error) {
d.mu.RLock()
defer d.mu.RUnlock()
// returns a copy of d.servers
servers := make([]string, len(d.servers), len(d.servers))
copy(servers, d.servers)
return servers, nil
}
MultiServersDiscovery 共有四个成员字段以及四个方法。四个字段包括:
- r:用于生成随机数的随机种子;
- mu:保护 servers slice 的读写操作;
- servers:保存 servers 列表;
- index:保存用于 robin 算法的 index;
四个方法包括:
- Refresh:从注册中心更新服务列表,但由于基础的服务发现模块没有使用注册中心,因此 Refresh 方法我们留到 GeeRegistryDiscovery 再实现;
- Update:手动更新服务列表;
- Get:根据负载均衡策略选择一个服务实例;
- Get All:返回所有服务实例。
每一个方法的实现都比较好理解,此处不再赘述。
回到 GeeRegistryDiscovery
在 MultiServersDiscovery 的基础上,GeeRegistryDiscovery 还添加了 registry、timeout 和 lastUpdate 三个字段。registry 记录了 GeeRegistry 提供服务的 URL 地址,timeout 和 lastUpdate 用于确保当前 Discovery 维护的服务器列表没有超时。
基于 MultiServersDiscovery,GeeRegistryDiscovery 重新实现了 Discovery 接口的四个方法:
// Update 用于手动维护 Discovery 当中的服务器地址
func (d *GeeRegistryDiscovery) Update(servers []string) error {
d.mu.Lock()
defer d.mu.Unlock()
d.servers = servers
d.lastUpdate = time.Now()
return nil
}
// Refresh 自动从注册中心 GeeRegistry 请求当前处于 alive 状态的服务器地址
func (d *GeeRegistryDiscovery) Refresh() error {
d.mu.Lock()
defer d.mu.Unlock()
if d.lastUpdate.Add(d.timeout).After(time.Now()) {
return nil
}
log.Println("rpc registry: refresh servers from registry", d.registry)
resp, err := http.Get(d.registry)
if err != nil {
log.Println("rpc registry refresh err:", err)
return err
}
servers := strings.Split(resp.Header.Get("X-Geerpc-Servers"), ",")
d.servers = make([]string, 0, len(servers))
for _, server := range servers {
if strings.TrimSpace(server) != "" {
d.servers = append(d.servers, strings.TrimSpace(server))
}
}
d.lastUpdate = time.Now()
return nil
}
// Get 通过 SelectMode 调度服务实例, 可以看到此处服用了 MultiServersDiscovery 当中的方法
func (d *GeeRegistryDiscovery) Get(mode SelectMode) (string, error) {
if err := d.Refresh(); err != nil {
return "", err
}
return d.MultiServersDiscovery.Get(mode)
}
// GetAll 返回所有服务实例, 仍然复用 MultiServersDiscovery 当中的方法
func (d *GeeRegistryDiscovery) GetAll() ([]string, error) {
if err := d.Refresh(); err != nil {
return nil, err
}
return d.MultiServersDiscovery.GetAll()
}
在使用 Get 或 GetAll 方法获取服务实例之前,GeeRegistryDiscovery 会调用 Refresh 方法更新当前所维护的服务列表。在 Refresh 当中,要做的就是从注册中心 GeeRegistry 自动获取当前可用的服务列表,通过发送 Method 为 GET 的 HTTP Request 来完成。
至此,我们完成了 GeeRPC 当中注册中心和服务发现模块的回顾,并搞清楚了这两个模块具备哪些功能。
Part4:高性能客户端
我们现在已经知道,在 GeeRPC 框架下,基于注册中心和服务发现模块,可以同时有多个服务实例为发起 RPC 调用的客户端提供服务。GeeRPC 在支持并发与异步的客户端 Client 的基础上,实现了一个支持负载均衡的客户端 XClient,它包括服务发现模块 d、负载均衡模式 mode 以及通信协议选项 opt,为了尽可能地复用已经创建的客户端实例,XClient 还使用 clients 这个 map 对已经创建的实例进行保存。
XClient 的结构实现如下:
type XClient struct {
d Discovery
mode SelectMode
opt *geerpc.Option
mu sync.Mutex
clients map[string]*geerpc.Client
}
// 目的是为了确保 geerpc.Client 实现了 io.Closer 接口, 目前还需要实现 Close 方法
var _ io.Closer = (*XClient)(nil)
// NewXClient 是构造函数, 用于创建 XClient 实例
func NewXClient(d Discovery, mode SelectMode, opt *geerpc.Option) *XClient {
return &XClient{d: d, mode: mode, opt: opt, clients: make(map[string]*geerpc.Client)}
}
XClient 直接暴露给用户的两个方法是 Call 和 Broadcast,前者基于服务发现模块的调度机制选择单个服务实例执行 Client 发起的 RPC 请求,而后者向所有服务实例广播 RPC 请求。我们先后从这两个暴露在外的方法对客户端的实现进行剖析。
Call 方法:客户端基于负载均衡策略选择单个服务实例执行 RPC 调用
需要首先明确的一点是,在 XClient 底层执行客户端功能的其实是 Client,XClient 通过一个 map 保存 Client,进而复用底层 Client 的方法。
XClient 的 Call 方法的实现如下:
// Call invokes the named function, waits for it to complete,
// and returns its error status.
// xc will choose a proper server.
func (xc *XClient) Call(ctx context.Context, serviceMethod string, args, reply interface{}) error {
rpcAddr, err := xc.d.Get(xc.mode) // Get 即服务发现模块的 Get 方法, 根据 mode 选取一个 Server
if err != nil {
return err
}
return xc.call(rpcAddr, ctx, serviceMethod, args, reply)
}
第一行 rpcAddr, err := xc.d.Get(xc.mode)
得到的 rpcAddr 是一个 string 类型,它保存的内容就是 RPC Server 的地址。通过进一步调用 xc.call(rpcAddr, ctx, serviceMethod, args, reply)
来执行具体的请求。
在剖析 call 方法之前,我们先来研究一下 Call 的形参。ctx 是一个 context.Context
类型的变量,它的作用是为客户端引入了超时控制;serviceMethod 即客户端想要调用的具体的服务和方法,它是 string 类型,一个例子是 foo.Sum
;args 和 reply 是空接口类型,对于空接口类型的形参而言,在传入实参时可以传入任意类型的值,因此空接口很适合用来接收 RPC 调用的参数及返回值。对于 foo.Sum
方法,它的参数是一个 Args 类型,包含 Num1 和 Num2 两个成员,即两个要加在一起的数,为了尽可能地简化问题,加数都是 int 类型。reply 是一个 int 类型的指针,用于保存整型数相加的结果。在 Call 当中进一步调用 call 方法,根据 rpcAddr 执行具体的 RPC 调用。
call 方法的实现如下:
// call 方法用于向指定的 RPC 服务器地址发起调用
func (xc *XClient) call(rpcAddr string, ctx context.Context, serviceMethod string, args, reply interface{}) error {
client, err := xc.dial(rpcAddr)
if err != nil {
return err
}
return client.Call(ctx, serviceMethod, args, reply)
}
它首先调用了 XClient 的 dial 方法,dial 方法的实现如下:
// dial 方法用于连接到指定的 RPC 服务器地址
func (xc *XClient) dial(rpcAddr string) (*geerpc.Client, error) {
xc.mu.Lock()
defer xc.mu.Unlock()
client, ok := xc.clients[rpcAddr]
if ok && !client.IsAvailable() {
_ = client.Close()
delete(xc.clients, rpcAddr)
client = nil
}
if client == nil {
var err error
client, err = geerpc.XDial(rpcAddr, xc.opt)
if err != nil {
return nil, err
}
xc.clients[rpcAddr] = client
}
return client, nil
}
我们可以看到,dial 方法的形参是 rpcAddr,返回值是一个 Client 类型的指针和 error。我刚才已经提到,为了尽可能地复用已经创建好的 Client 实例,XClient 使用一个 map 类型对 *Client
进行保存,map 的 key 是 string,对应 rpcAddr,value 就是 *Client
。剖析 dial 方法的行为,如果 map 当中保存着 Client 实例,并且这个实例没有过期,那么直接返回这个实例;否则,dial 会通过 XDial 创建一个新的 Client,命名为 client,并将其保存在 map 当中。最后将 client 返回。
XDial 比较复杂,它的功能是基于 rpcAddr 的形式,建立一个与 RPC Server 的客户端连接。XDial 支持多种协议,比如 HTTP 协议、TCP 协议、Unix 协议等。鉴于其较为复杂,我们首先研究一下 XDial 创建客户端的过程,再回到基于 XClient 完成一个客户端调用的解读。
XDial:一个较为通用的 RPC 客户端连接函数,支持多种协议
XDial 的实现如下:
// XDial calls different functions to connect to a RPC server
// according the first parameter rpcAddr.
// rpcAddr is a general format (protocol@addr) to represent a rpc server
// eg, http@10.0.0.1:7001, tcp@10.0.0.1:9999, unix@/tmp/geerpc.sock
func XDial(rpcAddr string, opts ...*Option) (*Client, error) {
// XDial 是一个通用的 RPC 客户端连接函数, 支持多种协议. 它的主要功能是根据 rpcAddr 的格式解析协议和地址,
// 并调用相应的底层连接函数来建立与 RPC 服务器的连接.
parts := strings.Split(rpcAddr, "@")
if len(parts) != 2 {
return nil, fmt.Errorf("rpc client err: wrong format '%s', expected protocol@addr", rpcAddr)
}
protocol, addr := parts[0], parts[1]
switch protocol {
case "http":
return DialHTTP("tcp", addr, opts...)
default:
return Dial(protocol, addr, opts...)
}
}
它首先根据 @
对 rpcAddr 进行了拆分,@
之前是客户端请求连接 RPC Server 的协议,而 @
之后是客户端请求连接的 RPC Server 的具体的地址。返回的就是创建的客户端实例以及错误。
XDial 将客户端的创建划分为两种情况,分别是使用 HTTP 协议的情况和使用其它协议的情况。我们来看一下具体的实现:
// DialHTTP 是一个便捷函数, 用于通过 HTTP 协议连接到 RPC 服务器, 它封装了底层的连接建立和协议切换逻辑
func DialHTTP(network, address string, opts ...*Option) (*Client, error) {
return dialTimeout(NewHTTPClient, network, address, opts...)
}
func Dial(network, address string, opts ...*Option) (*Client, error) {
// Dial: 对外暴露的客户端连接函数
// 返回: 客户端实例和错误
return dialTimeout(NewClient, network, address, opts...)
}
不难看出,无论是 Dial 还是 DialHTTP,最终都通过 dialTimeout 返回一个 Client 实例,区别在于第一个入参不同,DialHTTP 使用的是 NewHTTPClient,Dial 使用的是 NewClient。NewClient 和 NewHTTPClient 都是工厂函数,说明在 dialTimeout 当中要使用创建 Client 的工厂函数创建客户端实例。dialTimeout 的实现如下,它的作用是为客户端的创建套了一层壳,从而为客户端建立的过程引入超时处理机制,当客户端的创建超时时,需要记录超时错误:
// clientResult 封装了客户端连接的结果
type clientResult struct {
client *Client // 成功连接后返回的客户端实例
err error // 连接过程中发生的错误
}
// dialTimeout 实现了带超时的客户端连接逻辑
// f: 创建客户端实例的函数
// network: 网络类型 (比如 tcp)
// address: 服务器地址
// opts: 连接选项 (可变参数)
func dialTimeout(f newClientFunc, network, address string, opts ...*Option) (client *Client, err error) {
opt, err := parseOptions(opts...) // 对传入的 opts 进行解析, 如果为空则使用默认的 opt
if err != nil {
return nil, err
}
conn, err := net.DialTimeout(network, address, opt.ConnectTimeout) // 库函数调用
// 👆 基于 opt 当中的 ConnectTimeout 创建 conn
if err != nil {
return nil, err
}
// close the connection if client is nil
defer func() {
if err != nil {
_ = conn.Close()
}
}()
ch := make(chan clientResult) // 创建一个 clientResult 类型的通道, 用于接收连接结果
// IMPORTANT: 使用 channel 的目的就是结合 select 完成超时控制
go func() { // 启动 goroutine 创建客户端
client, err := f(conn, opt)
ch <- clientResult{client: client, err: err}
}()
if opt.ConnectTimeout == 0 { // 如果 ConnectTimeout 为 0, 则等待 goroutine 完成并返回结果
result := <-ch
return result.client, result.err
}
select { // 否则, 使用 select 监听超时和结果通道
case <-time.After(opt.ConnectTimeout): // 如果超时, 返回超时错误
return nil, fmt.Errorf("rpc client: connect timeout: expect within %s", opt.ConnectTimeout)
case result := <-ch: // 如果收到结果, 返回客户端实例和错误
return result.client, result.err
}
}
dialTimeout 的形参 f 有两种可能的情况,一种是 NewHTTPClient,另一种是 NewClient。
我们之前已经提到过,GeeRPC 的服务端是支持 HTTP 协议的,即 Server 实际上实现了 ServeHTTP 这个方法,即实现了 http.Handler 接口。在 Server 的 ServeHTTP 方法当中,Server 通过 Hijack 对 HTTP 协议进行拦截,并截获底层的 TCP 连接,基于 TCP 连接,Server 将 HTTP 协议切换为了 RPC 协议。客户端要做的就是正确地向 Server 发起 CONNECT 方法的 Request,用于建立 HTTP 连接。因此 NewHTTPClient 的实现为:
// NewHTTPClient 函数用于创建一个基于 HTTP 的 RPC 客户端. 它通过已经建立的 TCP 连接 (conn) 与服务器进行 HTTP 协议
// 的握手, 并在握手成功后切换到 RPC 协议
func NewHTTPClient(conn net.Conn, opt *Option) (*Client, error) {
// 通过 io.WriteString 向服务器发送一个 HTTP CONNECT 请求, 请求路径为 defaultRPCPath
// 请求格式: CONNECT /_geerpc_ HTTP/1.0
// 这个请求是告诉服务器, 客户端希望通过 HTTP 协议建立连接, 并切换到 RPC 协议
_, _ = io.WriteString(conn, fmt.Sprintf("CONNECT %s HTTP/1.0\n\n", defaultRPCPath))
// Require successful HTTP response
// before switching to RPC protocal.
/*
使用 http.ReadResponse 从连接中读取服务器的 HTTP 响应.
bufio.NewReader(conn) 将 conn 包装成一个带缓冲的读取器, 以便逐行读取 HTTP 响应
http.Request{Method: "CONNECT"} 是一个虚拟的 HTTP 请求对象, 用于告诉 http.ReadResponse 这是一个 CONNECT 请求的响应
*/
resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})
if err == nil && resp.Status == connected {
// 连接成功, 此时调用 NewClient(conn, opt) 新建一个 TCP 客户端并返回
return NewClient(conn, opt)
}
if err == nil {
err = errors.New("unexpected HTTP response: " + resp.Status)
}
return nil, err
}
可以看到,如何 HTTP 连接成功,那么最终仍然要创建一个 TCP 客户端并返回,通过 NewClient 创建一个 TCP 客户端。
Client:底层的客户端实例
NewClient 将会创建一个 Client 实例,而 Client 承载着一个 RPC 框架中客户端所有的基础功能。总结来说,一个 Client 的作用包括发送和接收消息,并记录一次 RPC 调用的完成状态。
Client 的结构定义及其工厂函数 NewClient 的实现如下:
type Client struct {
cc codec.Codec
opt *Option
sending sync.Mutex // protect following
header codec.Header
mu sync.Mutex // protect following
seq uint64
pending map[uint64]*Call
closing bool // user has called Close
shutdown bool // server has told us to stop
}
var _ io.Closer = (*Client)(nil) // Client 需要实现 Close 方法已满足 io.Closer 接口的要求
func NewClient(conn net.Conn, opt *Option) (*Client, error) {
f := codec.NewCodecFuncMap[opt.CodecType]
if f == nil {
err := fmt.Errorf("invalid codec type %s", opt.CodecType)
log.Println("rpc client: codec error:", err)
return nil, err
}
// send options with server
if err := json.NewEncoder(conn).Encode(opt); err != nil {
log.Println("rpc client: options error: ", err)
_ = conn.Close()
return nil, err
}
return newClientCodec(f(conn), opt), nil
}
func newClientCodec(cc codec.Codec, opt *Option) *Client {
client := &Client{
seq: 1,
cc: cc,
opt: opt,
pending: make(map[uint64]*Call),
}
go client.receive()
return client
}
NewClient 当中的 newClientCodec 创建了一个 Client ,在 goroutine 中开启 Client 的 receive 方法,并返回 Client。
Call 实例:承载一次 RPC 调用
可以看到,Client 当中包含了一个名为 pending 的字段(pending 保存的是目前尚未完成的 RPC 调用),它的类型是 key 为 uint64,value 为 *Call
的 map。Call 承载着一次具体的 RPC 调用,一个 Client 可以承载多个 Call,即基于一个 RPC 客户端可以发起多次 RPC 调用。Call 的结构定义如下,它只有一个 done 方法,用于标记当前 RPC 调用已经完成:
// in client/client.go
type Call struct {
Seq uint64
ServiceMethod string // format "<service>.<method>"
Args interface{} // arguments to the func
Reply interface{} // reply from the function
Error error // if error oocurs, it will be set
Done chan *Call // Strobes with call is complete
}
func (call *Call) done() {
call.Done <- call
}
Call 结构的字段包括:
- Seq:用于标识当前调用的序列号;
- ServiceMethod:格式为
"<service>.<method>"
,用于保存当前调用的服务名和方法名; - Args:当前 RPC 调用传入的参数;
- Reply:保存当前 RPC 调用的返回值;
- Error:记录 RPC 调用可能出现的错误;
- Done:这个字段比较有意思,它是一个
*Call
类型的 channel,在 done 这个 Call 唯一的方法中被使用。当一次调用完成后,使用 done 可以将 call 自身发送给 Done 这个 channel,它的作用是控制 RPC 调用的并发与异步,我将在之后 Client 的方法 Go 和 Call 当中进行详细解读。
回到 Client,实现 Client 的接收功能 receive
剖析完 Call 之后,我们回到 Client。在 Client 的工厂函数当中,开启了一个执行 Client 的 receive 方法的 goroutine。receive 实现的正是客户端的接收功能,接收到的响应包括三种情况:
- call 不存在,可能的原因是 RPC 调用的请求发送不完整;
- call 存在,但服务端处理出错,故 call 的 Error 字段不为空;
- call 存在,且没有出错,即服务端正常处理,此时需要从 stream 当中读取 Reply。
根据上述分析,receive 及其相关方法的实现如下:
func (client *Client) removeCall(seq uint64) *Call {
client.mu.Lock()
defer client.mu.Unlock()
call := client.pending[seq]
delete(client.pending, seq) // 内置的 delete 的第一个参数是 map, 第二个参数是键, 可以删除 map 中的键值对
return call
}
func (client *Client) terminateCalls(err error) {
client.sending.Lock()
defer client.sending.Unlock()
client.mu.Lock()
defer client.mu.Unlock()
client.shutdown = true
for _, call := range client.pending {
call.Error = err
call.done()
}
}
func (client *Client) receive() {
var err error
for err == nil {
var h codec.Header
if err = client.cc.ReadHeader(&h); err != nil {
break
}
call := client.removeCall(h.Seq)
switch {
case call == nil:
err = client.cc.ReadBody(nil)
case h.Error != "":
call.Error = fmt.Errorf(h.Error)
err = client.cc.ReadBody(nil)
call.done()
default:
err = client.cc.ReadBody(call.Reply)
if err != nil {
call.Error = errors.New("reading body " + err.Error())
}
call.done()
}
}
// err != nil 的时候
client.terminateCalls(err)
}
需要明确的一点是,client.cc 的 ReadHeader 和 ReadBody 方法都是从输入流 stream 当中进行读取,如果没有消息到来,将会阻塞。receive 接受的就是来自服务端执行一次 RPC 调用的响应,首先从 Header 的 Seq 字段当中可以获取到当前响应对应 Client 的哪次 RPC 调用请求,找到具体的 RPC 调用,即 Call。根据 Call 的情况,进行不同的处理。
如果 call 为 nil,原因是 Server 回发的 Response 中 Header 保存的 Seq 与当前处于 pending 当中的 Seq 对应不上,原因可能是请求在发送给 Server 之后被 Client 取消,但 Server 仍旧处理了这个请求并回发了 Response。
如果 Header 当中包含错误,说明 RPC 调用出错,此时需要记录错误并标记此次 call 完成。
如果 call 不为空且没有出错,那么正常对返回值进行读取并标记本次 call 完成即可。返回值将会保存在 call 的 Reply 字段当中,用于进一步的处理。
当有错误发生时,Client 的 receive 方法将停止 for loop,并终止所用当前处于 pending 的 Call,停止接收消息。
同步进度,回到 XDial 和 call 方法
现在让我们来同步一下进度。在最开始,我们从 XClient 暴露给用户的 Call 方法入手,Call 方法首先通过 SelectMode 得到 rpcAddr,再通过 call 方法建立一个与 rpcAddr 的客户端。在 call 方法当中,通过 dial 建立了 client(dial 又会进一步调用 XDial,XDial 通过 DialHTTP 或 Dial 建立 TCP Client,从而引出了底层的 Client 对象及 Call 对象),现在我们就可以通过 client 发起一次 RPC 远程调用:
// call 方法用于向指定的 RPC 服务器地址发起调用
func (xc *XClient) call(rpcAddr string, ctx context.Context, serviceMethod string, args, reply interface{}) error {
client, err := xc.dial(rpcAddr)
if err != nil {
return err
}
return client.Call(ctx, serviceMethod, args, reply)
}
具体来说,在 call 当中使用 client.Call(ctx, serviceMethod, args, reply)
,即直接是有 Client 类型的 Call 方法完成一次 RPC 调用,传入的参数包括 ctx、ServiceMethod、args 和 reply。
Client 的 Call 方法:发起 RPC 调用并通过 Context 引入超时机制
Client 的 Call 方法实现如下:
// Call 是 RPC 客户端实现的核心部分, 用于发起远程调用并支持上下文 (context.Context) 的超时和取消功能
func (client *Client) Call(ctx context.Context, serviceMethod string, args, reply interface{}) error {
call := client.Go(serviceMethod, args, reply, make(chan *Call, 1)) // 发起一个异步的远程调用
select {
// 监听上下文的取消或超时, 以及调用结果的返回
case <-ctx.Done():
client.removeCall(call.Seq)
return errors.New("rpc client: call failed: " + ctx.Err().Error())
case call := <-call.Done:
return call.Error
}
}
在 Call 当中,直接通过 Go 发起一个异步的远程调用。Go 的实现如下:
// Go invokes the functions asynchronously.
// It returns the Call structure representing the invocation.
func (client *Client) Go(ServiceMethod string, args, reply interface{}, done chan *Call) *Call {
if done == nil {
done = make(chan *Call, 10) // buffered channel
} else if cap(done) == 0 {
log.Panic("rpc client: done channel is unbuffered")
}
call := &Call{
ServiceMethod: ServiceMethod,
Args: args,
Reply: reply,
Done: done,
}
client.send(call)
return call
}
在 Go 当中,构建了一个 Call 实例 call,刚才我们已经提到,它承载了一次 RPC 调用,构建完成后,通过 Client 的 send 方法将 call 实例发送给 Server。send 及其相关方法的实现如下:
func (client *Client) registerCall(call *Call) (uint64, error) {
client.mu.Lock()
defer client.mu.Unlock()
if client.closing || client.shutdown {
return 0, ErrShutdown
}
call.Seq = client.seq
client.pending[call.Seq] = call
client.seq++
return call.Seq, nil
}
func (client *Client) send(call *Call) {
// make sure that the client will send a complete request
client.sending.Lock()
defer client.sending.Unlock()
// register this call
seq, err := client.registerCall(call)
if err != nil {
call.Error = err
call.done()
return
}
// prepare request header
client.header.ServiceMethod = call.ServiceMethod
client.header.Seq = seq
client.header.Error = ""
// encode and send the request
if err = client.cc.Write(&client.header, call.Args); err != nil {
call = client.removeCall(seq)
if call != nil {
call.Error = err
call.done()
}
}
}
send 方法不难理解,它所做的就是构建了一个 Request,Header 当中包含的是 ServiceMethod、Seq 以及空的 Error,Body 包含的就是本次 RPC 调用的参数,将其通过 TCP Connection 发送给 Server,再通过 receive 等待 Server 回发的 Response 即可。
回到 Client 的 Call 方法,Go 的返回值是一个 Call 实例 call,Call 方法在得到 call 之后,通过 channel + select 实现超时机制,在此处我们就可以看到之前所说的 Call 对象当中 Done 这个 channel 的作用:
select {
// 监听上下文的取消或超时, 以及调用结果的返回
case <-ctx.Done():
client.removeCall(call.Seq)
return errors.New("rpc client: call failed: " + ctx.Err().Error())
case call := <-call.Done:
return call.Error
}
如果 ctx 的 Done 先到达,代表超时;而如果 call 的 Done 先到达,代表本次 RPC 调用完成,返回 call 当中的错误即可。
至此,我们完整过了一遍 GeeRPC 当中 XClient 对外暴露的 Call 方法的工作流程,接下来我们来研究一下另一个对外暴露的 Broadcast 接口。
Broadcast 方法:客户端向所有可以用的 Server 进行 RPC 调用广播
XClient 另一个对外暴露的接口是 Broadcast。Broadcast 向所有 Server 广播 RPC 调用请求,如果其中一个实例发生错误,则返回错误,如果其中一个实例调用成功,则返回调用的结果。
Broadcast 方法的实现如下:
// Broadcast invokes the named function for every server registered in discovery
// Broadcast 方法用于向所有服务器广播调用
func (xc *XClient) Broadcast(ctx context.Context, serviceMethod string, args, reply interface{}) error {
servers, err := xc.d.GetAll() // 获得所有服务器地址
if err != nil {
return err
}
var wg sync.WaitGroup
var mu sync.Mutex
var e error
replyDone := reply == nil
ctx, cancel := context.WithCancel(ctx)
for _, rpcAddr := range servers {
wg.Add(1)
go func(rpcAddr string) {
defer wg.Done()
var clonedReply interface{}
if reply != nil {
clonedReply = reflect.New(reflect.ValueOf(reply).Elem().Type()).Interface()
}
err := xc.call(rpcAddr, ctx, serviceMethod, args, clonedReply)
mu.Lock()
if err != nil && e == nil {
e = err
cancel()
}
if err == nil && !replyDone {
reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(clonedReply).Elem())
replyDone = true
}
mu.Unlock()
}(rpcAddr)
}
wg.Wait()
return e
}
可以看到 Broadcast 方法的实现非常地简洁清晰,由于我们刚才已经对 Call 方法的实现进行了完整的解读,所以此处的 Broadcast 方法的实现细节不再赘述,它本质上就是借助服务发现模块的 GetAll 方法获取所有当前可用的服务实例,对每一个服务实例进行遍历,在每一个 Server 上发起 RPC 调用之前,通过反射复制一份 Reply,使得每一个传入的 Reply 都是传址调用。
需要注意的两个点是:
- Broadcast 方法借助 context.WithCancel 确保有错误发生时,快速失败;
- Broadcast 方法借助 WaitGroup 和 Mutex 进行并发控制,WaitGroup 用于追踪每一个承载 RPC 调用的 goroutine,Mutex 保护资源调用。
至此,我们完成了对 GeeRPC 高性能客户端的回顾。
Part5:从 main 函数出发完整地体验一次 GeeRPC 的使用
接下来到了实践环节,我们借助 Geektutu 在 Day7 给出的 Demo,完整地体验一次 GeeRPC 框架。
完整的项目目录如下:
main.go
的实现如下:
package main
import (
"Geektutu/GeeRPC/geerpc"
"Geektutu/GeeRPC/geerpc/registry"
"Geektutu/GeeRPC/geerpc/xclient"
"context"
"log"
"net"
"net/http"
"sync"
"time"
)
type Foo int
type Args struct{ Num1, Num2 int }
func (f Foo) Sum(args Args, reply *int) error {
*reply = args.Num1 + args.Num2
return nil
}
func (f Foo) Sleep(args Args, reply *int) error {
time.Sleep(time.Second * time.Duration(args.Num1))
*reply = args.Num1 + args.Num2
return nil
}
func startRegistry(wg *sync.WaitGroup) {
l, _ := net.Listen("tcp", ":9999")
registry.HandleHTTP()
wg.Done()
_ = http.Serve(l, nil)
}
func startServer(registryAddr string, wg *sync.WaitGroup) {
var foo Foo
l, _ := net.Listen("tcp", ":0")
server := geerpc.NewServer()
_ = server.Register(&foo)
registry.Heartbeat(registryAddr, "tcp@"+l.Addr().String(), 0)
wg.Done()
server.Accept(l)
}
func foo(xc *xclient.XClient, ctx context.Context, typ, serviceMethod string, args *Args) {
var reply int
var err error
switch typ {
case "call":
err = xc.Call(ctx, serviceMethod, args, &reply)
case "broadcast":
err = xc.Broadcast(ctx, serviceMethod, args, &reply)
}
if err != nil {
log.Printf("%s %s error: %v", typ, serviceMethod, err)
} else {
log.Printf("%s %s success: %d + %d = %d", typ, serviceMethod, args.Num1, args.Num2, reply)
}
}
func call(registry string) {
d := xclient.NewGeeRegistryDiscovery(registry, 0)
xc := xclient.NewXClient(d, xclient.RandomSelect, nil)
defer func() { _ = xc.Close() }()
// send request & receive response
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
foo(xc, context.Background(), "call", "Foo.Sum", &Args{Num1: i, Num2: i * i})
}(i)
}
wg.Wait()
}
func broadcast(registry string) {
d := xclient.NewGeeRegistryDiscovery(registry, 0)
xc := xclient.NewXClient(d, xclient.RandomSelect, nil)
defer func() { _ = xc.Close() }()
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
foo(xc, context.Background(), "broadcast", "Foo.Sum", &Args{Num1: i, Num2: i * i})
// expect 2 - 5 timeout
ctx, _ := context.WithTimeout(context.Background(), time.Second*2)
foo(xc, ctx, "broadcast", "Foo.Sleep", &Args{Num1: i, Num2: i * i})
}(i)
}
wg.Wait()
}
func main() {
log.SetFlags(0)
registryAddr := "http://localhost:9999/_geerpc_/registry"
var wg sync.WaitGroup
wg.Add(1)
go startRegistry(&wg)
wg.Wait()
time.Sleep(time.Second)
wg.Add(2)
go startServer(registryAddr, &wg)
go startServer(registryAddr, &wg)
wg.Wait()
time.Sleep(time.Second)
call(registryAddr)
broadcast(registryAddr)
}
我们依然从 main 函数体出发。
首先,通过指定 registryAddr := "http://localhost:9999/_geerpc_/registry"
,确定了 GeeRPC 的 GeeRegistry 的服务地址。
之后,通过开启一个 startRegistry 的 goroutine 启动 GeeRegistry。由于 GeeRegistry 通过 HTTP 提供服务,因此需要将 GeeRegistry 的启动放在一个 goroutine 当中,这样 http.Serve
方法才会并发地监听请求。
然后。通过 startServer 开启两个 RPC Server,startServer 的实现如下:
func startServer(registryAddr string, wg *sync.WaitGroup) {
var foo Foo
l, _ := net.Listen("tcp", ":0")
server := geerpc.NewServer()
_ = server.Register(&foo)
registry.Heartbeat(registryAddr, "tcp@"+l.Addr().String(), 0)
wg.Done()
server.Accept(l)
}
在 net.Listen
方法中,如果指定端口号为 0,那么系统将随机分配一个可用的端口号,作为 Server 提供服务的端口。通过 Registry 将名为 Foo 的服务注册到 Server 当中,Foo 提供两种方法,分别是 Sum 和 Sleep。注册服务之后,通过 Heartbeat 向 GeeRegistry 发送心跳,将 Server 注册到 Registry 当中。最后通过 Server 的 Accept 方法接收 Listener,开启 goroutine 在指定的端口进行服务监听。
最后,客户端的服务调用封装在了 call 和 broadcast 函数当中。我们仅以 call 为例,对 call 进行剖析,broadcast 与 call 的工作机制基本相同。call 的实现如下:
func call(registry string) {
d := xclient.NewGeeRegistryDiscovery(registry, 0)
xc := xclient.NewXClient(d, xclient.RandomSelect, nil)
defer func() { _ = xc.Close() }()
// send request & receive response
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
foo(xc, context.Background(), "call", "Foo.Sum", &Args{Num1: i, Num2: i * i})
}(i)
}
wg.Wait()
}
在 call 中,我们要做的是使用 XClient 对外暴露的 Call 方法实现一次 RPC 调用。因此 call 函数的前两行依次创建了服务发现模块并使用服务发现模块创建 XClient 对象。之后通过 foo 函数进行远程调用,foo 的实现如下:
func foo(xc *xclient.XClient, ctx context.Context, typ, serviceMethod string, args *Args) {
var reply int
var err error
switch typ {
case "call":
err = xc.Call(ctx, serviceMethod, args, &reply)
case "broadcast":
err = xc.Broadcast(ctx, serviceMethod, args, &reply)
}
if err != nil {
log.Printf("%s %s error: %v", typ, serviceMethod, err)
} else {
log.Printf("%s %s success: %d + %d = %d", typ, serviceMethod, args.Num1, args.Num2, reply)
}
}
如果 switch case 为 call,那么 foo 函数将调用 XClient 的 Call 方法进行远程调用。
现在让我们在 Demo 的基础上,构建一个更加复杂的服务,并注册到服务中心。
我们首先定义一个名为 YGGP 的服务,并为其注册名为 LengthOfLongestSubstring 的方法,它对应的是 LeetCode 3. 无重复字符的最长子串的解决方案。同时定义 YGGPArgs 保存输入的参数。
type YGGP struct{}
type YGGPArgs struct {
Str string
}
func (yggp YGGP) LengthOfLongestSubstring(args YGGPArgs, reply *int) error {
s := args.Str
var slow, fast, length int
mp := make(map[byte]int)
slow, fast, length = 0, 0, len(s)
for fast < length {
mp[s[fast]]++
for mp[s[fast]] > 1 && slow < fast {
mp[s[slow]]--
slow++
}
fast++
*reply = max(*reply, fast-slow)
}
return nil
}
之后我们新建一个服务实例并注册 YGGP 服务:
func startYGGPServer(registryAddr string, wg *sync.WaitGroup) {
var yggp YGGP
l, _ := net.Listen("tcp", ":0")
server := geerpc.NewServer()
_ = server.Register(&yggp)
registry.Heartbeat(registryAddr, "tcp@"+l.Addr().String(), 0)
wg.Done()
server.Accept(l)
}
然后我们新建一个 YGGP 的 XClient,进行 LengthOfLongestSubstring 方法的 RPC 调用:
func yggp(xc *xclient.XClient, ctx context.Context, typ, serviceMethod string, args *YGGPArgs) {
var reply int
var err error
switch typ {
case "call":
err = xc.Call(ctx, serviceMethod, args, &reply)
case "broadcast":
err = xc.Broadcast(ctx, serviceMethod, args, &reply)
}
if err != nil {
log.Printf("%s %s error: %v", typ, serviceMethod, err)
} else {
log.Printf("%s %s success: LengthOfLongestSubstring(%s) is %d", typ, serviceMethod, args.Str, reply)
}
}
func yggpcall(registry string) {
TestStr := []string{"abcabcbb", "abcabcbbabcabcbb", "bbbbb", "pwwkew", "pwwkewpwwkew"}
d := xclient.NewGeeRegistryDiscovery(registry, 0)
xc := xclient.NewXClient(d, xclient.RandomSelect, nil)
defer func() { _ = xc.Close() }()
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
yggp(xc, context.Background(), "call", "YGGP.LengthOfLongestSubstring", &YGGPArgs{
Str: TestStr[i],
})
}(i)
}
wg.Wait()
}
最后我们在 main 函数中使用我们刚才注册的方法:
func main() {
log.SetFlags(0)
registryAddr := "http://localhost:9999/_geerpc_/registry"
var wg sync.WaitGroup
wg.Add(1)
go startRegistry(&wg)
wg.Wait()
time.Sleep(time.Second)
wg.Add(1)
//go startServer(registryAddr, &wg)
//go startServer(registryAddr, &wg)
go startYGGPServer(registryAddr, &wg)
wg.Wait()
time.Sleep(time.Second)
yggpcall(registryAddr)
//call(registryAddr)
//broadcast(registryAddr)
}
运行结果如下:
rpc registry path: /_geerpc_/registry
rpc server: register YGGP.LengthOfLongestSubstring
tcp@[::]:53762 send heart beat to registry http://localhost:9999/_geerpc_/registry
rpc registry: refresh servers from registry http://localhost:9999/_geerpc_/registry
call YGGP.LengthOfLongestSubstring success: LengthOfLongestSubstring(abcabcbbabcabcbb) is 3
call YGGP.LengthOfLongestSubstring success: LengthOfLongestSubstring(abcabcbb) is 3
call YGGP.LengthOfLongestSubstring success: LengthOfLongestSubstring(pwwkewpwwkew) is 4
call YGGP.LengthOfLongestSubstring success: LengthOfLongestSubstring(bbbbb) is 1
call YGGP.LengthOfLongestSubstring success: LengthOfLongestSubstring(pwwkew) is 3
可以看到,我们注册的方法可以得到正确的结果,实验成功。
至此,我前前后后花了三天的实践,完整地总结了 GeeRPC 项目。