当前位置: 首页 > article >正文

GeeRPC第一天 服务端与消息编码(1)

RPC

1. 系统架构图解释(Graph)

image.png

架构层次
  • RPC框架核心功能:这是系统的最上层,涵盖了框架的主要功能模块,直接与底层服务和用户交互。
    • 服务层:主要负责服务的注册、发现和治理。
      • 服务注册:将服务信息注册到注册中心,供客户端发现。
      • 服务发现:客户端从注册中心获取服务的列表,并与之建立连接。
      • 服务治理:包括负载均衡、容错、限流等机制,确保系统的稳定性。
    • 协议层:负责与客户端和服务端之间的数据交换协议,包括序列化、压缩和协议设计。
      • 协议设计:定义请求/响应格式、消息交换规则等。
      • 序列化:选择并实现数据的编码/解码机制(如 Gob、JSON)。
      • 压缩:支持数据压缩以提高传输效率,减少带宽消耗。
    • 网络层:负责连接管理、通信模型和传输协议的处理。
      • 连接管理:管理客户端和服务端之间的连接池,确保连接的高效复用。
      • 通信模型:包括同步和异步通信模型的选择。
      • 传输协议:决定底层使用的传输协议(如 TCP、HTTP)。
    • 注册中心:提供服务注册、配置管理和健康检查功能。
      • 服务注册中心:为所有服务提供统一的注册管理。
      • 配置中心:集中管理配置信息。
      • 健康检查:确保所有注册的服务都在正常工作,检测服务的健康状态。
    • 可靠性保证:提供负载均衡、容错重试和熔断限流等机制,确保系统的稳定性和容错性。
      • 负载均衡:将请求均匀分配到多个服务实例,避免单个服务压力过大。
      • 容错重试:出现异常时,自动进行重试或回退操作。
      • 熔断限流:当服务压力过大或出现异常时,进行熔断或限流,避免雪崩效应。
子图解释
  • 服务管理子图:描述了服务层的内部管理,包括:

    • 方法注册:将服务方法注册到服务端。
    • 服务发现机制:客户端如何通过注册中心发现服务节点。
    • 服务监控/统计:监控服务的健康状态,记录和分析请求统计信息。
  • 传输协议子图:描述协议层的实现细节:

    • 消息格式:确定消息的基本格式(如 JSON 或二进制格式)。
    • 多种序列化支持:支持多种不同的序列化方式(如 JSON、Gob、Protobuf 等)。
    • 数据压缩:在传输过程中,数据可能被压缩,以提高效率。
  • 网络处理子图:描述了网络层的关键组件:

    • 连接池:管理与服务端的连接,避免频繁建立/销毁连接。
    • 异步/同步:支持同步和异步通信模型,提供更灵活的处理方式。
    • TCP/HTTP:选择适当的传输协议,支持不同类型的网络通信。

2. 序列图解释(Sequence Diagram)

image.png

启动阶段
  • 服务注册:服务端在启动时将自己注册到注册中心(如 ZooKeeper、Consul 等)。
  • 注册确认:注册中心确认服务的注册,确保服务信息正确并在系统中可用。
服务发现阶段
  • 客户端请求服务列表:客户端向注册中心请求可用的服务列表。
  • 返回服务节点:注册中心返回可用的服务节点,客户端可根据返回的数据选择合适的服务进行调用。
调用阶段
  • 客户端发起调用:客户端通过代理(如 client proxy)发起远程调用。
    • 找到服务:客户端代理根据服务名称查找目标服务的地址。
    • 负载均衡:根据配置的负载均衡策略(如轮询、加权、随机等)选择一个服务实例进行调用。
    • 序列化请求:将请求数据序列化成字节流(如 Gob、JSON 格式)。
    • 发送请求:将序列化后的请求发送给服务端代理。
  • 服务端处理请求
    • 反序列化请求:服务端代理接收到请求后进行反序列化,将请求数据还原为 Go 对象。
    • 调用服务方法:服务端调用对应的业务逻辑方法。
    • 返回结果:服务端将方法执行的结果返回给服务端代理。
    • 序列化响应:服务端代理将响应数据进行序列化,准备返回客户端。
    • 发送响应:将序列化后的响应发送给客户端代理。
  • 客户端接收响应
    • 反序列化响应:客户端代理接收到响应后,反序列化数据并还原为 Go 对象。
    • 返回结果:客户端将最终的结果返回给调用方。
异常处理
  • 超时重试:当服务请求超时或失败时,客户端代理会尝试重试请求。
  • 熔断处理:服务端代理会根据熔断机制(如 Hystrix)判断服务是否正常,若服务异常,则停止请求并返回默认值,避免系统崩溃。

RPC 消息格式设计详解

消息格式设计:
+--------------------+----------------+-----------------+
|    消息头部        |    消息体      |      附加信息    |
+--------------------+----------------+-----------------+
包含内容:
- 魔数 (Magic Number)
- 版本号
- 序列化方式
- 消息 ID
- 消息类型
- 消息长度
## 1. 消息格式整体设计

+--------+----------+------------+-------------+-------------+-------------+
|  Magic |  Version | Serializer | MessageType |  RequestID  |    Length   |  消息头(16字节)
+--------+----------+------------+-------------+-------------+-------------+
|                         Message  Body                                   |  消息体
+---------------------------------------------------------------------+
|                         Extension Data                                 |  附加信息
+---------------------------------------------------------------------+

总长度 = 16字节(头部)+ Body长度 + 扩展数据长度

2. 字段详细说明

2.1 消息头 (16字节固定长度)

  1. Magic Number (2字节)

    • 值: 0x3bef
    • 作用: 用于验证消息的有效性
    • 示例: [0x3b, 0xef]
  2. Version (1字节)

    • 作用: 协议版本号
    • 范围: 1-255
    • 示例: [0x01]
  3. Serializer (1字节)

    0x01: JSON
    0x02: Protobuf
    0x03: Hessian
    0x04: Kryo
    
  4. MessageType (1字节)

    0x01: 请求消息
    0x02: 响应消息
    0x03: 心跳请求
    0x04: 心跳响应
    
    
  5. RequestID (8字节)

    • 作用: 唯一标识一次请求
    • 格式: 长整型数字
    • 示例: [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01]
  6. Length (3字节)

    • 作用: 消息体长度
    • 范围: 0-16777215 (2^24 - 1)
    • 示例: [0x00, 0x00, 0x20] 表示32字节

2.2 消息体 (变长)

请求消息体格式:

{
    "interfaceName": "com.example.UserService",
    "methodName": "getUserById",
    "parameterTypes": ["java.lang.Long"],
    "parameters": [1001],
    "version": "1.0"
}

响应消息体格式:

{
    "code": 200,
    "message": "Success",
    "data": {
        "id": 1001,
        "name": "John Doe",
        "age": 25
    }
}
2.3 附加信息 (可选)
{
    "traceId": "abc123xyz789",
    "timestamp": 1637472000000,
    "attachments": {
        "timeout": 3000,
        "token": "xxx-yyy-zzz"
    }
}

3. 示例消息

3.1 请求消息示例
// 消息头 (16字节)
3B EF            // Magic Number (0x3bef)
01               // Version (1)
01               // Serializer (JSON)
01               // MessageType (请求消息)
00 00 00 00 00 00 00 01  // RequestID (1)
00 00 A2        // Length (162字节)

// 消息体 (JSON格式)
{
    "interfaceName": "com.example.UserService",
    "methodName": "getUserById",
    "parameterTypes": ["java.lang.Long"],
    "parameters": [1001],
    "version": "1.0"
}

// 附加信息
{
    "traceId": "abc123",
    "timestamp": 1637472000000
}
3.2 响应消息示例
// 消息头 (16字节)
3B EF            // Magic Number
01               // Version
01               // Serializer (JSON)
02               // MessageType (响应消息)
00 00 00 00 00 00 00 01  // RequestID (1)
00 00 8B        // Length (139字节)

// 消息体
{
    "code": 200,
    "message": "Success",
    "data": {
        "id": 1001,
        "name": "John Doe",
        "age": 25
    }
}

// 附加信息
{
    "traceId": "abc123",
    "responseTime": 15
}
Client Encoder Network Decoder Server 准备请求数据 传入请求数据 编码过程 1. 计算总长度 2. 写入魔数 3. 写入版本号 4. 写入序列化类型 5. 写入消息类型 6. 写入请求ID 7. 写入消息长度 8. 序列化消息体 9. 写入扩展信息 发送字节流 接收字节流 解码过程 1. 验证魔数 2. 检查版本 3. 获取序列化类型 4. 获取消息类型 5. 读取请求ID 6. 读取消息长度 7. 反序列化消息体 8. 解析扩展信息 返回解码后的消息 Client Encoder Network Decoder Server

消息的序列化与反序列化

一个典型的 RPC 调用如下:

err = client.Call("Arith.Multiply", args, &reply)

客户端发送的请求包括服务名 Arith,方法名 Multiply,参数 args 三个,服务端的响应包括错误 error,返回值 reply 两个。我们将请求和响应中的参数和返回值抽象为 body,剩余的信息放在 header 中,那么就可以抽象出数据结构 Header

import "io"

type Header struct {
    ServiceMethod string
    Seq           uint64
    Error         string
}
  • ServiceMethod 是服务名和方法名,通常与 Go 语言中的结构体和方法相映射。
  • Seq 是请求的序号,也可以认为是某个请求的 ID,用来区分不同的请求。
  • Error 是错误信息,客户端置为空,服务端如果发生错误,将错误信息置于 Error 中。

进一步,抽象出对消息体进行编解码的接口 Codec,抽象出接口是为了实现不同的 Codec 实例:

type Codec interface {
    io.Closer
    ReadHeader(*Header) error
    ReadBody(interface{}) error
    Write(*Header, interface{}) error
}

抽象出 Codec 的构造函数,客户端和服务端可以通过 CodecType 得到构造函数,从而创建 Codec 实例,这部分代码和工厂模式类似,与工厂模式不同的是返回的是构造函数。

type NewCodecFunc func(io.ReadWriteCloser) Codec

type Type string

const (
    GobType  Type = "application/gob"
    JsonType Type = "application/json"
)

var NewCodecFuncMap map[Type]NewCodecFunc

func init() {
    NewCodecFuncMap = make(map[Type]NewCodecFunc)
    NewCodecFuncMap[GobType] = NewGobCodec
}

我们定义了两种 CodecGobJson,但是实际代码中只实现了 Gob 一种,事实上,两者的实现非常接近,甚至只需要把 gob 换成 json 即可。

首先定义 GobCodec 结构体,这个结构体由四部分构成,conn 是由构建函数传入,通常是通过 TCP 或者 Unix 建立 socket 时得到的连接实例,decenc 对应 gob 的 DecoderEncoderbuf 是为了防止阻塞而创建的带缓冲的 Writer,一般这么做能提升性能。

import (
    "bufio"
    "encoding/gob"
    "io"
    "log"
)

type GobCodec struct {
    conn io.ReadWriteCloser
    buf  *bufio.Writer
    dec  *gob.Decoder
    enc  *gob.Encoder
}

var _ Codec = (*GobCodec)(nil)

func NewGobCodec(conn io.ReadWriteCloser) Codec {
    buf := bufio.NewWriter(conn)
    return &GobCodec{
        conn: conn,
        buf:  buf,
        dec:  gob.NewDecoder(conn),
        enc:  gob.NewEncoder(buf),
    }
}

接着实现 ReadHeaderReadBodyWriteClose 方法。

func (c *GobCodec) ReadHeader(h *Header) error {
    return c.dec.Decode(h)
}

func (c *GobCodec) ReadBody(body interface{}) error {
    return c.dec.Decode(body)
}

func (c *GobCodec) Write(h *Header, body interface{}) (err error) {
    defer func() {
        _ = c.buf.Flush()
        if err != nil {
            _ = c.Close()
        }
    }()
    if err := c.enc.Encode(h); err != nil {
        log.Println("rpc codec: gob error encoding header:", err)
        return err
    }
    if err := c.enc.Encode(body); err != nil {
        log.Println("rpc codec: gob error encoding body:", err)
        return err
    }
    return nil
}

func (c *GobCodec) Close() error {
    return c.conn.Close()
}

通信过程

客户端与服务端的通信需要协商一些内容,例如 HTTP 报文,分为 header 和 body 两部分,body 的格式和长度通过 header 中的 Content-TypeContent-Length 指定,服务端通过解析 header 就能够知道如何从 body 中读取需要的信息。对于 RPC 协议来说,这部分协商是需要自主设计的。为了提升性能,一般在报文的最开始会规划固定的字节,来协商相关的信息。比如第 1 个字节用来表示序列化方式,第 2 个字节表示压缩方式,第 3-6 字节表示 header 的长度,7-10 字节表示 body 的长度。

对于 GeeRPC 来说,目前需要协商的唯一一项内容是消息的编解码方式。我们将这部分信息放到结构体 Option 中承载。目前,已经进入到服务端的实现阶段了。

const MagicNumber = 0x3bef5c

type Option struct {
    MagicNumber int
    CodecType   codec.Type
}

var DefaultOption = &Option{
    MagicNumber: MagicNumber,
    CodecType:   codec.GobType,
}

一般来说,涉及协议协商的这部分信息,需要设计固定的字节来传输的。但是为了实现上更简单,GeeRPC 客户端固定采用 JSON 编码 Option,后续的 header 和 body 的编码方式由 Option 中的 CodecType 指定,服务端首先使用 JSON 解码 Option,然后通过 OptionCodecType 解码剩余的内容。即报文将以这样的形式发送:

1
2
| Option{MagicNumber: xxx, CodecType: xxx} | Header{ServiceMethod …} | Body interface{} |
| <------ 固定 JSON 编码 ------> | <------- 编码方式由 CodecType 决定 ------>||
在一次连接中,Option 固定在报文的最开始,HeaderBody 可以有多个,即报文可能是这样的。
1| Option | Header1 | Body1 | Header2 | Body2 | …

服务端实现

type Server struct{}

func NewServer() *Server {
    return &Server{}
}

var DefaultServer = NewServer()

func (server *Server) Accept(lis net.Listener) {
    for {
        conn, err := lis.Accept()
        if err != nil {
            log.Println("rpc server: accept error:", err)
            return
        }
        go server.ServeConn(conn)
    }
}

func Accept(lis net.Listener) { DefaultServer.Accept(lis) }
  • 首先定义了结构体 Server,没有任何的成员字段。
  • 实现了 Accept 方法,net.Listener 作为参数,for 循环等待 socket 连接建立,并开启子协程处理,处理过程交给了 ServeConn 方法。
  • DefaultServer 是一个默认的 Server 实例,主要为了用户使用方便。

如果想启动服务,过程是非常简单的,传入 listener 即可,TCP 协议和 Unix 协议都支持。

lis, _ := net.Listen("tcp", ":9999")
geerpc.Accept(lis)

ServeConn 的实现就和之前讨论的通信过程紧密相关了,首先使用 json.NewDecoder 反序列化得到 Option 实例,检查 MagicNumberCodecType 的值是否正确。然后根据 CodecType 得到对应的消息编解码器,接下来的处理交给 serveCodec

// ServeConn runs the server on a single connection.
// ServeConn blocks, serving the connection until the client hangs up.
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
    defer func() { _ = conn.Close() }()
    var opt Option
    if err := json.NewDecoder(conn).Decode(&opt); err != nil {
        log.Println("rpc server: options error: ", err)
        return
    }
    if opt.MagicNumber != MagicNumber {
        log.Printf("rpc server: invalid magic number %x", opt.MagicNumber)
        return
    }
    f := codec.NewCodecFuncMap[opt.CodecType]
    if f == nil {
        log.Printf("rpc server: invalid codec type %s", opt.CodecType)
        return
    }
    server.serveCodec(f(conn))
}

// invalidRequest is a placeholder for response argv when error occurs
var invalidRequest = struct{}{}

func (server *Server) serveCodec(cc codec.Codec) {
    sending := new(sync.Mutex) // make sure to send a complete response
    wg := new(sync.WaitGroup)  // wait until all request are handled
    for {
        req, err := server.readRequest(cc)
        if err != nil {
            if req == nil {
                break // it's not possible to recover, so close the connection
            }
            req.h.Error = err.Error()
            server.sendResponse(cc, req.h, invalidRequest, sending)
            continue
        }
        wg.Add(1)
        go server.handleRequest(cc, req, sending, wg)
    }
    wg.Wait()
    _ = cc.Close()
}

serveCodec 的过程主要包含三个阶段:

  • 读取请求 readRequest
  • 处理请求 handleRequest
  • 回复请求 sendResponse

在一次连接中,允许接收多个请求,即多个 request header 和 request body,因此这里使用了 for 循环无限制地等待请求的到来,直到发生错误(例如连接被关闭,接收到的报文有问题等)。需要注意的点有三个:

  • handleRequest 使用了协程并发执行请求。
  • 处理请求是并发的,但是回复请求的报文必须是逐个发送的,并发容易导致多个回复报文交织在一起,客户端无法解析。在这里使用锁(sending)保证。
  • 尽力而为,只有在 header 解析失败时,才终止循环。
// request stores all information of a call
type request struct {
    h            *codec.Header // header of request
    argv, replyv reflect.Value // argv and replyv of request
}

func (server *Server) readRequestHeader(cc codec.Codec) (*codec.Header, error) {
    var h codec.Header
    if err := cc.ReadHeader(&h); err != nil {
        if err != io.EOF && err != io.ErrUnexpectedEOF {
            log.Println("rpc server: read header error:", err)
        }
        return nil, err
    }
    return &h, nil
}

func (server *Server) readRequest(cc codec.Codec) (*request, error) {
    h, err := server.readRequestHeader(cc)
    if err != nil {
        return nil, err
    }
    req := &request{h: h}
    // TODO: now we don't know the type of request argv
    // day 1, just suppose it's string
    req.argv = reflect.New(reflect.TypeOf(""))
    if err = cc.ReadBody(req.argv.Interface()); err != nil {
        log.Println("rpc server: read argv err:", err)
    }
    return req, nil
}

func (server *Server) sendResponse(cc codec.Codec, h *codec.Header, body interface{}, sending *sync.Mutex) {
    sending.Lock()
    defer sending.Unlock()
    if err := cc.Write(h, body); err != nil {
        log.Println("rpc server: write response error:", err)
    }
}

func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup) {
    // TODO, should call registered rpc methods to get the right replyv
    // day 1, just print argv and send a hello message
    defer wg.Done()
    log.Println(req.h, req.argv.Elem())
    req.replyv = reflect.ValueOf(fmt.Sprintf("geerpc resp %d", req.h.Seq))
    server.sendResponse(cc, req.h, req.replyv.Interface(), sending)
}

目前还不能判断 body 的类型,因此在 readRequesthandleRequest 中,暂时将 body 作为字符串处理。接收到请求,打印 header,并回复 geerpc resp ${req.h.Seq}。这一部分后续再实现。

总结

工作流程

Client Server Codec Server.Accept starts listening Connect to server New goroutine for connection Connection Setup Phase Send Option (MagicNumber + CodecType) Validate MagicNumber Create Codec instance Request Handling Phase Send Request Header ReadHeader Send Request Body ReadBody handleRequest (new goroutine) Write Response Header & Body Send Response Error Handling Use sync.Mutex for response Use WaitGroup for requests Client Server Codec

这段代码实现了一个简单的 RPC (Remote Procedure Call) 框架,涵盖了 编解码器 (Codec) 相关功能以及 RPC 服务器 的基本处理逻辑。以下是对代码的总结,详细描述了各部分的功能:

codec

codec 包实现了 RPC 请求/响应的编码和解码功能,支持不同的编码方式。当前实现了 Gob 编码,但也为将来支持其他编码格式(如 JSON)预留了扩展接口。

  • Header 结构体:表示 RPC 请求和响应的头部信息,包含以下字段:
    • ServiceMethod: 服务方法名,格式为 Service.Method
    • Seq: 客户端选择的序列号,用于标识请求。
    • Error: 错误信息,如果有的话。
  • Codec 接口:定义了编解码器必须实现的接口方法:
    • ReadHeader(*Header) error: 读取请求头。
    • ReadBody(interface{}) error: 读取请求体。
    • Write(*Header, interface{}) error: 写入响应数据(包括头部和正文)。
    • Close() error: 关闭连接。
  • GobCodec 结构体:实现了 Codec 接口,用于处理 Gob 编码和解码。它包括:
    • conn: 用于数据读写的连接。
    • buf: 用于缓冲数据的 Writer
    • dec: gob.Decoder,用于解码数据。
    • enc: gob.Encoder,用于编码数据。
  • NewGobCodec:构造一个新的 GobCodec 实例,用于通过 Gob 编码和解码数据。

geerpc

geerpc 包实现了 RPC 服务器端的核心逻辑,负责接收和处理客户端请求,执行相应的服务,并通过网络返回响应。

  • Option 结构体:包含 RPC 请求的配置选项,主要包括:
    • MagicNumber: 用于验证请求是否合法的魔术数字。
    • CodecType: 编码方式,客户端可以选择不同的编码方式(如 Gob 或 JSON)。
  • Server 结构体:表示一个 RPC 服务器,负责处理客户端请求并返回响应。
  • NewServer:返回一个新的 RPC 服务器实例。
  • ServeConn:处理与客户端的单一连接,步骤包括:
    1. 读取并解析连接中的配置(如 MagicNumberCodecType)。
    2. 根据配置选择合适的编解码器(当前实现为 Gob 编码)。
    3. 调用 serveCodec 来处理请求。
  • serveCodec:处理每一个请求,步骤包括:
    1. 使用 readRequest 读取请求头部和正文(当前假设请求参数为字符串类型)。
    2. 使用 handleRequest 异步处理请求(目前只是打印请求参数并返回一个简单的响应)。
    3. 使用 sendResponse 发送响应数据。
  • readRequestHeader:读取请求的头部信息。
  • readRequest:读取请求的头部和正文,并解析请求参数。
  • sendResponse:发送响应数据。
  • handleRequest:处理 RPC 请求,当前仅返回一个固定的响应。
  • Accept:接受传入的连接并启动 ServeConn 来处理每个连接。
  • DefaultServer:默认的 RPC 服务器实例,用于处理请求。

gob

gob 是 Go 语言内置的序列化机制,用于在编码和解码 Go 数据结构时实现高效的二进制格式。相比于 JSON 或 XML 等文本格式,gob 编码后的数据体积更小,性能更高,适合在网络传输或本地存储中使用。

优点
  • 高效性gob 使用二进制格式,序列化和反序列化的速度更快。
  • 紧凑性:编码后的数据占用空间更小,减少了网络带宽的消耗。
  • 类型安全:支持 Go 的各种数据类型,包括复杂的结构体、切片、映射等。
使用场景
  • RPC 通信:在 Go 的 RPC 包中,默认使用 gob 进行数据的序列化和反序列化。
  • 网络传输:在客户端和服务器之间传递数据时,使用 gob 可以提高传输效率。
  • 本地存储:将数据序列化后存储到文件中,供以后读取和使用。
示例代码
package main

import (
    "bytes"
    "encoding/gob"
    "fmt"
)

type Person struct {
    Name string
    Age  int
}

func main() {
    var network bytes.Buffer // 模拟网络连接

    // 创建编码器和解码器
    encoder := gob.NewEncoder(&network)
    decoder := gob.NewDecoder(&network)

    // 需要编码的数据
    alice := Person{Name: "Alice", Age: 30}

    // 编码数据
    err := encoder.Encode(alice)
    if err != nil {
        fmt.Println("编码错误:", err)
    }

    // 解码数据
    var decodedPerson Person
    err = decoder.Decode(&decodedPerson)
    if err != nil {
        fmt.Println("解码错误:", err)
    }

    fmt.Println("解码结果:", decodedPerson)
}

在 GeeRPC 中的应用

在 GeeRPC 中,gob 编码用于序列化和反序列化 RPC 请求和响应的数据,GobCodec 实现了 Codec 接口,用于处理 gob 格式的编解码。


Option

Option 是在 RPC 系统中用于传递配置选项的结构体。它在代码中主要用于设置一些基础配置,如 MagicNumberCodecType,用于验证和选择编解码方式。

代码中 Option 结构体的定义

type Option struct {
    MagicNumber int        // MagicNumber 表示这是一个 GeeRPC 请求
    CodecType   codec.Type // 客户端可以选择不同的编解码器来编码数据
}

字段说明

  1. MagicNumber

    • 作用:用于标识 RPC 请求的合法性和有效性。
    • 原理:服务器通过检查请求中的 MagicNumber,可以判断该请求是否来自一个合法的客户端,防止非法或误导的请求被处理。
    • 常量值:通常被设置为一个固定的常量,例如 0x3bef5c,这样服务器可以通过匹配该值来验证请求。
    • 安全性:通过使用 MagicNumber,可以避免服务器处理非 GeeRPC 客户端发送的请求,增强系统的安全性和稳定性。
  2. CodecType

    • 作用:指定编解码器的类型,客户端可以选择不同的编码方式来序列化数据。
    • 可选值codec.GobTypecodec.JsonType 等。当前代码中实现了 Gob 编码,Json 编码可以根据需要扩展。
    • 灵活性:通过 CodecType,服务器可以根据客户端的选择,创建相应的编解码器实例,支持多种编码格式。

在 RPC 系统中的作用

  • 协议协商Option 结构体在连接建立时由客户端发送到服务器,用于协商通信协议的版本、编码方式等参数。
  • 兼容性:通过 Option,可以支持不同版本的客户端和服务器之间的兼容性,确保双方使用相同的协议进行通信。
  • 扩展性:未来如果需要增加新的功能或配置选项,只需要在 Option 结构体中添加新的字段即可。

示例

const MagicNumber = 0x3bef5c

var DefaultOption = &Option{
    MagicNumber: MagicNumber,   // 设置 MagicNumber 的默认值
    CodecType:   codec.GobType, // 设置默认的编解码器类型为 Gob
}

在服务器端,接收到连接后,会首先读取 Option,并进行以下操作:

  1. 验证 MagicNumber:确保请求来自 GeeRPC 客户端。
  2. 选择编解码器:根据 CodecType,选择相应的编解码器构造函数。
  3. 创建编解码器实例:使用构造函数创建编解码器,用于后续的请求处理。

代码示例

func (server *Server) ServeConn(conn io.ReadWriteCloser) {
    defer conn.Close()
    var opt Option
    if err := json.NewDecoder(conn).Decode(&opt); err != nil {
        log.Println("rpc server: options error: ", err)
        return
    }
    if opt.MagicNumber != MagicNumber {
        log.Printf("rpc server: invalid magic number %x", opt.MagicNumber)
        return
    }
    codecFunc := codec.NewCodecFuncMap[opt.CodecType]
    if codecFunc == nil {
        log.Printf("rpc server: invalid codec type %s", opt.CodecType)
        return
    }
    server.serveCodec(codecFunc(conn))
}

工厂模式

在 Go 语言中,工厂模式是一种创建对象的设计模式,通过将对象的创建过程与使用过程分离,提高代码的灵活性和可维护性。在 GeeRPC 中,使用工厂模式来创建编解码器实例,支持多种编码方式。

简单工厂模式

简单工厂模式通过一个工厂函数,根据传入的参数决定创建何种类型的对象。

示例代码
package main

import "fmt"

// 动物接口
type Animal interface {
    Speak() string
}

// 狗的实现
type Dog struct{}

func (d Dog) Speak() string {
    return "Woof!"
}

// 猫的实现
type Cat struct{}

func (c Cat) Speak() string {
    return "Meow!"
}

// 工厂函数
func NewAnimal(animalType string) Animal {
    switch animalType {
    case "dog":
        return Dog{}
    case "cat":
        return Cat{}
    default:
        return nil
    }
}

func main() {
    dog := NewAnimal("dog")
    fmt.Println(dog.Speak()) // 输出: Woof!

    cat := NewAnimal("cat")
    fmt.Println(cat.Speak()) // 输出: Meow!
}
特点
  • 简单易用:通过一个工厂函数创建对象,调用方便。
  • 集中管理:对象的创建逻辑集中在工厂函数中,代码清晰。
缺点
  • 不易扩展:如果需要添加新的产品类型,必须修改工厂函数的代码,违背了开闭原则。
  • 职责过重:工厂函数可能因为需要处理的类型过多而变得臃肿。

工厂方法模式

工厂方法模式为每种产品类型提供一个独立的工厂,使用接口定义工厂的创建方法,支持扩展性。

示例代码
package main

import "fmt"

// 动物接口
type Animal interface {
    Speak() string
}

// 狗的实现
type Dog struct{}

func (d Dog) Speak() string {
    return "Woof!"
}

// 猫的实现
type Cat struct{}

func (c Cat) Speak() string {
    return "Meow!"
}

// 工厂接口
type AnimalFactory interface {
    CreateAnimal() Animal
}

// 狗工厂
type DogFactory struct{}

func (df DogFactory) CreateAnimal() Animal {
    return Dog{}
}

// 猫工厂
type CatFactory struct{}

func (cf CatFactory) CreateAnimal() Animal {
    return Cat{}
}

func main() {
    // 创建狗
    dogFactory := DogFactory{}
    dog := dogFactory.CreateAnimal()
    fmt.Println(dog.Speak()) // 输出: Woof!

    // 创建猫
    catFactory := CatFactory{}
    cat := catFactory.CreateAnimal()
    fmt.Println(cat.Speak()) // 输出: Meow!
}
特点
  • 符合开闭原则:添加新产品时,无需修改现有代码,只需添加新的工厂和产品即可。
  • 职责单一:每个工厂只负责创建一种产品,代码清晰明了。
缺点
  • 类的数量增加:每添加一种产品,都需要增加相应的工厂类,可能导致类的数量过多。
  • 使用复杂度增加:客户端需要了解不同的工厂类,使用时需要实例化相应的工厂。

抽象工厂模式

抽象工厂模式用于创建一系列相关或相互依赖的对象,提供一个接口来创建一族相关的产品,而无需指定具体的产品类。

示例代码
package main

import "fmt"

// 动物接口
type Animal interface {
    Speak() string
}

// 栖息地接口
type Habitat interface {
    Describe() string
}

// 狗及其栖息地
type Dog struct{}

func (d Dog) Speak() string {
    return "Woof!"
}

type DogHouse struct{}

func (dh DogHouse) Describe() string {
    return "This is a dog house."
}

// 猫及其栖息地
type Cat struct{}

func (c Cat) Speak() string {
    return "Meow!"
}

type CatTree struct{}

func (ct CatTree) Describe() string {
    return "This is a cat tree."
}

// 抽象工厂接口
type AnimalFactory interface {
    CreateAnimal() Animal
    CreateHabitat() Habitat
}

// 狗工厂
type DogFactory struct{}

func (df DogFactory) CreateAnimal() Animal {
    return Dog{}
}

func (df DogFactory) CreateHabitat() Habitat {
    return DogHouse{}
}

// 猫工厂
type CatFactory struct{}

func (cf CatFactory) CreateAnimal() Animal {
    return Cat{}
}

func (cf CatFactory) CreateHabitat() Habitat {
    return CatTree{}
}

func main() {
    // 狗的产品族
    dogFactory := DogFactory{}
    dog := dogFactory.CreateAnimal()
    dogHouse := dogFactory.CreateHabitat()
    fmt.Println(dog.Speak())         // 输出: Woof!
    fmt.Println(dogHouse.Describe()) // 输出: This is a dog house.

    // 猫的产品族
    catFactory := CatFactory{}
    cat := catFactory.CreateAnimal()
    catTree := catFactory.CreateHabitat()
    fmt.Println(cat.Speak())         // 输出: Meow!
    fmt.Println(catTree.Describe())  // 输出: This is a cat tree.
}
特点
  • 生成相关产品族:可以生成一系列相关的产品,确保产品之间的兼容性。
  • 符合开闭原则:添加新的产品族时,只需添加新的工厂和产品实现。
缺点
  • 难以支持新种类的产品:如果需要在产品族中增加新的产品,所有的工厂都需要修改,违反了开闭原则。
  • 复杂度较高:涉及多个接口和类,结构相对复杂。

在 GeeRPC 中的应用

在 GeeRPC 中,使用了类似简单工厂模式的思想,通过维护一个映射 NewCodecFuncMap,将 CodecType 与对应的构造函数关联起来。这样,当需要创建一个新的编解码器实例时,只需根据 CodecType 在映射中查找相应的构造函数即可。

代码示例
package codec

type NewCodecFunc func(io.ReadWriteCloser) Codec

type Type string

const (
    GobType  Type = "application/gob"
    JsonType Type = "application/json"
)

var NewCodecFuncMap map[Type]NewCodecFunc

func init() {
    NewCodecFuncMap = make(map[Type]NewCodecFunc)
    NewCodecFuncMap[GobType] = NewGobCodec
    // 可以在此添加其他编解码器的构造函数
}
特点
  • 灵活性:可以方便地添加新的编解码器类型,而无需修改现有的代码逻辑。
  • 解耦性:客户端和服务器通过 CodecType 进行协商,创建对应的编解码器实例,解耦了具体的实现。

http://www.kler.cn/a/400133.html

相关文章:

  • VMware 中 虚拟机【Linux系统】固定 ip 访问
  • Ubuntu安装配置MySQL(远程登录)
  • 前端处理input框只能输入带小数点的数字
  • Sql进阶:字段中包含CSV,如何通过Sql解析CSV成多行多列?
  • 定时器简介
  • 2.STM32之通信接口《精讲》之USART通信
  • JAVA学习-练习试用Java实现“判断星期的英文缩写”
  • 汽车资讯新篇章:Spring Boot技术启航
  • 241118学习日志——[CSDIY] [ByteDance] 后端训练营 [06]
  • 除了电商平台,还有哪些网站适合进行数据爬取?
  • spring web项目中常用的注解
  • 语义通信论文略读(十四)线性编码和传输的优化+边缘服务器执行CV任务
  • C 语言 【单链表】
  • 探索DDCA:深入理解内存架构、子系统与内存控制器
  • Python设计模式详解之2 —— 工厂模式
  • 多模块集成swagger(knife4j-spring-boot-starter)
  • C++ ──── set和map的模拟实现
  • 探索IDE的无限可能:使用技巧与插件推荐
  • 【人工智能】生成对抗网络(GAN)原理与Python实现:从零构建图像生成模型
  • Spark RDD、DStream、DataFrame、DataSet 在窗口操作上的区别
  • 国内镜像android studio
  • 请描述一下JVM(Java虚拟机)的生命周期及其对应用程序性能的影响
  • 如何加速conda、docker资源下载速度
  • 开源模型应用落地-qwen模型小试-Qwen2.5-7B-Instruct-tool usage入门-Qwen-Agent深入学习(四)
  • sglang 部署Qwen2VL7B,大模型部署,速度测试,深度学习
  • YOLO v1目标检测