当前位置: 首页 > article >正文

go GRPC学习笔记

本博文源于笔者正在学习的gprc,相关配套书籍素材来源是《Go编程进阶实战》,博文内容主要包含了RPC模式讲解,RPC通过htttp访问、拦截器、提高服务端与客户端容错的内容配置
在此之前需要下载protoc,这里不做下载过程

1、RPC模式

首先定义在rpc模式里共有四种

  • Unary RPC 单个请求–单个响应
  • Server Streaming RPC 多个请求–单个响应
  • Client Streaming RPC 单个请求–多个响应
  • Bidriectional Streaming RPC 多个请求–多个响应

1.1 举例:编写

文件夹目录
在这里插入图片描述

编写protoc文件

syntax = "proto3"; // 声明编译器用的是prototype3


package person;

option go_package="/projectRPCTest3/pb/person;person";


message PersonReq{
    string name =1;
    int32 age = 2;
}

message PersonRes{
    string name = 1;
    int32 age = 2;
}



service SearchService {
    rpc Search(PersonReq) returns (PersonRes);
    rpc SearchIn(stream PersonReq) returns(PersonRes);
    rpc SearchOut(PersonReq) returns (stream PersonRes);
    rpc SearchIO(stream PersonReq) returns (stream PersonRes);
}

编译出protoc的两个go文件,在person目录下进行编译
在这里插入图片描述

cd .\pb
cd .\person\
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative .\person.proto

main.go文件进行编写:

  • Search将收到的请求进行返回
  • SearchIn 多个请求过来统一做回复
  • SearchOut 单个请求过来,多次响应
  • SearchIO 多个请求过来,多个响应
package main

import (
	"context"
	"fmt"
	"time"

	"net"
	"projectRPCTest3/pb/person"

	"google.golang.org/grpc"
)

type personServe struct {
	person.UnimplementedSearchServiceServer
}

func (*personServe) Search(ctx context.Context, req *person.PersonReq) (*person.PersonRes, error) {
	name := req.GetName()
	res := &person.PersonRes{Name: "我收到了" + name + "的信息"}
	return res, nil
}
func (*personServe) SearchIn(server person.SearchService_SearchInServer) error {
	for {
		req, err := server.Recv()
		fmt.Println(req)
		if err != nil {
			server.SendAndClose(&person.PersonRes{Name: "success"})
			break
		}
	}
	return nil
}

func (*personServe) SearchOut(req *person.PersonReq, server grpc.ServerStreamingServer[person.PersonRes]) error {
	name := req.Name
	i := 0
	for {
		if i > 10 {
			return nil
		}
		time.Sleep(1 * time.Second)
		server.Send(&person.PersonRes{Name: "I got it" + name})
		i++
	}

}
func (*personServe) SearchIO(server grpc.BidiStreamingServer[person.PersonReq, person.PersonRes]) error {
	str := make(chan string)
	go func() {
		for {
			req, err := server.Recv()
			if err != nil {
				str <- "Result"
				break
			}
			str <- req.Name
		}
	}()
	for {
		s := <-str
		if s == "Result" {
			break
		}
		server.Send(&person.PersonRes{Name: "I got it" + s})
	}
	return nil
}

func main() {
	l, _ := net.Listen("tcp", ":8888")
	s := grpc.NewServer()
	person.RegisterSearchServiceServer(s, &personServe{})

	s.Serve(l)
}

然后使用客户端client对server发送消息,进行测试,分为四个部分,最后给出完整的client.go
search

wg := sync.WaitGroup{}
wg.Add(10)
for i := 0; i < 10; i++ {
	search, err := client.Search(context.Background(), &person.PersonReq{Name: "Alice"})
	if err != nil {
		fmt.Println(err)
	}
	wg.Done()
	fmt.Println(search.GetName())
}
wg.Wait()

在这里插入图片描述

searchIn

c, _ := client.SearchIn(context.Background())
i := 0
for {
	if i > 10 {
		res, _ := c.CloseAndRecv()
		fmt.Println(res)
		break

	}
	time.Sleep(1 * time.Second)
	c.Send(&person.PersonReq{Name: "client send message...."})
	i++

}

在这里插入图片描述

searchOut

c, _ := client.SearchOut(context.Background(), &person.PersonReq{Name: "zhangsan"})
for {
	req, err := c.Recv()
	if err != nil {
		fmt.Println(err)
		break
	}
	fmt.Println(req)
}

在这里插入图片描述

searchIO

c, _ := client.SearchIO(context.Background())
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
	for {
		err := c.Send(&person.PersonReq{Name: " hello"})
		time.Sleep(2 * time.Second)
		if err != nil {
			wg.Done()
			break
		}
	}
}()
go func() {
	for {
		req, err := c.Recv()
		fmt.Println(req)
		if err != nil {
			fmt.Println(err)
			wg.Done()
			break
		}
	}
}()
wg.Wait()

在这里插入图片描述
完整的client.go

package main

import (
	"context"
	"fmt"
	"google.golang.org/grpc"
	"projectRPCTest3/pb/person"
	"sync"
	"time"
)

func main() {
	l, _ := grpc.Dial("127.0.0.1:8888", grpc.WithInsecure())
	client := person.NewSearchServiceClient(l)
	//wg := sync.WaitGroup{}
	//wg.Add(10)
	//for i := 0; i < 10; i++ {
	//	search, err := client.Search(context.Background(), &person.PersonReq{Name: "Alice"})
	//	if err != nil {
	//		fmt.Println(err)
	//	}
	//	wg.Done()
	//	fmt.Println(search.GetName())
	//}
	//wg.Wait()

	//c, _ := client.SearchIn(context.Background())
	//i := 0
	//for {
	//	if i > 10 {
	//		res, _ := c.CloseAndRecv()
	//		fmt.Println(res)
	//		break
	//
	//	}
	//	time.Sleep(1 * time.Second)
	//	c.Send(&person.PersonReq{Name: "client send message...."})
	//	i++
	//
	//}

	//c, _ := client.SearchOut(context.Background(), &person.PersonReq{Name: "zhangsan"})
	//for {
	//	req, err := c.Recv()
	//	if err != nil {
	//		fmt.Println(err)
	//		break
	//	}
	//	fmt.Println(req)
	//}

	c, _ := client.SearchIO(context.Background())
	wg := sync.WaitGroup{}
	wg.Add(1)
	go func() {
		for {
			err := c.Send(&person.PersonReq{Name: " hello"})
			time.Sleep(2 * time.Second)
			if err != nil {
				wg.Done()
				break
			}
		}
	}()
	go func() {
		for {
			req, err := c.Recv()
			fmt.Println(req)
			if err != nil {
				fmt.Println(err)
				wg.Done()
				break
			}
		}
	}()
	wg.Wait()
}

1.2 任意字节传输

proto文件定义,文件名称为repositories.proto

syntax = "proto3";

option go_package = "projectBindataClientStreaming/service";


service Repo {
  rpc CreateRepo(stream RepoCreateRequest) returns(RepoCreateReply) {}
}

message RepoCreateRequest {
  oneof body {
    RepoContext context = 1;
    bytes data = 2;
  }
}

message RepoContext {
  string creator_id = 1;
  string name =2;
}

message Repository {
  string id = 1;
  string name = 2;
  string url = 3;
}

message RepoCreateReply {
  Repository repo = 1;
  int32 size =2;
}

进行编译proto

protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative .\repositories.proto

server.go进行实现CreateRepo,然后注册服务,开启tcp,

package main

import (
	"fmt"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"io"
	"log"
	"net"
	"os"
	svc "projectBindataClientStreaming/service"
)

type repoService struct {
	svc.UnimplementedRepoServer
}

func (s *repoService) CreateRepo(
	stream svc.Repo_CreateRepoServer,
) error {
	var repoContext *svc.RepoContext
	var data []byte
	for {
		r, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			return status.Error(
				codes.Unknown,
				err.Error(),
			)

		}
		switch t := r.Body.(type) {
		case *svc.RepoCreateRequest_Context:
			repoContext = r.GetContext()
		case *svc.RepoCreateRequest_Data:
			b := r.GetData()
			data = append(data, b...)
		case nil:
			return status.Error(
				codes.InvalidArgument,
				"Message doesn't contain context or data",
			)
		default:
			return status.Errorf(
				codes.FailedPrecondition,
				"Unexpected message type: %s",
				t,
			)
		}
	}
	repo := svc.Repository{
		Name: repoContext.Name,
		Url: fmt.Sprintf(
			"https://git.example.com/%s/%s",
			repoContext.CreatorId,
			repoContext.Name,
		),
	}
	r := svc.RepoCreateReply{
		Repo: &repo,
		Size: int32(len(data)),
	}
	return stream.SendAndClose(&r)
}

func registerServices(s *grpc.Server) {
	svc.RegisterRepoServer(s, &repoService{})
}

func startServer(s *grpc.Server, l net.Listener) error {
	return s.Serve(l)
}

func main() {
	listenAddr := ":50051"
	
	lis, err := net.Listen("tcp", listenAddr)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	s := grpc.NewServer()
	registerServices(s)
	log.Fatal(startServer(s, lis))
}

测试server_test.go

package main

import (
	"context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/test/bufconn"
	"io"
	"net"
	svc "projectBindataClientStreaming/service"
	"strings"
	"testing"
)

func startTestGrpcServer() *bufconn.Listener {
	l := bufconn.Listen(1)
	s := grpc.NewServer()
	registerServices(s)
	go func() {
		startServer(s, l)
	}()
	return l
}

func TestCreateRepo(t *testing.T) {
	l := startTestGrpcServer()
	bufconnDialer := func(ctx context.Context, address string) (net.Conn, error) {
		return l.Dial()
	}
	client, err := grpc.DialContext(context.Background(),
		"", grpc.WithContextDialer(bufconnDialer), grpc.WithInsecure())
	if err != nil {
		t.Fatalf("could not dial bufconn: %v", err)
	}
	repoClient := svc.NewRepoClient(client)
	stream, err := repoClient.CreateRepo(
		context.Background(),
	)
	if err != nil {
		t.Fatal("CreateRepo", err)
	}
	c := svc.RepoCreateRequest_Context{
		Context: &svc.RepoContext{
			CreatorId: "user-123",
			Name:      "test-repo",
		},
	}
	r := svc.RepoCreateRequest{
		Body: &c,
	}
	err = stream.Send(&r)
	if err != nil {
		t.Fatal("stream.Send", err)
	}
	data := "Arbitrary Data Bytes"
	repoData := strings.NewReader(data)
	for {
		b, err := repoData.ReadByte()
		if err == io.EOF {
			break
		}
		bData := svc.RepoCreateRequest_Data{
			Data: []byte{b},
		}
		r := svc.RepoCreateRequest{
			Body: &bData,
		}
		err = stream.Send(&r)
		if err != nil {
			t.Fatal("stream.Send", err)
		}
		l.Close()
	}
	resp, err := stream.CloseAndRecv()
	if err != nil {
		t.Fatal("CloseAndRecv", err)
	}
	expectedSize := int32(len(data))

	if resp.Size != expectedSize {
		t.Errorf("resp.Size != expectedSize, resp: %v", resp.Size)
	}
	expectedRepoUrl := "https://git.example.com/user-123/test-repo"
	if resp.Repo.Url != expectedRepoUrl {
		t.Errorf("resp.Repo.Url != expectedRepoUrl, resp: %v,got:%s", resp.Repo.Url,
			expectedRepoUrl)
	}
}

2、grpc通过http访问

上一小节通过tcp,这次通过http访问

2.1 案例,restful访问请求

proto文件编写,下面代码中的注释部分是用来测试,可复制过来一起测试。

syntax = "proto3";

package person;

option go_package="projectRPCTest4/pb/person;person";

import "google/api/annotations.proto";
message Mbody {
  string name = 1;
}
message PersonReq {
  string name = 1;
  int32 age = 2;
  Mbody body =3;
}

message PersonRes {
  string name = 1;
  int32 age = 2;
  Mbody body = 3;
}

service SearchService {
  rpc Search(PersonReq) returns (PersonRes){
    option(google.api.http)= {
     // post:"/api/person",
      // get:"/api/person",
      // get:"/api/person/{name}/{age}/{body.name}",
       get:"/api/person/{name=qm}/{age}/{body.name}",//只能固定
      //body:"body"
    };
  };
}

其中文件夹目录,与其他文件一起附上

在这里插入图片描述
annotations.proto

// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto3";

package google.api;

import "google/api/http.proto";
import "google/protobuf/descriptor.proto";

option go_package = "google.golang.org/genproto/googleapis/api/annotations;annotations";
option java_multiple_files = true;
option java_outer_classname = "AnnotationsProto";
option java_package = "com.google.api";
option objc_class_prefix = "GAPI";

extend google.protobuf.MethodOptions {
  // See `HttpRule`.
  HttpRule http = 72295728;
}

http.proto

// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto3";

package google.api;

option go_package = "google.golang.org/genproto/googleapis/api/annotations;annotations";
option java_multiple_files = true;
option java_outer_classname = "HttpProto";
option java_package = "com.google.api";
option objc_class_prefix = "GAPI";

// Defines the HTTP configuration for an API service. It contains a list of
// [HttpRule][google.api.HttpRule], each specifying the mapping of an RPC method
// to one or more HTTP REST API methods.
message Http {
  // A list of HTTP configuration rules that apply to individual API methods.
  //
  // **NOTE:** All service configuration rules follow "last one wins" order.
  repeated HttpRule rules = 1;

  // When set to true, URL path parameters will be fully URI-decoded except in
  // cases of single segment matches in reserved expansion, where "%2F" will be
  // left encoded.
  //
  // The default behavior is to not decode RFC 6570 reserved characters in multi
  // segment matches.
  bool fully_decode_reserved_expansion = 2;
}

// gRPC Transcoding
//
// gRPC Transcoding is a feature for mapping between a gRPC method and one or
// more HTTP REST endpoints. It allows developers to build a single API service
// that supports both gRPC APIs and REST APIs. Many systems, including [Google
// APIs](https://github.com/googleapis/googleapis),
// [Cloud Endpoints](https://cloud.google.com/endpoints), [gRPC
// Gateway](https://github.com/grpc-ecosystem/grpc-gateway),
// and [Envoy](https://github.com/envoyproxy/envoy) proxy support this feature
// and use it for large scale production services.
//
// `HttpRule` defines the schema of the gRPC/REST mapping. The mapping specifies
// how different portions of the gRPC request message are mapped to the URL
// path, URL query parameters, and HTTP request body. It also controls how the
// gRPC response message is mapped to the HTTP response body. `HttpRule` is
// typically specified as an `google.api.http` annotation on the gRPC method.
//
// Each mapping specifies a URL path template and an HTTP method. The path
// template may refer to one or more fields in the gRPC request message, as long
// as each field is a non-repeated field with a primitive (non-message) type.
// The path template controls how fields of the request message are mapped to
// the URL path.
//
// Example:
//
//     service Messaging {
//       rpc GetMessage(GetMessageRequest) returns (Message) {
//         option (google.api.http) = {
//             get: "/v1/{name=messages/*}"
//         };
//       }
//     }
//     message GetMessageRequest {
//       string name = 1; // Mapped to URL path.
//     }
//     message Message {
//       string text = 1; // The resource content.
//     }
//
// This enables an HTTP REST to gRPC mapping as below:
//
// - HTTP: `GET /v1/messages/123456`
// - gRPC: `GetMessage(name: "messages/123456")`
//
// Any fields in the request message which are not bound by the path template
// automatically become HTTP query parameters if there is no HTTP request body.
// For example:
//
//     service Messaging {
//       rpc GetMessage(GetMessageRequest) returns (Message) {
//         option (google.api.http) = {
//             get:"/v1/messages/{message_id}"
//         };
//       }
//     }
//     message GetMessageRequest {
//       message SubMessage {
//         string subfield = 1;
//       }
//       string message_id = 1; // Mapped to URL path.
//       int64 revision = 2;    // Mapped to URL query parameter `revision`.
//       SubMessage sub = 3;    // Mapped to URL query parameter `sub.subfield`.
//     }
//
// This enables a HTTP JSON to RPC mapping as below:
//
// - HTTP: `GET /v1/messages/123456?revision=2&sub.subfield=foo`
// - gRPC: `GetMessage(message_id: "123456" revision: 2 sub:
// SubMessage(subfield: "foo"))`
//
// Note that fields which are mapped to URL query parameters must have a
// primitive type or a repeated primitive type or a non-repeated message type.
// In the case of a repeated type, the parameter can be repeated in the URL
// as `...?param=A&param=B`. In the case of a message type, each field of the
// message is mapped to a separate parameter, such as
// `...?foo.a=A&foo.b=B&foo.c=C`.
//
// For HTTP methods that allow a request body, the `body` field
// specifies the mapping. Consider a REST update method on the
// message resource collection:
//
//     service Messaging {
//       rpc UpdateMessage(UpdateMessageRequest) returns (Message) {
//         option (google.api.http) = {
//           patch: "/v1/messages/{message_id}"
//           body: "message"
//         };
//       }
//     }
//     message UpdateMessageRequest {
//       string message_id = 1; // mapped to the URL
//       Message message = 2;   // mapped to the body
//     }
//
// The following HTTP JSON to RPC mapping is enabled, where the
// representation of the JSON in the request body is determined by
// protos JSON encoding:
//
// - HTTP: `PATCH /v1/messages/123456 { "text": "Hi!" }`
// - gRPC: `UpdateMessage(message_id: "123456" message { text: "Hi!" })`
//
// The special name `*` can be used in the body mapping to define that
// every field not bound by the path template should be mapped to the
// request body.  This enables the following alternative definition of
// the update method:
//
//     service Messaging {
//       rpc UpdateMessage(Message) returns (Message) {
//         option (google.api.http) = {
//           patch: "/v1/messages/{message_id}"
//           body: "*"
//         };
//       }
//     }
//     message Message {
//       string message_id = 1;
//       string text = 2;
//     }
//
//
// The following HTTP JSON to RPC mapping is enabled:
//
// - HTTP: `PATCH /v1/messages/123456 { "text": "Hi!" }`
// - gRPC: `UpdateMessage(message_id: "123456" text: "Hi!")`
//
// Note that when using `*` in the body mapping, it is not possible to
// have HTTP parameters, as all fields not bound by the path end in
// the body. This makes this option more rarely used in practice when
// defining REST APIs. The common usage of `*` is in custom methods
// which don't use the URL at all for transferring data.
//
// It is possible to define multiple HTTP methods for one RPC by using
// the `additional_bindings` option. Example:
//
//     service Messaging {
//       rpc GetMessage(GetMessageRequest) returns (Message) {
//         option (google.api.http) = {
//           get: "/v1/messages/{message_id}"
//           additional_bindings {
//             get: "/v1/users/{user_id}/messages/{message_id}"
//           }
//         };
//       }
//     }
//     message GetMessageRequest {
//       string message_id = 1;
//       string user_id = 2;
//     }
//
// This enables the following two alternative HTTP JSON to RPC mappings:
//
// - HTTP: `GET /v1/messages/123456`
// - gRPC: `GetMessage(message_id: "123456")`
//
// - HTTP: `GET /v1/users/me/messages/123456`
// - gRPC: `GetMessage(user_id: "me" message_id: "123456")`
//
// Rules for HTTP mapping
//
// 1. Leaf request fields (recursive expansion nested messages in the request
//    message) are classified into three categories:
//    - Fields referred by the path template. They are passed via the URL path.
//    - Fields referred by the [HttpRule.body][google.api.HttpRule.body]. They
//    are passed via the HTTP
//      request body.
//    - All other fields are passed via the URL query parameters, and the
//      parameter name is the field path in the request message. A repeated
//      field can be represented as multiple query parameters under the same
//      name.
//  2. If [HttpRule.body][google.api.HttpRule.body] is "*", there is no URL
//  query parameter, all fields
//     are passed via URL path and HTTP request body.
//  3. If [HttpRule.body][google.api.HttpRule.body] is omitted, there is no HTTP
//  request body, all
//     fields are passed via URL path and URL query parameters.
//
// Path template syntax
//
//     Template = "/" Segments [ Verb ] ;
//     Segments = Segment { "/" Segment } ;
//     Segment  = "*" | "**" | LITERAL | Variable ;
//     Variable = "{" FieldPath [ "=" Segments ] "}" ;
//     FieldPath = IDENT { "." IDENT } ;
//     Verb     = ":" LITERAL ;
//
// The syntax `*` matches a single URL path segment. The syntax `**` matches
// zero or more URL path segments, which must be the last part of the URL path
// except the `Verb`.
//
// The syntax `Variable` matches part of the URL path as specified by its
// template. A variable template must not contain other variables. If a variable
// matches a single path segment, its template may be omitted, e.g. `{var}`
// is equivalent to `{var=*}`.
//
// The syntax `LITERAL` matches literal text in the URL path. If the `LITERAL`
// contains any reserved character, such characters should be percent-encoded
// before the matching.
//
// If a variable contains exactly one path segment, such as `"{var}"` or
// `"{var=*}"`, when such a variable is expanded into a URL path on the client
// side, all characters except `[-_.~0-9a-zA-Z]` are percent-encoded. The
// server side does the reverse decoding. Such variables show up in the
// [Discovery
// Document](https://developers.google.com/discovery/v1/reference/apis) as
// `{var}`.
//
// If a variable contains multiple path segments, such as `"{var=foo/*}"`
// or `"{var=**}"`, when such a variable is expanded into a URL path on the
// client side, all characters except `[-_.~/0-9a-zA-Z]` are percent-encoded.
// The server side does the reverse decoding, except "%2F" and "%2f" are left
// unchanged. Such variables show up in the
// [Discovery
// Document](https://developers.google.com/discovery/v1/reference/apis) as
// `{+var}`.
//
// Using gRPC API Service Configuration
//
// gRPC API Service Configuration (service config) is a configuration language
// for configuring a gRPC service to become a user-facing product. The
// service config is simply the YAML representation of the `google.api.Service`
// proto message.
//
// As an alternative to annotating your proto file, you can configure gRPC
// transcoding in your service config YAML files. You do this by specifying a
// `HttpRule` that maps the gRPC method to a REST endpoint, achieving the same
// effect as the proto annotation. This can be particularly useful if you
// have a proto that is reused in multiple services. Note that any transcoding
// specified in the service config will override any matching transcoding
// configuration in the proto.
//
// The following example selects a gRPC method and applies an `HttpRule` to it:
//
//     http:
//       rules:
//         - selector: example.v1.Messaging.GetMessage
//           get: /v1/messages/{message_id}/{sub.subfield}
//
// Special notes
//
// When gRPC Transcoding is used to map a gRPC to JSON REST endpoints, the
// proto to JSON conversion must follow the [proto3
// specification](https://developers.google.com/protocol-buffers/docs/proto3#json).
//
// While the single segment variable follows the semantics of
// [RFC 6570](https://tools.ietf.org/html/rfc6570) Section 3.2.2 Simple String
// Expansion, the multi segment variable **does not** follow RFC 6570 Section
// 3.2.3 Reserved Expansion. The reason is that the Reserved Expansion
// does not expand special characters like `?` and `#`, which would lead
// to invalid URLs. As the result, gRPC Transcoding uses a custom encoding
// for multi segment variables.
//
// The path variables **must not** refer to any repeated or mapped field,
// because client libraries are not capable of handling such variable expansion.
//
// The path variables **must not** capture the leading "/" character. The reason
// is that the most common use case "{var}" does not capture the leading "/"
// character. For consistency, all path variables must share the same behavior.
//
// Repeated message fields must not be mapped to URL query parameters, because
// no client library can support such complicated mapping.
//
// If an API needs to use a JSON array for request or response body, it can map
// the request or response body to a repeated field. However, some gRPC
// Transcoding implementations may not support this feature.
message HttpRule {
  // Selects a method to which this rule applies.
  //
  // Refer to [selector][google.api.DocumentationRule.selector] for syntax
  // details.
  string selector = 1;

  // Determines the URL pattern is matched by this rules. This pattern can be
  // used with any of the {get|put|post|delete|patch} methods. A custom method
  // can be defined using the 'custom' field.
  oneof pattern {
    // Maps to HTTP GET. Used for listing and getting information about
    // resources.
    string get = 2;

    // Maps to HTTP PUT. Used for replacing a resource.
    string put = 3;

    // Maps to HTTP POST. Used for creating a resource or performing an action.
    string post = 4;

    // Maps to HTTP DELETE. Used for deleting a resource.
    string delete = 5;

    // Maps to HTTP PATCH. Used for updating a resource.
    string patch = 6;

    // The custom pattern is used for specifying an HTTP method that is not
    // included in the `pattern` field, such as HEAD, or "*" to leave the
    // HTTP method unspecified for this rule. The wild-card rule is useful
    // for services that provide content to Web (HTML) clients.
    CustomHttpPattern custom = 8;
  }

  // The name of the request field whose value is mapped to the HTTP request
  // body, or `*` for mapping all request fields not captured by the path
  // pattern to the HTTP body, or omitted for not having any HTTP request body.
  //
  // NOTE: the referred field must be present at the top-level of the request
  // message type.
  string body = 7;

  // Optional. The name of the response field whose value is mapped to the HTTP
  // response body. When omitted, the entire response message will be used
  // as the HTTP response body.
  //
  // NOTE: The referred field must be present at the top-level of the response
  // message type.
  string response_body = 12;

  // Additional HTTP bindings for the selector. Nested bindings must
  // not contain an `additional_bindings` field themselves (that is,
  // the nesting may only be one level deep).
  repeated HttpRule additional_bindings = 11;
}

// A custom pattern is used for defining custom HTTP verb.
message CustomHttpPattern {
  // The name of this custom HTTP verb.
  string kind = 1;

  // The path matched by this custom verb.
  string path = 2;
}

proto文件进行编译

protoc -I ./pb --go_out ./pb --go_opt paths=source_relative --go-grpc_out ./pb --go-grpc_opt paths=source_relative --grpc-gateway_out ./pb --grpc-gateway_opt paths=source_relative  ./pb/person/person.proto

编写persos.proto中的Search,将接收的打印即可,后面在做注册

func (*personServe) Search(ctx context.Context, req *person.PersonReq) (*person.PersonRes, error) {
	name := req.GetName()
	fmt.Println(name)
	fmt.Println(req.Name)
	fmt.Println(req.Age)
	fmt.Println(req.String())
	req.GetBody()
	res := &person.PersonRes{Name: name + "的信息", Age: req.GetAge(), Body: req.GetBody()}
	return res, nil
}

完整的service.go,运行main后,直接用postman进行访问,比较好

package main

import (
	"context"
	"fmt"
	"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
	"net"
	"net/http"
	"projectRPCTest4/pb/person"
	"sync"

	"google.golang.org/grpc"
)

type personServe struct {
	person.UnimplementedSearchServiceServer
}

func (*personServe) Search(ctx context.Context, req *person.PersonReq) (*person.PersonRes, error) {
	name := req.GetName()
	fmt.Println(name)
	fmt.Println(req.Name)
	fmt.Println(req.Age)
	fmt.Println(req.String())
	req.GetBody()
	res := &person.PersonRes{Name: name + "的信息", Age: req.GetAge(), Body: req.GetBody()}
	return res, nil
}

func main() {
	wg := sync.WaitGroup{}
	wg.Add(2)
	go registerGateway(&wg)
	go registerGRPC(&wg)
	//time.Sleep(100 * time.Second)
	wg.Wait()
}

func registerGateway(wg *sync.WaitGroup) {
	conn, _ := grpc.DialContext(context.Background(), "localhost:8888", grpc.WithBlock(), grpc.WithInsecure())

	mux := runtime.NewServeMux()
	gwServer := &http.Server{
		Handler: mux,
		Addr:    ":8090",
	}
	err := person.RegisterSearchServiceHandler(context.Background(), mux, conn)
	if err != nil {
		wg.Done()

		panic(err)
	}
	gwServer.ListenAndServe()
}

func registerGRPC(wg *sync.WaitGroup) {
	l, err := net.Listen("tcp", ":8888")

	if err != nil {
		wg.Done()

		panic(err)
	}
	s := grpc.NewServer()
	person.RegisterSearchServiceServer(s, &personServe{})

	s.Serve(l)
}

3、拦截器

拦截器的语法编写

syntax = "proto3";

option go_package = "projectRPCTest5-4/service/users";

service Users {
  rpc GetUser (UserGetRequest) returns (UserGetReply) {}
  rpc GetHelp (stream UserHelpRequest) returns (stream UserHelpReply) {}
}

message UserGetRequest {
  string email = 1;
  string id = 2;  
}

message User {
  string id = 1;
  string first_name = 2;
  string last_name = 3;
  int32 age = 4;
}

message UserGetReply {
  User user = 1;
}

message UserHelpRequest {
  User user = 1;
  string request = 2;
}

message UserHelpReply {
  string response = 1;
}

3.1 客户端拦截器

package main

import (
	"bufio"
	"context"
	"fmt"
	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
	"io"
	"log"
	"os"
	svc "projectRPCTest5-4/service"
)

func metadataUnaryInterceptor(
	ctx context.Context,
	method string,
	req, reply interface{},
	cc *grpc.ClientConn,
	invoker grpc.UnaryInvoker,
	opts ...grpc.CallOption,
) error {
	ctxWithMetadata := metadata.AppendToOutgoingContext(
		ctx,
		"Request-Id",
		"request-123",
	)
	return invoker(ctxWithMetadata, method, req, reply, cc, opts...)
}

func metadataStreamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer,
	opts ...grpc.CallOption) (grpc.ClientStream, error) {
	ctxWithMetadata := metadata.AppendToOutgoingContext(
		ctx,
		"Request-Id",
		"request-123",
	)
	clientStream, err := streamer(ctxWithMetadata, desc, cc, method, opts...)
	return clientStream, err
}

func setupGrpcConn(addr string) (*grpc.ClientConn, error) {
	return grpc.DialContext(context.Background(), addr,
		grpc.WithInsecure(), grpc.WithBlock(),
		grpc.WithUnaryInterceptor(metadataUnaryInterceptor),
		grpc.WithStreamInterceptor(metadataStreamInterceptor))
}

func getUserServiceClient(conn *grpc.ClientConn) svc.UsersClient {
	return svc.NewUsersClient(conn)
}

func getUser(client svc.UsersClient,
	u *svc.UserGetRequest) (*svc.UserGetReply, error) {
	return client.GetUser(context.Background(), u)
}

func setupChat(r io.Reader, w io.Writer, c svc.UsersClient) error {
	stream, err := c.GetHelp(context.Background())
	if err != nil {
		return err
	}
	for {
		scanner := bufio.NewScanner(r)
		prompt := "Request:"
		fmt.Fprint(w, prompt)
		scanner.Scan()
		if err := scanner.Err(); err != nil {
			return err
		}
		msg := scanner.Text()
		if msg == "quit" {
			break
		}
		request := svc.UserHelpRequest{
			Request: msg,
		}
		err := stream.Send(&request)
		if err != nil {
			return err
		}
		resp, err := stream.Recv()
		if err != nil {
			return err
		}
		fmt.Println("Recv", resp.Response)
	}
	return stream.CloseSend()
}

func main() {
	if len(os.Args) != 3 {
		log.Fatal("Specify a gRPC server and method to call")
	}
	serverAddrr := os.Args[1]
	methodName := os.Args[2]

	conn, err := setupGrpcConn(serverAddrr)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	c := getUserServiceClient(conn)

	switch methodName {
	case "GetUser":
		result, err := getUser(
			c,
			&svc.UserGetRequest{Email: "jane@doe.com"},
		)
		if err != nil {
			log.Fatal(err)
		}
		fmt.Fprintf(
			os.Stdout, "User: %s %s\n",
			result.User.FirstName,
			result.User.LastName,
		)
	case "GetHelp":
		err = setupChat(os.Stdin, os.Stdout, c)
		if err != nil {
			log.Fatal(err)
		}
	default:
		log.Fatal("Unrecognized method name")
	}
}

3.2 服务端拦截器

拦截器这里是在每个请求加上日志请求

package main

//
import (
	//	"bufio"
	"context"
	"errors"
	"fmt"
	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
	"io"
	"log"
	"net"
	"os"
	svc "projectRPCTest5-4/service"
	"strings"
	"time"
)

type userService struct {
	svc.UnimplementedUsersServer
}

func logMessage(ctx context.Context, method string, latency time.Duration, err error) {
	var requestId string
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		log.Println("md", md)
	} else {
		if len(md.Get("Request-Id")) != 0 {
			requestId = md.Get("Request-Id")[0]
		}
	}
	log.Printf("Method:%s,Error:%v,Latency:%v,Request-Id:%s", method, err, latency, requestId)

}

func loggingUnaryInterceptor(ctx context.Context, req interface{},
	info *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler) (interface{},
	error) {
	start := time.Now()
	resp, err := handler(ctx, req)
	logMessage(ctx, info.FullMethod, time.Since(start), err)
	return resp, err
}

func loggingStreamInterceptor(srv interface{}, stream grpc.ServerStream,
	info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	start := time.Now()
	err := handler(srv, stream)
	ctx := stream.Context()
	logMessage(ctx, info.FullMethod, time.Since(start), err)
	return err
}

func (s *userService) GetUser(
	ctx context.Context, in *svc.UserGetRequest) (*svc.UserGetReply, error) {
	log.Printf("Received request for user with Email:%s Id:%s\n",
		in.Email, in.Id)
	components := strings.Split(in.Email, "@")
	if len(components) != 2 {
		return nil, errors.New("invalid email")
	}
	u := svc.User{
		Id:        in.Id,
		FirstName: components[0],
		LastName:  components[1],
		Age:       36,
	}
	return &svc.UserGetReply{User: &u}, nil
}

func (s *userService) GetHelp(stream svc.Users_GetHelpServer) error {
	log.Println("Client connected")
	for {
		request, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
		fmt.Printf("Request:%+v\n", request.Request)
		response := svc.UserHelpReply{
			Response: request.Request,
		}
		err = stream.Send(&response)
		if err != nil {
			return err
		}
	}
	log.Println("Client disconnected")
	return nil
}

func registerServices(s *grpc.Server) {
	svc.RegisterUsersServer(s, &userService{})
}

func startServer(s *grpc.Server, l net.Listener) error {
	return s.Serve(l)
}

func main() {
	listenAddr := os.Getenv("LISTEN_ADDR")
	if len(listenAddr) == 0 {
		listenAddr = ":50051"
	}
	lis, err := net.Listen("tcp", listenAddr)
	if err != nil {
		log.Fatal(err)
	}
	s := grpc.NewServer(
		grpc.UnaryInterceptor(loggingUnaryInterceptor),
		grpc.StreamInterceptor(loggingStreamInterceptor))
	registerServices(s)
	log.Fatal(startServer(s, lis))
}

3.4 连接拦截器

就是将众多拦截器集成在一个应用程序里

grpc.ChainUnaryInterceptor(
	metricUnaryInterceptro,
	loggingUnaryInterceptor,
)
grpc ChainStreamInterceptor(
	metricStreamInterceptro,
	loggingStreamInterceptor,	
)

4.服务端与客户端健壮性

4.1 服务端健康检查

syntax = "proto3";

option go_package = "projectRPCHealthCheck/service/users";

service Users {
  rpc GetUser (UserGetRequest) returns (UserGetReply) {}
  rpc GetHelp (stream UserHelpRequest) returns (stream UserHelpReply) {}
}

message UserGetRequest {
  string email = 1;
  string id = 2;  
}

message User {
  string id = 1;
  string first_name = 2;
  string last_name = 3;
  int32 age = 4;
}

message UserGetReply {
  User user = 1;
}

message UserHelpRequest {
  User user = 1;
  string request = 2;
}

message UserHelpReply {
  string response = 1;
}

服务端的go文件

package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log"
	"net"
	"os"
	"strings"

	"google.golang.org/grpc"
	healthz "google.golang.org/grpc/health"
	healthsvc "google.golang.org/grpc/health/grpc_health_v1"
	svc "projectRPCHealthCheck/service"
)

type userService struct {
	svc.UnimplementedUsersServer
}

func (s *userService) GetUser(
	ctx context.Context,
	in *svc.UserGetRequest,
) (*svc.UserGetReply, error) {
	log.Printf(
		"Received request for user with Email: %s Id: %s\n",
		in.Email,
		in.Id,
	)
	components := strings.Split(in.Email, "@")
	if len(components) != 2 {
		return nil, errors.New("invalid email address")
	}
	u := svc.User{
		Id:        in.Id,
		FirstName: components[0],
		LastName:  components[1],
		Age:       36,
	}
	return &svc.UserGetReply{User: &u}, nil
}

func (s *userService) GetHelp(
	stream svc.Users_GetHelpServer,
) error {
	for {

		request, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
		fmt.Printf("Request receieved: %s\n", request.Request)
		response := svc.UserHelpReply{
			Response: request.Request,
		}
		err = stream.Send(&response)
		if err != nil {
			return err
		}
	}
	return nil
}

func registerServices(s *grpc.Server, h *healthz.Server) {
	svc.RegisterUsersServer(s, &userService{})
	healthsvc.RegisterHealthServer(s, h)
}

func updateServiceHealth(
	h *healthz.Server,
	service string,
	status healthsvc.HealthCheckResponse_ServingStatus,
) {
	h.SetServingStatus(
		service,
		status,
	)
}

func startServer(s *grpc.Server, l net.Listener) error {
	return s.Serve(l)
}

func main() {
	listenAddr := os.Getenv("LISTEN_ADDR")
	if len(listenAddr) == 0 {
		listenAddr = ":50051"
	}

	lis, err := net.Listen("tcp", listenAddr)
	if err != nil {
		log.Fatal(err)
	}
	s := grpc.NewServer()
	h := healthz.NewServer()
	registerServices(s, h)
	updateServiceHealth(
		h,
		svc.Users_ServiceDesc.ServiceName,
		healthsvc.HealthCheckResponse_SERVING,
	)
	log.Fatal(startServer(s, lis))
}

对这段代码进行测试

package main

import (
	"context"
	"errors"
	healthz "google.golang.org/grpc/health"
	"log"
	"net"
	svc "projectRPCHealthCheck/service"
	users "projectRPCHealthCheck/service"
	"testing"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	healthsvc "google.golang.org/grpc/health/grpc_health_v1"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/test/bufconn"
)

var h *healthz.Server

func startTestGrpcServer() *bufconn.Listener {
	h = healthz.NewServer()
	l := bufconn.Listen(10)
	s := grpc.NewServer()
	registerServices(s, h)
	updateServiceHealth(
		h,
		svc.Users_ServiceDesc.ServiceName,
		healthsvc.HealthCheckResponse_SERVING,
	)
	go func() {
		log.Fatal(startServer(s, l))
	}()
	return l
}

func getHealthSvcClient(l *bufconn.Listener) (healthsvc.HealthClient, error) {

	bufconnDialer := func(
		ctx context.Context, addr string,
	) (net.Conn, error) {
		return l.Dial()
	}

	client, err := grpc.DialContext(
		context.Background(),
		"", grpc.WithInsecure(),
		grpc.WithContextDialer(bufconnDialer),
	)
	if err != nil {
		return nil, err
	}
	return healthsvc.NewHealthClient(client), nil
}

func TestHealthService(t *testing.T) {

	l := startTestGrpcServer()
	healthClient, err := getHealthSvcClient(l)
	if err != nil {
		t.Fatal(err)
	}

	resp, err := healthClient.Check(
		context.Background(),
		&healthsvc.HealthCheckRequest{},
	)
	if err != nil {
		t.Fatal(err)
	}
	serviceHealthStatus := resp.Status.String()
	if serviceHealthStatus != "SERVING" {
		t.Fatalf(
			"Expected health: SERVING, Got: %s",
			serviceHealthStatus,
		)
	}
}

func TestHealthServiceUsers(t *testing.T) {

	l := startTestGrpcServer()
	healthClient, err := getHealthSvcClient(l)
	if err != nil {
		t.Fatal(err)
	}

	resp, err := healthClient.Check(
		context.Background(),
		&healthsvc.HealthCheckRequest{
			Service: "Users",
		},
	)
	if err != nil {
		t.Fatal(err)
	}
	serviceHealthStatus := resp.Status.String()
	if serviceHealthStatus != "SERVING" {
		t.Fatalf(
			"Expected health: SERVING, Got: %s",
			serviceHealthStatus,
		)
	}
}

func TestHealthServiceUnknown(t *testing.T) {

	l := startTestGrpcServer()
	healthClient, err := getHealthSvcClient(l)
	if err != nil {
		t.Fatal(err)
	}

	_, err = healthClient.Check(
		context.Background(),
		&healthsvc.HealthCheckRequest{
			Service: "Repo",
		},
	)
	if err == nil {
		t.Fatalf("Expected non-nil error, Got nil error")
	}
	expectedError := status.Errorf(
		codes.NotFound, "unknown service",
	)
	if !errors.Is(err, expectedError) {
		t.Fatalf(
			"Expected error %v, Got; %v",
			err,
			expectedError,
		)
	}
}

func TestHealthServiceWatch(t *testing.T) {

	l := startTestGrpcServer()
	healthClient, err := getHealthSvcClient(l)
	if err != nil {
		t.Fatal(err)
	}

	client, err := healthClient.Watch(
		context.Background(),
		&healthsvc.HealthCheckRequest{
			Service: "Users",
		},
	)
	if err != nil {
		t.Fatal(err)
	}

	resp, err := client.Recv()
	if err != nil {
		t.Fatalf("Error in Watch: %#v\n", err)
	}
	if resp.Status != healthsvc.HealthCheckResponse_SERVING {
		t.Errorf("Expected SERVING, Got: %#v", resp.Status.String())
	}

	updateServiceHealth(
		h,
		"Users",
		healthsvc.HealthCheckResponse_NOT_SERVING,
	)

	resp, err = client.Recv()
	if err != nil {
		t.Fatalf("Error in Watch: %#v\n", err)
	}
	if resp.Status != healthsvc.HealthCheckResponse_NOT_SERVING {
		t.Errorf("Expected NOT_SERVING, Got: %#v", resp.Status.String())
	}
}

func TestUserService1(t *testing.T) {

	l := startTestGrpcServer()

	bufconnDialer := func(
		ctx context.Context, addr string,
	) (net.Conn, error) {
		return l.Dial()
	}

	client, err := grpc.DialContext(
		context.Background(),
		"", grpc.WithInsecure(),
		grpc.WithContextDialer(bufconnDialer),
	)
	if err != nil {
		t.Fatal(err)
	}
	usersClient := users.NewUsersClient(client)
	resp, err := usersClient.GetUser(
		context.Background(),
		&users.UserGetRequest{
			Email: "jane@doe.com",
			Id:    "foo-bar",
		},
	)

	if err != nil {
		t.Fatal(err)
	}
	if resp.User.FirstName != "jane" {
		t.Errorf(
			"Expected FirstName to be: jane, Got: %s",
			resp.User.FirstName,
		)
	}
}

4.2 服务端处理运行时错误

如果是流式服务

func panicStreamInterceptor(
	srv interface{},
	stream grpc.ServerStream,
	info *grpc.StreamServerInfo,
	handler grpc.StreamHandler,
) (err error) {
	defer func() {
		if r := recover(); r != nil {
			log.Printf("Panic recovered: %v\n", r)
			err = status.Error(
				codes.Internal,
				"Unexpected error happened",
			)
		}
	}()
	serverStream := wrappedServerStream{
		ServerStream: stream,
	}
	err = handler(srv, serverStream)
	return
}

如果是单一服务

func panicUnaryInterceptor(
	ctx context.Context,
	req interface{},
	info *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler,
) (resp interface{}, err error) {
	defer func() {
		if r := recover(); r != nil {
			log.Printf("Panic recovered: %v\n", r)
			err = status.Error(
				codes.Internal,
				"Unexpected error happened",
			)
		}
	}()
	resp, err = handler(ctx, req)
	return
}

4.3 服务端终止请求处理

流式


func timeoutStreamInterceptor(
	srv interface{},
	stream grpc.ServerStream,
	info *grpc.StreamServerInfo,
	handler grpc.StreamHandler,
) error {
	serverStream := wrappedServerStream{
		RecvMsgTimeout: 500 * time.Millisecond,
		ServerStream:   stream,
	}
	err := handler(srv, serverStream)
	return err
}

单一


func timeoutUnaryInterceptor(
	ctx context.Context,
	req interface{},
	info *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler,
) (interface{}, error) {
	var resp interface{}
	var err error

	ctxWithTimeout, cancel := context.WithTimeout(ctx, 300*time.Millisecond)
	defer cancel()

	ch := make(chan error)

	go func() {
		resp, err = handler(ctxWithTimeout, req)
		ch <- err
	}()

	select {
	case <-ctxWithTimeout.Done():
		cancel()
		err = status.Error(
			codes.DeadlineExceeded,
			fmt.Sprintf("%s: Deadline exceeded", info.FullMethod),
		)
		return resp, err
	case <-ch:

	}
	return resp, err
}

4.4 客户端提高连接配置

增加超时机制,就是增加连接配置

func setupGrpcConn(addr string) (*grpc.ClientConn, context.CancelFunc, error) {
	log.Printf("Connecting to server on %s\n", addr)
	ctx, cancel := context.WithTimeout(
		context.Background(),
		10*time.Second,
	)
	conn, err := grpc.DialContext(
		ctx,
		addr,
		grpc.WithInsecure(),
		grpc.WithBlock(),
		grpc.FailOnNonTempDialError(true),
		grpc.WithReturnConnectionError(),
	)
	return conn, cancel, err
}

4.5 为方法调用设置超时

用上面的方法context即可


http://www.kler.cn/a/586352.html

相关文章:

  • 工业相机视频播放(RTSP)
  • STM32Cubemx-H7-7-OLED屏幕
  • OkHttp 的证书设置
  • 蓝桥杯——路标设置
  • Celery - 入门(get-started)
  • 精准车型识别:视觉分析技术的力量
  • 海鲜水产行业wordpress外贸主题
  • Golang Channel 使用详解、注意事项与死锁分析
  • 软考教材重点内容 信息安全工程师 第19章 操作系统安全保护
  • Dify1.01版本vscode 本地环境搭建运行实践
  • AI+Python机器学习小项目教程(数据分类)
  • 算法基础 -- Brian Kernighan 算法初识
  • 基础知识《HTTP字段与状态码详细说明》
  • 【基于 SSE 协议与 EventSource 实现 AI 对话的流式交互】
  • Stable Diffusion API /sdapi/v1/txt2img的完整参数列表及其说明
  • leetcode hot 100(三)
  • python全栈-MySQL知识
  • MySQL:MySQL库和表的基本操作
  • SpringBoot实现一个Redis限流注解
  • Springboot项目修改端口