当前位置: 首页 > article >正文

【GeeRPC】Day5:支持 HTTP 协议

Day5:支持 HTTP 协议

今天要完成的任务如下:

  • 支持 HTTP 协议;
  • 基于 HTTP 实现一个简单的 Debug 页面,代码约 150 行;

支持 HTTP 协议需要什么?

Web 开发中,我们常使用 HTTP 协议中的 HEAD、GET、POST 等方式发送请求,等待响应。但 RPC 的格式与标准的 HTTP 协议并不兼容,在这种情况下,就需要一个协议的转换过程。HTTP 协议的 CONNECT 方法恰好提供了这个能力,CONNECT 一般用于代理服务。

假设浏览器和服务器之间的 HTTPS 通信是加密的,浏览器通过代理服务器发起 HTTPS 请求时,由于请求的站点地址和端口号都是加密保存在 HTTPS 请求报文头当中的,代理服务器如何知道往哪里发送请求呢?为了解决这个问题,浏览器通过 HTTP 明文形式向代理服务器发送一个 CONNECT 请求告诉代理服务器目的地址和端口,代理服务器收到请求之后,会在对应端口与目标站点建立一个 TCP 连接,连接建立成功后会返回 HTTP 200 状态码告诉浏览器与该站点的加密通道已经完成。接下来代理服务器仅需要透传浏览器和服务器之间的加密数据包即可,代理服务器无需解析 HTTPS 报文。

举一个简单的例子:
一. 浏览器向代理服务器发送 CONNECT 请求:

CONNECT geektutu.com:443 HTTP/1.0

二. 代理服务器返回 HTTP 200 状态码表示连接已建立:

HTTP/1.0 200 Connection Established

三. 之后浏览器和服务器开始 HTTPS 握手并交换加密数据,代理服务器只负责传输彼此的数据包,不能读取具体的数据内容(代理服务器也可以选择安装可信根证书解密 HTTPS 报文)。

事实上,上述过程其实是通过代理服务器将 HTTP 协议转为 HTTPS 协议的过程【浏览器(客户端)将 HTTPS 请求发送给代理服务器,再将目的服务端的 IP 地址和端口通过明文的 CONNECT 请求发送给代理服务器,代理服务器与服务端建立 TCP 连接之后,将客户端的 HTTPS 报文发送给服务端,代理服务器无需解析 HTTPS 报文】。对 RPC 服务而言,需要做的是将 HTTP 协议转为 RPC 协议,对客户端来说,需要新增通过 HTTP CONNECT 请求创建连接的逻辑

服务端支持 HTTP 协议

支持 HTTP 协议的服务端的通信应该是这样的:

一. 客户端向 RPC 服务器发送 CONNECT 请求:

CONNECT 10.0.0.1:9999/_geerpc_ HTTP/1.0

二. RPC 服务器返回 HTTP 200 状态码表示连接建立:

HTTP/1.0 200 Connected to Gee RPC

三. 客户端使用创建好的连接发送 RPC 报文,先发送 Option,再发送 N 个请求报文,服务端处理 RPC 请求并相应。

server.go中新增如下方法:

const (
	connected        = "200 Connected to Gee RPC"	// 当客户端成功连接到 RPC 服务器之后, 返回的 HTTP 响应消息
	defaultRPCPath   = "/_geerpc_"					// 默认的 RPC 请求路径
	defaultDebugPath = "/debug/geerpc"				// 默认的调试信息路径, 用于注册 HTTP 处理器
)

// ServeHTTP implements a http.Handler that answers RPC requests.
// ServeHTTP 方法实现了 http.Handler 接口, 用于处理 HTTP 请求
func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	if req.Method != "CONNECT" {
		// 首先检查请求方法是否为 CONNECT, 如果不是, 返回 405 Method Not Allowed 错误, 并提示客户端必须使用 CONNECT 方法
		w.Header().Set("Content-Type", "text/plain; charset=utf-8")
		w.WriteHeader(http.StatusMethodNotAllowed)
		_, _ = io.WriteString(w, "405 must CONNECT\n")
		return
	}
		// 如果请求方法是 CONNECT, 则通过 http.Hijacker 接口劫持连接, 获取底层的 net.Conn 对象, 劫持连接后, 服务器可以直接
		// 控制底层的 TCP 连接, 而不需要 HTTP 协议进行通信, conn 应该就是劫持到的 TCP 连接
	conn, _, err := w.(http.Hijacker).Hijack()
	if err != nil {
		log.Print("rpc hijacking", req.RemoteAddr, ": ", err.Error())
		return
	}
		// 劫持连接后, 服务端向客户端发送一个简单的 HTTP 响应, 表示连接已建立
	_, _ = io.WriteString(conn, "HTTP/1.0"+connected+"\n\n")
		// 最后调用 server.ServeConn(conn) 方法, 开始处理 RPC 请求
	server.ServeConn(conn)
}

// HandleHTTP registers an HTTP handler for RPC messages on rpcPath.
// It is still necessary to invoke http.Serve(), typically in a go statement.
// HandleHTTP 方法用于处理 HTTP 处理器. 它将 defaultDebugPath 路径与 server 关联起来, 使得客户端访问该路径时, 会调用 server
// 的 ServeHTTP 方法
func (server *Server) HandleHTTP() {
	http.Handle(defaultDebugPath, server)
}

// HandleHTTP is a convenient approach for default server to register HTTP handlers
// HandleHTTP 是一个便捷函数, 用于在默认的 RPC 服务器上注册 HTTP 处理器. 它调用 DefaultServer 的 HandleHTTP 方法
// DefaultServer 是一个全局的 Server 实例.
func HandleHTTP() {
	DefaultServer.HandleHTTP()
}

defaultDebugpath 是为后续 DEBUG 页面预留的地址。

在 Golang 中处理 HTTP 请求是一件非常简单的事情,Go 标准库中的 http.Handle 的实现如下:

package http

func Handle(pattern string, handler Handler) { DefaultServeMux.Handle(pattern, handler) }

第一个参数是支持通配的字符串 pattern,在这里,我们固定传入 /_geerpc_,第二个参数是 Handler 类型,Handler 是一个接口类型,定义如下:

type Handler interface {
	ServeHTTP(w ResponseWriter, r *Request)
}

也就是说,只需要实现接口 Handler 即可作为一个 HTTP Handler 处理 HTTP 请求。接口 Handler 只定义了一个方法 ServeHTTP,实现该方法即可。

客户端支持 HTTP 协议

服务端已经能够接受 CONNECT 请求,并返回了 200 状态码 HTTP/1.0 200 Connected to Gee RPC,客户端要做的就是发起 CONNECT 请求,检查返回状态码即可成功建立连接。

// NewHTTPClient 函数用于创建一个基于 HTTP 的 RPC 客户端. 它通过已经建立的 TCP 连接 (conn) 与服务器进行 HTTP 协议
// 的握手, 并在握手成功后切换到 RPC 协议
func NewHTTPClient(conn net.Conn, opt *Option) (*Client, error) {
	// 通过 io.WriteString 向服务器发送一个 HTTP CONNECT 请求, 请求路径为 defaultRPCPath
	// 请求格式: CONNECT /_geerpc_ HTTP/1.0
	// 这个请求是告诉服务器, 客户端希望通过 HTTP 协议建立连接, 并切换到 RPC 协议
	_, _ = io.WriteString(conn, fmt.Sprintf("CONNECT %s HTTP/1.0\n\n", defaultRPCPath))

	// Require successful HTTP response
	// before switching to RPC protocal.
	/*
		使用 http.ReadResponse 从连接中读取服务器的 HTTP 响应.
		bufio.NewReader(conn) 将 conn 包装成一个带缓冲的读取器, 以便逐行读取 HTTP 响应
		http.Request{Method: "CONNECT"} 是一个虚拟的 HTTP 请求对象, 用于告诉 http.ReadResponse 这是一个 CONNECT 请求的响应
	*/
	resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})
	if err == nil && resp.Status == connected {
		// 连接成功, 此时调用 NewClient(conn, opt) 新建一个 TPC 客户端并返回
		return NewClient(conn, opt)
	}
	if err == nil {
		err = errors.New("unexpected HTTP response: " + resp.Status)
	}
	return nil, err
}

// DialHTTP 是一个便捷函数, 用于通过 HTTP 协议连接到 RPC 服务器, 它封装了底层的连接建立和协议切换逻辑
func DialHTTP(network, address string, opts ...*Option) (*Client, error) {
	return dialTimeout(NewHTTPClient, network, address, opts...)
}

通过 HTTP CONNECT 请求建立连接之后,后续的通信过程交给 NewClient。

为了简化调用,提供了一个统一入口XDial

// XDial calls different functions to connect to a RPC server
// according the first parameter rpcAddr.
// rpcAddr is a general format (protocol@addr) to represent a rpc server
// eg, http@10.0.0.1:7001, tcp@10.0.0.1:9999, unix@/tmp/geerpc.sock
func XDial(rpcAddr string, opts ...*Option) (*Client, error) {
	// XDial 是一个通用的 RPC 客户端连接函数, 支持多种协议. 它的主要功能是根据 rpcAddr 的格式解析协议和地址, 
	// 并调用相应的底层连接函数来建立与 RPC 服务器的连接.
	parts := strings.Split(rpcAddr, "@")
	if len(parts) != 2 {
		return nil, fmt.Errorf("rpc client err: wrong format '%s', expected protocol@addr", rpcAddr)
	}
	protocol, addr := parts[0], parts[1]
	switch protocol {
	case "http":
		return DialHTTP("tcp", addr, opts...)
	default:
		return Dial(protocol, addr, opts...)
	}
}

添加一个测试用例试一试,这个测试用例使用了 unix 协议创建 socket 连接,适用于本机内部的通信,使用上与 TCP 协议并无区别:

func TestXDial(t *testing.T) {
	if runtime.GOOS == "linux" {
		ch := make(chan struct{})
		addr := "/tmp/geerpc.sock"
		go func() {
			_ = os.Remove(addr)
			l, err := net.Listen("unix", addr)
			if err != nil {
				t.Fatal("failed to listen unix socket")
			}
			ch <- struct{}{}
			Accept(l)
		}()
		<-ch
		_, err := XDial("unix@" + addr)
		_assert(err == nil, "failed to connect unix socket")
	}
}

实现一个简单的 DEBUG 页面

支持 HTTP 协议的好处在于,RPC 服务仅仅使用了监听端口的 /_geerpc 路径,在其他路径上我们可以提供诸如日志、统计等更为丰富的功能。接下来我们在 /debug/geerpc 上展示服务的调用统计视图。

// in geerpc/debug.go

package geerpc

import (
	"fmt"
	"html/template"
	"net/http"
)

const debugText = `<html>
	<body>
	<title>GeeRPC Services</title>
	{{range .}}
	<hr>
	Service {{.Name}}
	<hr>
		<table>
		<th align=center>Method</th><th align=center>Calls</th>
		{{range $name, $mtype := .Method}}
			<tr>
			<td align=left font=fixed>{{$name}}({{$mtype.ArgType}}, {{$mtype.ReplyType}}) error</td>
			<td align=center>{{$mtype.NumCalls}}</td>
			</tr>
		{{end}}
		</table>
	{{end}}
	</body>
	</html>`

var debug = template.Must(template.New("RPC debug").Parse(debugText))

type debugHTTP struct {
	*Server
}

type debugService struct {
	Name   string
	Method map[string]*methodType
}

// Runs at /debug/geerpc
func (server debugHTTP) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	// Build a sorted version of the data.
	var services []debugService
	server.serviceMap.Range(func(namei, svci interface{}) bool {
		svc := svci.(*service)
		services = append(services, debugService{
			Name:   namei.(string),
			Method: svc.method,
		})
		return true
	})
	err := debug.Execute(w, services)
	if err != nil {
		_, _ = fmt.Fprintln(w, "rpc: error executing template:", err.Error())
	}
}

此处将返回一个 HTML 报文,这个报文将展示注册所有的 service 的每一个方法的调用情况。

把 debugHTTP 实例绑定到地址 /debug/geerpc

// in geerpc/server.go
func (server *Server) HandleHTTP() {
	http.Handle(defaultRPCPath, server)
	http.Handle(defaultDebugPath, debugHTTP{server})
	log.Println("rpc server debug path:", defaultDebugPath)
}

Demo

// in geerpc/main/main.go
package main

import (
	"Geektutu/GeeRPC/geerpc"
	"context"
	"log"
	"net"
	"net/http"
	"sync"
	"time"
)

type Foo int

type Args struct{ Num1, Num2 int }

func (f Foo) Sum(args Args, reply *int) error {
	*reply = args.Num1 + args.Num2
	return nil
}

func startServer(addrCh chan string) {
	var foo Foo
	l, _ := net.Listen("tcp", ":8080")
	_ = geerpc.Register(&foo)
	geerpc.HandleHTTP()
	addrCh <- l.Addr().String()
	_ = http.Serve(l, nil)
}

func call(addrCh chan string) {
	client, _ := geerpc.DialHTTP("tcp", <-addrCh)
	defer func() { _ = client.Close() }()

	time.Sleep(time.Second)
	// send request & receive response
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			args := &Args{Num1: i, Num2: i * i}
			var reply int
			if err := client.Call(context.Background(), "Foo.Sum", args, &reply); err != nil {
				log.Fatal("call Foo.Sum error:", err)
			}
			log.Printf("%d + %d = %d", args.Num1, args.Num2, reply)
		}(i)
	}
	wg.Wait()
}

func main() {
	log.SetFlags(0)
	ch := make(chan string)
	go call(ch)
	startServer(ch)
}

结果:

rpc server: register Foo.Sum
rpc server debug path: /debug/geerpc
3 + 9 = 12
0 + 0 = 0
1 + 1 = 2
2 + 4 = 6
4 + 16 = 20

在浏览器当中可视化:
在这里插入图片描述

注意事项

需要注意的是,server.go 当中有两个函数需要修改:

func (server *Server) ServeConn(conn io.ReadWriteCloser) {
	defer func() { _ = conn.Close() }()
	var opt Option
	if err := json.NewDecoder(conn).Decode(&opt); err != nil {
		log.Println("rpc server: options error: ", err)
		return
	}
	if opt.MagicNumber != MagicNumber {
		log.Printf("rpc server: invalid magic number %x", opt.MagicNumber)
		return
	}
	f := codec.NewCodecFuncMap[opt.CodecType]
	if f == nil {
		log.Printf("rpc server: invalid codec type %s", opt.CodecType)
		return
	}
	server.serveCodec(f(conn), &opt)
}

func (server *Server) serveCodec(cc codec.Codec, opt *Option) {
	sending := new(sync.Mutex) // make sure to send a complete response
	wg := new(sync.WaitGroup)  // wait until all request are handled
	for {
		req, err := server.readRequest(cc)
		if err != nil {
			if req == nil {
				break // it's not possible to recover, so close the connection
			}
			req.h.Error = err.Error()
			server.sendResponse(cc, req.h, invalidRequest, sending)
			continue
		}
		wg.Add(1)
		go server.handleRequest(cc, req, sending, wg, opt.HandleTimeout)
	}
	wg.Wait()
	_ = cc.Close()
}

这样才能跑通 main。


http://www.kler.cn/a/542967.html

相关文章:

  • qml ToolBar详解
  • xtuner微调internlm2-chat-1_8b--xtuner中文文档快速上手案例
  • 网络工程师 (30)以太网技术
  • C++ 模板
  • 跨平台开发利器:UniApp 全面解析与实践指南
  • python-leetcode 25.环形链表
  • 浙江大华社招面试
  • PyTorch 中 `torch.cuda.amp` 相关警告的解决方法
  • 手撕Transformer编码器:从Self-Attention到Positional Encoding的PyTorch逐行实现
  • MySQL主从复制过程,延迟高,解决应对策略
  • MS08067练武场--WP
  • IntelliJ IDEA Console控制台输出成json的配置方式
  • 4、k8s的pod详解
  • 公开免费的API集合
  • 嵌入式音视频开发(零)移植ffmpeg及推流测试
  • Spring Boot 配置 Mybatis 读写分离
  • 【机器学习案列】车辆二氧化碳排放量预测
  • Redis哨兵模式相关问题及解决方案
  • <tauri><rust><GUI>基于rust和tauri的图片显示程序(本地图片的加载、显示、保存)
  • Qt QOpenGLFunctions详解
  • AF3 drmsd函数解读
  • .Net使用EF Core框架如何连接Oracle
  • JVM-Java虚拟机
  • 在postman中设置环境变量和全局变量以及五大常用响应体断言
  • 【C#零基础从入门到精通】(十四)——面向对象三大特征C#封装详解
  • 二叉树、平衡二叉树、B树与B+树的区别与应用