复杂gRPC之go调用go
1. 复杂的gRPC调用
我们使用了一个较为复杂的proto文件,这个文件的功能主要是用来定位的,详细内容可以看代码中的注解
syntax = "proto3";
//指定生成的所属的package,方便调用
option go_package = "./";
package routeguide;
// 由服务器导出的接口。
service RouteGuide {
// 一个简单的 RPC。
// 获取给定位置的地点。
// 如果在给定位置没有地点,则返回一个空名称的地点。
rpc GetFeature(Point) returns (Feature) {}
// 一个服务器到客户端的流式 RPC。
// 获取给定矩形内可用的地点。结果以流的方式提供,而不是一次性返回
// (例如,在具有重复字段的响应消息中),因为矩形可能覆盖一个大面积,并包含大量地点。
rpc ListFeatures(Rectangle) returns (stream Feature) {}
// 一个客户端到服务器的流式 RPC。
// 接受正在遍历的路线上的一系列点流,当遍历完成时返回一个 RouteSummary。
rpc RecordRoute(stream Point) returns (RouteSummary) {}
// 一个双向流式 RPC。
// 在遍历路线时接受一系列发送的 RouteNotes,同时接收其他 RouteNotes(例如来自其他用户)。
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
}
// 点的表示采用纬度-经度对的 E7 表示法
// (度数乘以10的7次方并四舍五入到最近的整数)。
// 纬度应该在+/- 90度的范围内,经度应该在+/- 180度的范围内(包括边界)。
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
// 一个以两个对角线相对的点 "lo" 和 "hi" 表示的纬度-经度矩形。
message Rectangle {
// 矩形的一个角
Point lo = 1;
// 矩形的另一个角
Point hi = 2;
}
// 一个要在给定点命名的地点。
// 如果无法给地点命名,则名称为空。
message Feature {
// 地点的名称
string name = 1;
// 地点
Point location = 2;
}
// RouteNote 是在给定点发送的消息。
message RouteNote {
// The location from which the message is sent.
Point location = 1;
// The message to be sent.
string message = 2;
}
// 在响应 RecordRoute rpc 时收到 RouteSummary。
// 它包含接收到的个别点的数量,检测到的地点数量以及作为每个点之间距离的累积和的总距离。
message RouteSummary {
// 接收到点的数量
int32 point_count = 1;
// 通过遍历路线时经过的已知地点数量
int32 feature_count = 2;
// 以米为单位的距离。
int32 distance = 3;
// 遍历所用的时间,以秒为单位。
int32 elapsed_time = 4;
}
相比之前的文件来说,这个方法中定义了四种类型的方法。
● 简单的RPC接口
○ 客户端使用存根发送请求到服务器并等待响应返回,就像平常的函数调用一样。
● 一个服务器到客户端的流式RPC
○ 客户端发送请求到服务器,拿到一个流去读取返回的消息序列。 客户端读取返回的流,直到里面没有任何消息。
● 一个客户端到服务端的流逝RPC
○ 客户端写入一个消息序列并将其发送到服务器,同样也是使用流。一旦客户端完成写入消息,它等待服务器完成读取返回它的响应。
● 一个双向流式RPC
○ 双向流式 RPC 是双方使用读写流去发送一个消息序列。两个流独立操作,因此客户端和服务器可以以任意喜欢的顺序读写:比如, 服务器可以在写入响应前等待接收所有的客户端消息,或者可以交替的读取和写入消息,或者其他读写的组合。
对proto文件进行编译
protoc --go_out=plugins=grpc:. RouteGuide.proto
2. 服务端代码
2.1导包
里面有一个需要我们进行实现的方法,可以在编译后的proto文件中找到。
package main
import (
pb "complex_go_server_grpc/proto"
"context"
"encoding/json"
"flag"
"fmt"
"io"
"log"
"math"
"net"
"os"
"sync"
"time"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
)
// type RouteGuideServer interface {
// // 一个简单的 RPC。
// // 获取给定位置的地点。
// // 如果在给定位置没有地点,则返回一个空名称的地点。
// GetFeature(context.Context, *Point) (*Feature, error)
// // 一个服务器到客户端的流式 RPC。
// // 获取给定矩形内可用的地点。结果以流的方式提供,而不是一次性返回
// // (例如,在具有重复字段的响应消息中),因为矩形可能覆盖一个大面积,并包含大量地点。
// ListFeatures(*Rectangle, RouteGuide_ListFeaturesServer) error
// // 一个客户端到服务器的流式 RPC。
// // 接受正在遍历的路线上的一系列点流,当遍历完成时返回一个 RouteSummary。
// RecordRoute(RouteGuide_RecordRouteServer) error
// // 一个双向流式 RPC。
// // 在遍历路线时接受一系列发送的 RouteNotes,同时接收其他 RouteNotes(例如来自其他用户)。
// RouteChat(RouteGuide_RouteChatServer) error
// }
2.2普通调用
服务端代码:
// 查询某个点位是否是已知的地名
func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
for _, feature := range s.savedFeatures {
if proto.Equal(feature.Location, point) {
return feature, nil
}
}
//不是已知的地名,返回一个没有命名的feature
return &pb.Feature{Location: point}, nil
}
客户端代码:
// 获取给定点的特征。
func printFeature(client pb.RouteGuideClient, point *pb.Point) {
// 打印日志,获取给定点的特征
log.Printf("获取点 (%d, %d) 的特征", point.Latitude, point.Longitude)
// 创建带有超时的上下文
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// 调用 gRPC 客户端的 GetFeature 方法获取特征
feature, err := client.GetFeature(ctx, point)
if err != nil {
// 处理获取特征失败的情况
log.Fatalf("client.GetFeature 失败:%v", err)
}
// 打印获取到的特征
log.Println(feature)
}
main函数
func main() {
flag.Parse()
var opts []grpc.DialOption
if *tls {
if *caFile == "" {
//*caFile = data.Path("x509/ca_cert.pem")
}
creds, err := credentials.NewClientTLSFromFile(*caFile, *serverHostOverride)
if err != nil {
log.Fatalf("Failed to create TLS credentials: %v", err)
}
opts = append(opts, grpc.WithTransportCredentials(creds))
} else {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
//建立连接
conn, err := grpc.Dial(*serverAddr, opts...)
if err != nil {
log.Fatalf("fail to dial: %v", err)
}
defer conn.Close()
//根据连接创建客户端
client := pb.NewRouteGuideClient(conn)
//========利用客户端进行调用=========
// 查看一个有效的点
printFeature(client, &pb.Point{Latitude: 409146138, Longitude: -746188906})
// 查看一个无效的点
//printFeature(client, &pb.Point{Latitude: 0, Longitude: 0})
// Looking for features between 40, -75 and 42, -73.
// printFeatures(client, &pb.Rectangle{
// Lo: &pb.Point{Latitude: 400000000, Longitude: -750000000},
// Hi: &pb.Point{Latitude: 420000000, Longitude: -730000000},
// })
// RecordRoute
//runRecordRoute(client)
// RouteChat
//runRouteChat(client)
}
效果:
2.3 服务端到客户端流式输出
服务端代码:
// 一个服务器到客户端的流式 RPC。
// 判断服务器端端地点是否有在距形范围内的
func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
for _, feature := range s.savedFeatures {
if inRange(feature.Location, rect) {
if err := stream.Send(feature); err != nil {
return err
}
}
}
return nil
}
客户端代码:
// 列出在给定边界矩形内的所有特征。
func printFeatures(client pb.RouteGuideClient, rect *pb.Rectangle) {
log.Printf("Looking for features within %v", rect)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := client.ListFeatures(ctx, rect)
if err != nil {
log.Fatalf("client.ListFeatures failed: %v", err)
}
for {
feature, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("client.ListFeatures failed: %v", err)
}
log.Printf("Feature: name: %q, point:(%v, %v)", feature.GetName(),
feature.GetLocation().GetLatitude(), feature.GetLocation().GetLongitude())
}
}
2.4 客户端到服务端流式输入
服务端代码:
// 接收来自客户端的一系列点流,计算路线的总点数、特征点数、覆盖距离和总耗时,
// 然后通过流式响应将计算结果发送回客户端。
func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
var pointCount, featureCount, distance int32
var lastPoint *pb.Point
startTime := time.Now()
// 循环接收来自客户端的点流
for {
// 接受点流中的点
point, err := stream.Recv()
// 如果点流结束,发送计算结果并关闭流
if err == io.EOF {
endTime := time.Now()
return stream.SendAndClose(&pb.RouteSummary{
PointCount: pointCount,
FeatureCount: featureCount,
Distance: distance,
ElapsedTime: int32(endTime.Sub(startTime).Seconds()),
})
}
// 处理接收点流过程中的错误
if err != nil {
return err
}
// 增加总点数
pointCount++
// 遍历保存的特征点,如果点匹配,则增加特征点数
for _, feature := range s.savedFeatures {
if proto.Equal(feature.Location, point) {
featureCount++
}
}
// 计算覆盖距离
if lastPoint != nil {
distance += calcDistance(lastPoint, point)
}
// 更新上一个点
lastPoint = point
}
}
客户端代码:
func runRecordRoute(client pb.RouteGuideClient) {
// 随机生成一系列的点
r := rand.New(rand.NewSource(time.Now().UnixNano()))
pointCount := int(r.Int31n(100)) + 2 // Traverse at least two points
var points []*pb.Point
for i := 0; i < pointCount; i++ {
points = append(points, randomPoint(r))
}
log.Printf("Traversing %d points.", len(points))
//============开始发送点流==============
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := client.RecordRoute(ctx)
if err != nil {
log.Fatalf("client.RecordRoute failed: %v", err)
}
for _, point := range points {
//遍历点并一个一个发送过去
if err := stream.Send(point); err != nil {
log.Fatalf("client.RecordRoute: stream.Send(%v) failed: %v", point, err)
}
}
//等待结束并关闭
reply, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("client.RecordRoute failed: %v", err)
}
log.Printf("Route summary: %v", reply)
}
效果:
2.5 双向流式RPC
服务端代码:
// RouteChat 接收一系列消息/位置对的流,并响应包含每个位置的所有先前消息的流。
func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
for {
// 从流中接收消息/位置对
in, err := stream.Recv()
// 如果流结束,返回 nil 表示成功处理
if err == io.EOF {
return nil
}
// 处理接收流过程中的错误
if err != nil {
return err
}
// 根据位置序列化消息/位置对,用作路由记录的键
key := serialize(in.Location)
// 使用互斥锁以确保并发安全
s.mu.Lock()
// 将接收到的消息添加到路由记录中
s.routeNotes[key] = append(s.routeNotes[key], in)
// 注意:此处的复制防止在为此客户端服务时阻塞其他客户端。
// 我们不需要进行深度复制,因为切片中的元素是仅插入,永远不会修改的。
rn := make([]*pb.RouteNote, len(s.routeNotes[key]))
copy(rn, s.routeNotes[key])
// 解锁互斥锁
s.mu.Unlock()
// 将之前一个地点的所有先前的消息发送回流
for _, note := range rn {
if err := stream.Send(note); err != nil {
return err
}
}
}
}
客户端代码:
// runRouteChat 接收一系列路由信息,同时为不同的位置发送信息。
func runRouteChat(client pb.RouteGuideClient) {
// 预定义一组路由信息
notes := []*pb.RouteNote{
{Location: &pb.Point{Latitude: 0, Longitude: 1}, Message: "First message"},
{Location: &pb.Point{Latitude: 0, Longitude: 2}, Message: "Second message"},
{Location: &pb.Point{Latitude: 0, Longitude: 3}, Message: "Third message"},
{Location: &pb.Point{Latitude: 0, Longitude: 1}, Message: "Fourth message"},
{Location: &pb.Point{Latitude: 0, Longitude: 2}, Message: "Fifth message"},
{Location: &pb.Point{Latitude: 0, Longitude: 3}, Message: "Sixth message"},
}
// 创建一个带有超时的上下文
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// 使用 RouteChat 方法创建流式 RPC 客户端
stream, err := client.RouteChat(ctx)
if err != nil {
log.Fatalf("client.RouteChat 失败:%v", err)
}
// 创建一个等待信号的通道
waitc := make(chan struct{})
// 启动协程监听流式响应
go func() {
for {
in, err := stream.Recv()
// 如果流结束,关闭等待信号通道并返回
if err == io.EOF {
close(waitc)
return
}
// 处理接收流过程中的错误
if err != nil {
log.Fatalf("client.RouteChat 失败:%v", err)
}
// 打印接收到的消息和位置
log.Printf("收到消息 %s 在点(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
}
}()
// 遍历预定义的注释并通过流发送
for _, note := range notes {
if err := stream.Send(note); err != nil {
log.Fatalf("client.RouteChat: stream.Send(%v) 失败:%v", note, err)
}
}
// 关闭发送流
stream.CloseSend()
// 等待流结束的信号
<-waitc
}
效果:
3 源代码链接
https://gitee.com/guo-zonghao/complex_go_server_grpc