go GRPC学习笔记
本博文源于笔者正在学习的gRPC,相关配套书籍素材来源是《Go编程进阶实战》,博文内容主要包含了RPC模式讲解、RPC通过http访问、拦截器、提高服务端与客户端容错的内容配置
在此之前需要下载protoc,这里不做下载过程
1、RPC模式
首先定义在rpc模式里共有四种
- Unary RPC 单个请求–单个响应
- Server Streaming RPC 单个请求–多个响应
- Client Streaming RPC 多个请求–单个响应
- Bidirectional Streaming RPC 多个请求–多个响应
1.1 举例:编写
文件夹目录
编写protoc文件
syntax = "proto3"; // Declares that this file uses the proto3 syntax.
package person;
option go_package="/projectRPCTest3/pb/person;person";
// PersonReq is the request message used by every SearchService RPC.
message PersonReq{
string name =1;
int32 age = 2;
}
// PersonRes is the response message used by every SearchService RPC.
message PersonRes{
string name = 1;
int32 age = 2;
}
// SearchService declares one RPC for each of the four gRPC call shapes.
service SearchService {
// Unary: one request, one response.
rpc Search(PersonReq) returns (PersonRes);
// Client streaming: a stream of requests, one response.
rpc SearchIn(stream PersonReq) returns(PersonRes);
// Server streaming: one request, a stream of responses.
rpc SearchOut(PersonReq) returns (stream PersonRes);
// Bidirectional streaming: a stream of requests and a stream of responses.
rpc SearchIO(stream PersonReq) returns (stream PersonRes);
}
编译出protoc的两个go文件,在person目录下进行编译
cd .\pb
cd .\person\
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative .\person.proto
main.go文件进行编写:
- Search:将收到的请求进行返回
- SearchIn:多个请求过来,统一做一次回复
- SearchOut:单个请求过来,多次响应
- SearchIO:多个请求过来,多个响应
package main
import (
	"context"
	"fmt"
	"io"
	"log"
	"net"
	"time"

	"google.golang.org/grpc"
	"projectRPCTest3/pb/person"
)
// personServe is the receiver type for the SearchService handlers below. It
// embeds the generated person.UnimplementedSearchServiceServer.
type personServe struct {
person.UnimplementedSearchServiceServer
}
// Search handles the unary RPC: it answers with a single PersonRes whose Name
// wraps the name read from the request.
func (*personServe) Search(ctx context.Context, req *person.PersonReq) (*person.PersonRes, error) {
	reply := new(person.PersonRes)
	reply.Name = "我收到了" + req.GetName() + "的信息"
	return reply, nil
}
// SearchIn handles the client-streaming RPC. It prints every request received
// and, once the client has closed its sending side (Recv reports io.EOF),
// answers with a single "success" reply.
//
// Any other receive error is returned to the caller instead of being reported
// as success, the failing Recv no longer prints a nil request, and the error
// from SendAndClose is propagated.
func (*personServe) SearchIn(server person.SearchService_SearchInServer) error {
	for {
		req, err := server.Recv()
		if err == io.EOF {
			// Normal end of the request stream: send the one and only reply.
			return server.SendAndClose(&person.PersonRes{Name: "success"})
		}
		if err != nil {
			return err
		}
		fmt.Println(req)
	}
}
// SearchOut handles the server-streaming RPC: for a single request it sends
// eleven replies, one per second, each carrying "I got it" followed by the
// request name.
//
// A failed Send ends the stream with that error instead of being ignored
// while the loop keeps sleeping and sending.
func (*personServe) SearchOut(req *person.PersonReq, server grpc.ServerStreamingServer[person.PersonRes]) error {
	const replies = 11 // the original loop ran for i = 0..10
	name := req.GetName()
	for i := 0; i < replies; i++ {
		time.Sleep(1 * time.Second)
		if err := server.Send(&person.PersonRes{Name: "I got it" + name}); err != nil {
			return err
		}
	}
	return nil
}
// SearchIO handles the bidirectional-streaming RPC: every request received is
// answered with one reply carrying "I got it" followed by the request name.
//
// The handler returns nil when the client closes its sending side (io.EOF)
// and returns any other Recv or Send error. Receiving and sending happen in
// the same loop, so no in-band sentinel string is needed and a client whose
// name happens to be "Result" can no longer terminate the stream by accident.
func (*personServe) SearchIO(server grpc.BidiStreamingServer[person.PersonReq, person.PersonRes]) error {
	for {
		req, err := server.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		if err := server.Send(&person.PersonRes{Name: "I got it" + req.GetName()}); err != nil {
			return err
		}
	}
}
// main listens on TCP port 8888, registers the SearchService implementation
// and serves gRPC requests until Serve returns. Listen and Serve failures are
// logged and terminate the process instead of being silently dropped.
func main() {
	l, err := net.Listen("tcp", ":8888")
	if err != nil {
		log.Fatalf("listen on :8888: %v", err)
	}
	s := grpc.NewServer()
	person.RegisterSearchServiceServer(s, &personServe{})
	if err := s.Serve(l); err != nil {
		log.Fatalf("serve: %v", err)
	}
}
然后使用客户端client对server发送消息,进行测试,分为四个部分,最后给出完整的client.go
search
wg := sync.WaitGroup{}
wg.Add(10)
for i := 0; i < 10; i++ {
search, err := client.Search(context.Background(), &person.PersonReq{Name: "Alice"})
if err != nil {
fmt.Println(err)
}
wg.Done()
fmt.Println(search.GetName())
}
wg.Wait()
searchIn
// Client streaming: send eleven requests, one per second, then close the
// sending side and print the server's single reply.
c, err := client.SearchIn(context.Background())
if err != nil {
	fmt.Println(err)
	return
}
for i := 0; i <= 10; i++ {
	time.Sleep(1 * time.Second)
	if err := c.Send(&person.PersonReq{Name: "client send message...."}); err != nil {
		// Stop sending; CloseAndRecv below reports the stream's final status.
		fmt.Println(err)
		break
	}
}
res, err := c.CloseAndRecv()
if err != nil {
	fmt.Println(err)
	return
}
fmt.Println(res)
searchOut
// Server streaming: send one request and print every reply until Recv
// reports an error; that terminating error is printed as well.
c, err := client.SearchOut(context.Background(), &person.PersonReq{Name: "zhangsan"})
if err != nil {
	// Without a stream there is nothing to receive from.
	fmt.Println(err)
	return
}
for {
	res, err := c.Recv()
	if err != nil {
		fmt.Println(err)
		break
	}
	fmt.Println(res)
}
searchIO
c, _ := client.SearchIO(context.Background())
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
for {
err := c.Send(&person.PersonReq{Name: " hello"})
time.Sleep(2 * time.Second)
if err != nil {
wg.Done()
break
}
}
}()
go func() {
for {
req, err := c.Recv()
fmt.Println(req)
if err != nil {
fmt.Println(err)
wg.Done()
break
}
}
}()
wg.Wait()
完整的client.go
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"projectRPCTest3/pb/person"
"sync"
"time"
)
// main connects to the gRPC server at 127.0.0.1:8888 and runs the
// bidirectional-streaming demo. The unary, client-streaming and
// server-streaming demos are kept as commented-out code so they can be
// enabled one at a time.
//
// Dial and stream-creation errors are reported instead of ignored, the
// connection is closed on exit, and the WaitGroup counts both goroutines (the
// original counted one but called Done from both, which can panic).
func main() {
	l, err := grpc.Dial("127.0.0.1:8888", grpc.WithInsecure())
	if err != nil {
		fmt.Println(err)
		return
	}
	defer l.Close()
	client := person.NewSearchServiceClient(l)
	//wg := sync.WaitGroup{}
	//wg.Add(10)
	//for i := 0; i < 10; i++ {
	//	search, err := client.Search(context.Background(), &person.PersonReq{Name: "Alice"})
	//	if err != nil {
	//		fmt.Println(err)
	//	}
	//	wg.Done()
	//	fmt.Println(search.GetName())
	//}
	//wg.Wait()
	//c, _ := client.SearchIn(context.Background())
	//i := 0
	//for {
	//	if i > 10 {
	//		res, _ := c.CloseAndRecv()
	//		fmt.Println(res)
	//		break
	//
	//	}
	//	time.Sleep(1 * time.Second)
	//	c.Send(&person.PersonReq{Name: "client send message...."})
	//	i++
	//
	//}
	//c, _ := client.SearchOut(context.Background(), &person.PersonReq{Name: "zhangsan"})
	//for {
	//	req, err := c.Recv()
	//	if err != nil {
	//		fmt.Println(err)
	//		break
	//	}
	//	fmt.Println(req)
	//}
	c, err := client.SearchIO(context.Background())
	if err != nil {
		fmt.Println(err)
		return
	}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		for {
			// Check the error before sleeping so a dead stream ends the loop at once.
			if err := c.Send(&person.PersonReq{Name: " hello"}); err != nil {
				return
			}
			time.Sleep(2 * time.Second)
		}
	}()
	go func() {
		defer wg.Done()
		for {
			res, err := c.Recv()
			if err != nil {
				fmt.Println(err)
				return
			}
			fmt.Println(res)
		}
	}()
	wg.Wait()
}
1.2 任意字节传输
proto文件定义,文件名称为repositories.proto
syntax = "proto3";
option go_package = "projectBindataClientStreaming/service";
// Repo exposes a single client-streaming RPC for creating a repository.
service Repo {
// CreateRepo takes a stream of RepoCreateRequest messages and returns one
// RepoCreateReply.
rpc CreateRepo(stream RepoCreateRequest) returns(RepoCreateReply) {}
}
// RepoCreateRequest carries exactly one of two payloads per message.
message RepoCreateRequest {
oneof body {
// Metadata describing the repository to create.
RepoContext context = 1;
// A chunk of arbitrary bytes.
bytes data = 2;
}
}
// RepoContext names the repository and its creator.
message RepoContext {
string creator_id = 1;
string name =2;
}
// Repository describes a repository.
message Repository {
string id = 1;
string name = 2;
string url = 3;
}
// RepoCreateReply is the single response of CreateRepo.
message RepoCreateReply {
Repository repo = 1;
int32 size =2;
}
进行编译proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative .\repositories.proto
server.go进行实现CreateRepo,然后注册服务,开启tcp,
package main
import (
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"io"
"log"
"net"
"os"
svc "projectBindataClientStreaming/service"
)
// repoService is the receiver type for the Repo service handler below. It
// embeds the generated svc.UnimplementedRepoServer.
type repoService struct {
svc.UnimplementedRepoServer
}
// CreateRepo handles the client-streaming RPC. It reads messages until the
// client closes its sending side, remembering the last RepoContext received
// and concatenating every data chunk, then replies once with the repository
// name, its URL and the total number of data bytes received.
//
// Errors returned:
//   - codes.Unknown when Recv fails for a reason other than io.EOF;
//   - codes.InvalidArgument when a message has neither context nor data, or
//     when the stream ends without any context message (previously this
//     dereferenced a nil *RepoContext and panicked);
//   - codes.FailedPrecondition when the oneof holds an unknown type.
func (s *repoService) CreateRepo(
	stream svc.Repo_CreateRepoServer,
) error {
	var repoContext *svc.RepoContext
	var data []byte
	for {
		r, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			return status.Error(
				codes.Unknown,
				err.Error(),
			)
		}
		switch t := r.Body.(type) {
		case *svc.RepoCreateRequest_Context:
			repoContext = r.GetContext()
		case *svc.RepoCreateRequest_Data:
			data = append(data, r.GetData()...)
		case nil:
			return status.Error(
				codes.InvalidArgument,
				"Message doesn't contain context or data",
			)
		default:
			// %T prints the dynamic type; %s on the oneof wrapper does not.
			return status.Errorf(
				codes.FailedPrecondition,
				"Unexpected message type: %T",
				t,
			)
		}
	}
	if repoContext == nil {
		return status.Error(
			codes.InvalidArgument,
			"Message stream doesn't contain a repository context",
		)
	}
	repo := svc.Repository{
		Name: repoContext.Name,
		Url: fmt.Sprintf(
			"https://git.example.com/%s/%s",
			repoContext.CreatorId,
			repoContext.Name,
		),
	}
	r := svc.RepoCreateReply{
		Repo: &repo,
		Size: int32(len(data)),
	}
	return stream.SendAndClose(&r)
}
// registerServices registers a repoService as the Repo service on s.
func registerServices(s *grpc.Server) {
svc.RegisterRepoServer(s, &repoService{})
}
// startServer serves s on listener l and returns whatever s.Serve returns.
func startServer(s *grpc.Server, l net.Listener) error {
return s.Serve(l)
}
// main listens on TCP port 50051, registers the services and serves until
// startServer returns; its result is then passed to log.Fatal.
func main() {
	const listenAddr = ":50051"
	lis, err := net.Listen("tcp", listenAddr)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	srv := grpc.NewServer()
	registerServices(srv)
	err = startServer(srv, lis)
	log.Fatal(err)
}
测试server_test.go
package main
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/test/bufconn"
"io"
"net"
svc "projectBindataClientStreaming/service"
"strings"
"testing"
)
// startTestGrpcServer creates a bufconn listener, starts a gRPC server with
// the registered services on it in a background goroutine, and returns the
// listener so a test can dial it.
func startTestGrpcServer() *bufconn.Listener {
	lis := bufconn.Listen(1)
	srv := grpc.NewServer()
	registerServices(srv)
	// The result of startServer is discarded, as before.
	go startServer(srv, lis)
	return lis
}
// TestCreateRepo drives the CreateRepo client-streaming RPC over a bufconn
// connection: it sends one context message followed by the payload one byte
// per message, then checks the reported size and repository URL.
//
// The listener is closed once, when the test ends, rather than inside the
// send loop after every byte. The client connection is closed too, read
// errors other than io.EOF fail the test, and the URL failure message labels
// the got and want values correctly.
func TestCreateRepo(t *testing.T) {
	l := startTestGrpcServer()
	defer l.Close()

	bufconnDialer := func(ctx context.Context, address string) (net.Conn, error) {
		return l.Dial()
	}
	client, err := grpc.DialContext(context.Background(),
		"", grpc.WithContextDialer(bufconnDialer), grpc.WithInsecure())
	if err != nil {
		t.Fatalf("could not dial bufconn: %v", err)
	}
	defer client.Close()

	repoClient := svc.NewRepoClient(client)
	stream, err := repoClient.CreateRepo(
		context.Background(),
	)
	if err != nil {
		t.Fatal("CreateRepo", err)
	}

	// First message: the repository context.
	c := svc.RepoCreateRequest_Context{
		Context: &svc.RepoContext{
			CreatorId: "user-123",
			Name:      "test-repo",
		},
	}
	r := svc.RepoCreateRequest{
		Body: &c,
	}
	if err := stream.Send(&r); err != nil {
		t.Fatal("stream.Send", err)
	}

	// Remaining messages: the payload, one byte per message.
	data := "Arbitrary Data Bytes"
	repoData := strings.NewReader(data)
	for {
		b, err := repoData.ReadByte()
		if err == io.EOF {
			break
		}
		if err != nil {
			t.Fatal("ReadByte", err)
		}
		bData := svc.RepoCreateRequest_Data{
			Data: []byte{b},
		}
		r := svc.RepoCreateRequest{
			Body: &bData,
		}
		if err := stream.Send(&r); err != nil {
			t.Fatal("stream.Send", err)
		}
	}

	resp, err := stream.CloseAndRecv()
	if err != nil {
		t.Fatal("CloseAndRecv", err)
	}
	expectedSize := int32(len(data))
	if resp.Size != expectedSize {
		t.Errorf("resp.Size != expectedSize, resp: %v", resp.Size)
	}
	expectedRepoUrl := "https://git.example.com/user-123/test-repo"
	if got := resp.GetRepo().GetUrl(); got != expectedRepoUrl {
		t.Errorf("resp.Repo.Url != expectedRepoUrl, got: %s, want: %s", got, expectedRepoUrl)
	}
}
2、grpc通过http访问
上一小节通过tcp,这次通过http访问
2.1 案例,restful访问请求
proto文件编写,下面代码中的注释部分是用来测试,可复制过来一起测试。
syntax = "proto3";
package person;
option go_package="projectRPCTest4/pb/person;person";
import "google/api/annotations.proto";
// Mbody is a nested message used as the body field of the request and response.
message Mbody {
string name = 1;
}
// PersonReq is the request message of SearchService.Search.
message PersonReq {
string name = 1;
int32 age = 2;
Mbody body =3;
}
// PersonRes is the response message of SearchService.Search.
message PersonRes {
string name = 1;
int32 age = 2;
Mbody body = 3;
}
// SearchService exposes Search over gRPC and, through the google.api.http
// option, over HTTP.
service SearchService {
rpc Search(PersonReq) returns (PersonRes){
option(google.api.http)= {
// Alternative bindings kept for experimentation; enable one at a time.
// post:"/api/person",
// get:"/api/person",
// get:"/api/person/{name}/{age}/{body.name}",
get:"/api/person/{name=qm}/{age}/{body.name}",// the name segment can only be the fixed literal "qm"
//body:"body"
};
};
}
其中文件夹目录,与其他文件一起附上
annotations.proto
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package google.api;
import "google/api/http.proto";
import "google/protobuf/descriptor.proto";
option go_package = "google.golang.org/genproto/googleapis/api/annotations;annotations";
option java_multiple_files = true;
option java_outer_classname = "AnnotationsProto";
option java_package = "com.google.api";
option objc_class_prefix = "GAPI";
extend google.protobuf.MethodOptions {
// See `HttpRule`.
HttpRule http = 72295728;
}
http.proto
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package google.api;
option go_package = "google.golang.org/genproto/googleapis/api/annotations;annotations";
option java_multiple_files = true;
option java_outer_classname = "HttpProto";
option java_package = "com.google.api";
option objc_class_prefix = "GAPI";
// Defines the HTTP configuration for an API service. It contains a list of
// [HttpRule][google.api.HttpRule], each specifying the mapping of an RPC method
// to one or more HTTP REST API methods.
message Http {
// A list of HTTP configuration rules that apply to individual API methods.
//
// **NOTE:** All service configuration rules follow "last one wins" order.
repeated HttpRule rules = 1;
// When set to true, URL path parameters will be fully URI-decoded except in
// cases of single segment matches in reserved expansion, where "%2F" will be
// left encoded.
//
// The default behavior is to not decode RFC 6570 reserved characters in multi
// segment matches.
bool fully_decode_reserved_expansion = 2;
}
// gRPC Transcoding
//
// gRPC Transcoding is a feature for mapping between a gRPC method and one or
// more HTTP REST endpoints. It allows developers to build a single API service
// that supports both gRPC APIs and REST APIs. Many systems, including [Google
// APIs](https://github.com/googleapis/googleapis),
// [Cloud Endpoints](https://cloud.google.com/endpoints), [gRPC
// Gateway](https://github.com/grpc-ecosystem/grpc-gateway),
// and [Envoy](https://github.com/envoyproxy/envoy) proxy support this feature
// and use it for large scale production services.
//
// `HttpRule` defines the schema of the gRPC/REST mapping. The mapping specifies
// how different portions of the gRPC request message are mapped to the URL
// path, URL query parameters, and HTTP request body. It also controls how the
// gRPC response message is mapped to the HTTP response body. `HttpRule` is
// typically specified as an `google.api.http` annotation on the gRPC method.
//
// Each mapping specifies a URL path template and an HTTP method. The path
// template may refer to one or more fields in the gRPC request message, as long
// as each field is a non-repeated field with a primitive (non-message) type.
// The path template controls how fields of the request message are mapped to
// the URL path.
//
// Example:
//
// service Messaging {
// rpc GetMessage(GetMessageRequest) returns (Message) {
// option (google.api.http) = {
// get: "/v1/{name=messages/*}"
// };
// }
// }
// message GetMessageRequest {
// string name = 1; // Mapped to URL path.
// }
// message Message {
// string text = 1; // The resource content.
// }
//
// This enables an HTTP REST to gRPC mapping as below:
//
// - HTTP: `GET /v1/messages/123456`
// - gRPC: `GetMessage(name: "messages/123456")`
//
// Any fields in the request message which are not bound by the path template
// automatically become HTTP query parameters if there is no HTTP request body.
// For example:
//
// service Messaging {
// rpc GetMessage(GetMessageRequest) returns (Message) {
// option (google.api.http) = {
// get:"/v1/messages/{message_id}"
// };
// }
// }
// message GetMessageRequest {
// message SubMessage {
// string subfield = 1;
// }
// string message_id = 1; // Mapped to URL path.
// int64 revision = 2; // Mapped to URL query parameter `revision`.
// SubMessage sub = 3; // Mapped to URL query parameter `sub.subfield`.
// }
//
// This enables a HTTP JSON to RPC mapping as below:
//
// - HTTP: `GET /v1/messages/123456?revision=2&sub.subfield=foo`
// - gRPC: `GetMessage(message_id: "123456" revision: 2 sub:
// SubMessage(subfield: "foo"))`
//
// Note that fields which are mapped to URL query parameters must have a
// primitive type or a repeated primitive type or a non-repeated message type.
// In the case of a repeated type, the parameter can be repeated in the URL
// as `...?param=A&param=B`. In the case of a message type, each field of the
// message is mapped to a separate parameter, such as
// `...?foo.a=A&foo.b=B&foo.c=C`.
//
// For HTTP methods that allow a request body, the `body` field
// specifies the mapping. Consider a REST update method on the
// message resource collection:
//
// service Messaging {
// rpc UpdateMessage(UpdateMessageRequest) returns (Message) {
// option (google.api.http) = {
// patch: "/v1/messages/{message_id}"
// body: "message"
// };
// }
// }
// message UpdateMessageRequest {
// string message_id = 1; // mapped to the URL
// Message message = 2; // mapped to the body
// }
//
// The following HTTP JSON to RPC mapping is enabled, where the
// representation of the JSON in the request body is determined by
// protos JSON encoding:
//
// - HTTP: `PATCH /v1/messages/123456 { "text": "Hi!" }`
// - gRPC: `UpdateMessage(message_id: "123456" message { text: "Hi!" })`
//
// The special name `*` can be used in the body mapping to define that
// every field not bound by the path template should be mapped to the
// request body. This enables the following alternative definition of
// the update method:
//
// service Messaging {
// rpc UpdateMessage(Message) returns (Message) {
// option (google.api.http) = {
// patch: "/v1/messages/{message_id}"
// body: "*"
// };
// }
// }
// message Message {
// string message_id = 1;
// string text = 2;
// }
//
//
// The following HTTP JSON to RPC mapping is enabled:
//
// - HTTP: `PATCH /v1/messages/123456 { "text": "Hi!" }`
// - gRPC: `UpdateMessage(message_id: "123456" text: "Hi!")`
//
// Note that when using `*` in the body mapping, it is not possible to
// have HTTP parameters, as all fields not bound by the path end in
// the body. This makes this option more rarely used in practice when
// defining REST APIs. The common usage of `*` is in custom methods
// which don't use the URL at all for transferring data.
//
// It is possible to define multiple HTTP methods for one RPC by using
// the `additional_bindings` option. Example:
//
// service Messaging {
// rpc GetMessage(GetMessageRequest) returns (Message) {
// option (google.api.http) = {
// get: "/v1/messages/{message_id}"
// additional_bindings {
// get: "/v1/users/{user_id}/messages/{message_id}"
// }
// };
// }
// }
// message GetMessageRequest {
// string message_id = 1;
// string user_id = 2;
// }
//
// This enables the following two alternative HTTP JSON to RPC mappings:
//
// - HTTP: `GET /v1/messages/123456`
// - gRPC: `GetMessage(message_id: "123456")`
//
// - HTTP: `GET /v1/users/me/messages/123456`
// - gRPC: `GetMessage(user_id: "me" message_id: "123456")`
//
// Rules for HTTP mapping
//
// 1. Leaf request fields (recursive expansion nested messages in the request
// message) are classified into three categories:
// - Fields referred by the path template. They are passed via the URL path.
// - Fields referred by the [HttpRule.body][google.api.HttpRule.body]. They
// are passed via the HTTP
// request body.
// - All other fields are passed via the URL query parameters, and the
// parameter name is the field path in the request message. A repeated
// field can be represented as multiple query parameters under the same
// name.
// 2. If [HttpRule.body][google.api.HttpRule.body] is "*", there is no URL
// query parameter, all fields
// are passed via URL path and HTTP request body.
// 3. If [HttpRule.body][google.api.HttpRule.body] is omitted, there is no HTTP
// request body, all
// fields are passed via URL path and URL query parameters.
//
// Path template syntax
//
// Template = "/" Segments [ Verb ] ;
// Segments = Segment { "/" Segment } ;
// Segment = "*" | "**" | LITERAL | Variable ;
// Variable = "{" FieldPath [ "=" Segments ] "}" ;
// FieldPath = IDENT { "." IDENT } ;
// Verb = ":" LITERAL ;
//
// The syntax `*` matches a single URL path segment. The syntax `**` matches
// zero or more URL path segments, which must be the last part of the URL path
// except the `Verb`.
//
// The syntax `Variable` matches part of the URL path as specified by its
// template. A variable template must not contain other variables. If a variable
// matches a single path segment, its template may be omitted, e.g. `{var}`
// is equivalent to `{var=*}`.
//
// The syntax `LITERAL` matches literal text in the URL path. If the `LITERAL`
// contains any reserved character, such characters should be percent-encoded
// before the matching.
//
// If a variable contains exactly one path segment, such as `"{var}"` or
// `"{var=*}"`, when such a variable is expanded into a URL path on the client
// side, all characters except `[-_.~0-9a-zA-Z]` are percent-encoded. The
// server side does the reverse decoding. Such variables show up in the
// [Discovery
// Document](https://developers.google.com/discovery/v1/reference/apis) as
// `{var}`.
//
// If a variable contains multiple path segments, such as `"{var=foo/*}"`
// or `"{var=**}"`, when such a variable is expanded into a URL path on the
// client side, all characters except `[-_.~/0-9a-zA-Z]` are percent-encoded.
// The server side does the reverse decoding, except "%2F" and "%2f" are left
// unchanged. Such variables show up in the
// [Discovery
// Document](https://developers.google.com/discovery/v1/reference/apis) as
// `{+var}`.
//
// Using gRPC API Service Configuration
//
// gRPC API Service Configuration (service config) is a configuration language
// for configuring a gRPC service to become a user-facing product. The
// service config is simply the YAML representation of the `google.api.Service`
// proto message.
//
// As an alternative to annotating your proto file, you can configure gRPC
// transcoding in your service config YAML files. You do this by specifying a
// `HttpRule` that maps the gRPC method to a REST endpoint, achieving the same
// effect as the proto annotation. This can be particularly useful if you
// have a proto that is reused in multiple services. Note that any transcoding
// specified in the service config will override any matching transcoding
// configuration in the proto.
//
// The following example selects a gRPC method and applies an `HttpRule` to it:
//
// http:
// rules:
// - selector: example.v1.Messaging.GetMessage
// get: /v1/messages/{message_id}/{sub.subfield}
//
// Special notes
//
// When gRPC Transcoding is used to map a gRPC to JSON REST endpoints, the
// proto to JSON conversion must follow the [proto3
// specification](https://developers.google.com/protocol-buffers/docs/proto3#json).
//
// While the single segment variable follows the semantics of
// [RFC 6570](https://tools.ietf.org/html/rfc6570) Section 3.2.2 Simple String
// Expansion, the multi segment variable **does not** follow RFC 6570 Section
// 3.2.3 Reserved Expansion. The reason is that the Reserved Expansion
// does not expand special characters like `?` and `#`, which would lead
// to invalid URLs. As the result, gRPC Transcoding uses a custom encoding
// for multi segment variables.
//
// The path variables **must not** refer to any repeated or mapped field,
// because client libraries are not capable of handling such variable expansion.
//
// The path variables **must not** capture the leading "/" character. The reason
// is that the most common use case "{var}" does not capture the leading "/"
// character. For consistency, all path variables must share the same behavior.
//
// Repeated message fields must not be mapped to URL query parameters, because
// no client library can support such complicated mapping.
//
// If an API needs to use a JSON array for request or response body, it can map
// the request or response body to a repeated field. However, some gRPC
// Transcoding implementations may not support this feature.
message HttpRule {
// Selects a method to which this rule applies.
//
// Refer to [selector][google.api.DocumentationRule.selector] for syntax
// details.
string selector = 1;
// Determines the URL pattern is matched by this rules. This pattern can be
// used with any of the {get|put|post|delete|patch} methods. A custom method
// can be defined using the 'custom' field.
oneof pattern {
// Maps to HTTP GET. Used for listing and getting information about
// resources.
string get = 2;
// Maps to HTTP PUT. Used for replacing a resource.
string put = 3;
// Maps to HTTP POST. Used for creating a resource or performing an action.
string post = 4;
// Maps to HTTP DELETE. Used for deleting a resource.
string delete = 5;
// Maps to HTTP PATCH. Used for updating a resource.
string patch = 6;
// The custom pattern is used for specifying an HTTP method that is not
// included in the `pattern` field, such as HEAD, or "*" to leave the
// HTTP method unspecified for this rule. The wild-card rule is useful
// for services that provide content to Web (HTML) clients.
CustomHttpPattern custom = 8;
}
// The name of the request field whose value is mapped to the HTTP request
// body, or `*` for mapping all request fields not captured by the path
// pattern to the HTTP body, or omitted for not having any HTTP request body.
//
// NOTE: the referred field must be present at the top-level of the request
// message type.
string body = 7;
// Optional. The name of the response field whose value is mapped to the HTTP
// response body. When omitted, the entire response message will be used
// as the HTTP response body.
//
// NOTE: The referred field must be present at the top-level of the response
// message type.
string response_body = 12;
// Additional HTTP bindings for the selector. Nested bindings must
// not contain an `additional_bindings` field themselves (that is,
// the nesting may only be one level deep).
repeated HttpRule additional_bindings = 11;
}
// A custom pattern is used for defining custom HTTP verb.
message CustomHttpPattern {
// The name of this custom HTTP verb.
string kind = 1;
// The path matched by this custom verb.
string path = 2;
}
proto文件进行编译
protoc -I ./pb --go_out ./pb --go_opt paths=source_relative --go-grpc_out ./pb --go-grpc_opt paths=source_relative --grpc-gateway_out ./pb --grpc-gateway_opt paths=source_relative ./pb/person/person.proto
编写person.proto中的Search,将接收到的内容打印即可,后面再做注册
// Search handles the unary RPC: it prints the request's name (twice), its age
// and its string form, then echoes age and body back with "的信息" appended
// to the name.
//
// Fields are read through the generated getters instead of direct field
// access, and the GetBody call whose result was discarded has been removed.
func (*personServe) Search(ctx context.Context, req *person.PersonReq) (*person.PersonRes, error) {
	name := req.GetName()
	fmt.Println(name)
	fmt.Println(req.GetName())
	fmt.Println(req.GetAge())
	fmt.Println(req.String())
	res := &person.PersonRes{Name: name + "的信息", Age: req.GetAge(), Body: req.GetBody()}
	return res, nil
}
完整的service.go,运行main后,直接用postman进行访问,比较好
package main
import (
"context"
"fmt"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"net"
"net/http"
"projectRPCTest4/pb/person"
"sync"
"google.golang.org/grpc"
)
// personServe is the receiver type for the SearchService handler below. It
// embeds the generated person.UnimplementedSearchServiceServer.
type personServe struct {
person.UnimplementedSearchServiceServer
}
// Search handles the unary RPC: it prints the request's name (twice), its age
// and its string form, then echoes age and body back with "的信息" appended
// to the name.
//
// Fields are read through the generated getters instead of direct field
// access, and the GetBody call whose result was discarded has been removed.
func (*personServe) Search(ctx context.Context, req *person.PersonReq) (*person.PersonRes, error) {
	name := req.GetName()
	fmt.Println(name)
	fmt.Println(req.GetName())
	fmt.Println(req.GetAge())
	fmt.Println(req.String())
	res := &person.PersonRes{Name: name + "的信息", Age: req.GetAge(), Body: req.GetBody()}
	return res, nil
}
// main launches the HTTP gateway and the gRPC server in separate goroutines
// and then blocks on a WaitGroup that counts both of them.
func main() {
	var wg sync.WaitGroup
	wg.Add(2)

	go registerGateway(&wg)
	go registerGRPC(&wg)

	//time.Sleep(100 * time.Second)
	wg.Wait()
}
// registerGateway dials the gRPC server on localhost:8888 (blocking until the
// connection is up), registers the SearchService gateway handlers on a new
// mux and serves HTTP on :8090.
//
// wg.Done is deferred so the WaitGroup is released on every exit path; the
// original never released it when ListenAndServe returned, leaving main
// blocked with a dead gateway. Dial and ListenAndServe errors now panic like
// the handler-registration error already did.
func registerGateway(wg *sync.WaitGroup) {
	defer wg.Done()

	conn, err := grpc.DialContext(context.Background(), "localhost:8888", grpc.WithBlock(), grpc.WithInsecure())
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	mux := runtime.NewServeMux()
	if err := person.RegisterSearchServiceHandler(context.Background(), mux, conn); err != nil {
		panic(err)
	}

	gwServer := &http.Server{
		Handler: mux,
		Addr:    ":8090",
	}
	// ErrServerClosed is the normal result of a deliberate shutdown.
	if err := gwServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
		panic(err)
	}
}
// registerGRPC listens on TCP port 8888, registers the SearchService
// implementation and serves gRPC requests.
//
// wg.Done is deferred so the WaitGroup is released on every exit path,
// including when Serve returns. Listen and Serve errors panic instead of
// being dropped.
func registerGRPC(wg *sync.WaitGroup) {
	defer wg.Done()

	l, err := net.Listen("tcp", ":8888")
	if err != nil {
		panic(err)
	}
	s := grpc.NewServer()
	person.RegisterSearchServiceServer(s, &personServe{})
	if err := s.Serve(l); err != nil {
		panic(err)
	}
}
3、拦截器
拦截器的语法编写
syntax = "proto3";
option go_package = "projectRPCTest5-4/service/users";
// Users offers a unary lookup RPC and a bidirectional-streaming help RPC.
service Users {
// GetUser: one request, one reply.
rpc GetUser (UserGetRequest) returns (UserGetReply) {}
// GetHelp: a stream of requests and a stream of replies.
rpc GetHelp (stream UserHelpRequest) returns (stream UserHelpReply) {}
}
// UserGetRequest is the request message of GetUser.
message UserGetRequest {
string email = 1;
string id = 2;
}
// User describes a single user.
message User {
string id = 1;
string first_name = 2;
string last_name = 3;
int32 age = 4;
}
// UserGetReply wraps the User returned by GetUser.
message UserGetReply {
User user = 1;
}
// UserHelpRequest is one message on the GetHelp request stream.
message UserHelpRequest {
User user = 1;
string request = 2;
}
// UserHelpReply is one message on the GetHelp reply stream.
message UserHelpReply {
string response = 1;
}
3.1 客户端拦截器
package main
import (
"bufio"
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"io"
"log"
"os"
svc "projectRPCTest5-4/service"
)
// metadataUnaryInterceptor is a client-side unary interceptor that appends
// the "Request-Id" metadata pair with value "request-123" to the outgoing
// context before passing the call on to invoker.
func metadataUnaryInterceptor(
	ctx context.Context,
	method string,
	req, reply interface{},
	cc *grpc.ClientConn,
	invoker grpc.UnaryInvoker,
	opts ...grpc.CallOption,
) error {
	ctx = metadata.AppendToOutgoingContext(ctx, "Request-Id", "request-123")
	return invoker(ctx, method, req, reply, cc, opts...)
}
// metadataStreamInterceptor is a client-side stream interceptor that appends
// the "Request-Id" metadata pair with value "request-123" to the outgoing
// context before creating the stream through streamer.
func metadataStreamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer,
	opts ...grpc.CallOption) (grpc.ClientStream, error) {
	ctx = metadata.AppendToOutgoingContext(ctx, "Request-Id", "request-123")
	return streamer(ctx, desc, cc, method, opts...)
}
// setupGrpcConn dials addr without transport security, blocking until the
// connection is established, with both metadata interceptors installed.
func setupGrpcConn(addr string) (*grpc.ClientConn, error) {
	dialOpts := []grpc.DialOption{
		grpc.WithInsecure(),
		grpc.WithBlock(),
		grpc.WithUnaryInterceptor(metadataUnaryInterceptor),
		grpc.WithStreamInterceptor(metadataStreamInterceptor),
	}
	return grpc.DialContext(context.Background(), addr, dialOpts...)
}
// getUserServiceClient returns a Users service client bound to conn.
func getUserServiceClient(conn *grpc.ClientConn) svc.UsersClient {
return svc.NewUsersClient(conn)
}
// getUser calls the GetUser RPC with request u on a background context and
// returns the reply and error unchanged.
func getUser(client svc.UsersClient,
u *svc.UserGetRequest) (*svc.UserGetReply, error) {
return client.GetUser(context.Background(), u)
}
// setupChat runs an interactive request/reply loop over the GetHelp
// bidirectional stream. It writes the prompt "Request:" to w, reads one line
// from r, sends it to the server and writes the server's reply to w. The loop
// ends when the user enters "quit" or when r is exhausted; the sending side
// of the stream is then closed.
//
// It returns the first error from opening the stream, reading r, Send, Recv
// or CloseSend.
//
// One scanner is created for the whole session: a new scanner per iteration
// discards input the previous one had already buffered. End of input now ends
// the chat instead of sending empty requests forever, and replies go to w
// rather than always to standard output.
func setupChat(r io.Reader, w io.Writer, c svc.UsersClient) error {
	stream, err := c.GetHelp(context.Background())
	if err != nil {
		return err
	}
	scanner := bufio.NewScanner(r)
	for {
		fmt.Fprint(w, "Request:")
		if !scanner.Scan() {
			if err := scanner.Err(); err != nil {
				return err
			}
			break // end of input is treated like "quit"
		}
		msg := scanner.Text()
		if msg == "quit" {
			break
		}
		request := svc.UserHelpRequest{
			Request: msg,
		}
		if err := stream.Send(&request); err != nil {
			return err
		}
		resp, err := stream.Recv()
		if err != nil {
			return err
		}
		fmt.Fprintln(w, "Recv", resp.Response)
	}
	return stream.CloseSend()
}
// main expects exactly two arguments, the server address and the method name
// ("GetUser" or "GetHelp"). It connects to the server and runs the requested
// call; every failure goes to log.Fatal.
func main() {
	if len(os.Args) != 3 {
		log.Fatal("Specify a gRPC server and method to call")
	}
	serverAddrr, methodName := os.Args[1], os.Args[2]

	conn, err := setupGrpcConn(serverAddrr)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	c := getUserServiceClient(conn)
	if methodName == "GetUser" {
		request := &svc.UserGetRequest{Email: "jane@doe.com"}
		result, err := getUser(c, request)
		if err != nil {
			log.Fatal(err)
		}
		fmt.Fprintf(
			os.Stdout, "User: %s %s\n",
			result.User.FirstName,
			result.User.LastName,
		)
	} else if methodName == "GetHelp" {
		if err = setupChat(os.Stdin, os.Stdout, c); err != nil {
			log.Fatal(err)
		}
	} else {
		log.Fatal("Unrecognized method name")
	}
}
3.2 服务端拦截器
拦截器这里是在每个请求加上日志请求
package main
//
import (
// "bufio"
"context"
"errors"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"io"
"log"
"net"
"os"
svc "projectRPCTest5-4/service"
"strings"
"time"
)
// userService is the receiver type for the Users service handlers below. It
// embeds the generated svc.UnimplementedUsersServer.
type userService struct {
svc.UnimplementedUsersServer
}
// logMessage writes one log line with the method name, the call's error, its
// latency and the first "Request-Id" value found in the incoming metadata.
// The id is left empty when there is none. When ctx carries no incoming
// metadata, the (empty) metadata value is logged first.
func logMessage(ctx context.Context, method string, latency time.Duration, err error) {
	requestId := ""
	md, found := metadata.FromIncomingContext(ctx)
	if found {
		if ids := md.Get("Request-Id"); len(ids) != 0 {
			requestId = ids[0]
		}
	} else {
		log.Println("md", md)
	}
	log.Printf("Method:%s,Error:%v,Latency:%v,Request-Id:%s", method, err, latency, requestId)
}
// loggingUnaryInterceptor is a server-side unary interceptor: it runs the
// handler, logs the method, error and elapsed time through logMessage, and
// returns the handler's result unchanged.
func loggingUnaryInterceptor(ctx context.Context, req interface{},
	info *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler) (interface{},
	error) {
	began := time.Now()
	resp, err := handler(ctx, req)
	elapsed := time.Since(began)
	logMessage(ctx, info.FullMethod, elapsed, err)
	return resp, err
}
// loggingStreamInterceptor is a server-side stream interceptor: it runs the
// handler for the whole life of the stream, logs the method, error and
// elapsed time through logMessage using the stream's context, and returns the
// handler's error unchanged.
func loggingStreamInterceptor(srv interface{}, stream grpc.ServerStream,
	info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	began := time.Now()
	handlerErr := handler(srv, stream)
	streamCtx := stream.Context()
	logMessage(streamCtx, info.FullMethod, time.Since(began), handlerErr)
	return handlerErr
}
// GetUser handles the unary RPC. It logs the request, splits the email at
// "@" and rejects it with "invalid email" unless that yields exactly two
// parts. The reply's user takes the request id, the two parts as first and
// last name, and a fixed age of 36.
func (s *userService) GetUser(
	ctx context.Context, in *svc.UserGetRequest) (*svc.UserGetReply, error) {
	log.Printf("Received request for user with Email:%s Id:%s\n",
		in.Email, in.Id)

	parts := strings.Split(in.Email, "@")
	if len(parts) != 2 {
		return nil, errors.New("invalid email")
	}
	local, domain := parts[0], parts[1]

	reply := &svc.UserGetReply{
		User: &svc.User{
			Id:        in.Id,
			FirstName: local,
			LastName:  domain,
			Age:       36,
		},
	}
	return reply, nil
}
// GetHelp handles the bidirectional-streaming RPC by echoing: each request's
// text is printed and sent back as the reply. It returns nil once the client
// closes its sending side (io.EOF) and returns any other Recv or Send error.
func (s *userService) GetHelp(stream svc.Users_GetHelpServer) error {
	log.Println("Client connected")
	for {
		in, recvErr := stream.Recv()
		if recvErr != nil {
			if recvErr == io.EOF {
				break
			}
			return recvErr
		}
		fmt.Printf("Request:%+v\n", in.Request)
		out := svc.UserHelpReply{
			Response: in.Request,
		}
		if sendErr := stream.Send(&out); sendErr != nil {
			return sendErr
		}
	}
	log.Println("Client disconnected")
	return nil
}
// registerServices registers a userService as the Users service on s.
func registerServices(s *grpc.Server) {
svc.RegisterUsersServer(s, &userService{})
}
// startServer serves s on listener l and returns whatever s.Serve returns.
func startServer(s *grpc.Server, l net.Listener) error {
return s.Serve(l)
}
// main starts the Users gRPC server with the logging interceptors installed.
// The listen address comes from the LISTEN_ADDR environment variable and
// falls back to ":50051" when that is unset or empty.
func main() {
	addr := os.Getenv("LISTEN_ADDR")
	if addr == "" {
		addr = ":50051"
	}

	lis, err := net.Listen("tcp", addr)
	if err != nil {
		log.Fatal(err)
	}

	opts := []grpc.ServerOption{
		grpc.UnaryInterceptor(loggingUnaryInterceptor),
		grpc.StreamInterceptor(loggingStreamInterceptor),
	}
	server := grpc.NewServer(opts...)
	registerServices(server)

	serveErr := startServer(server, lis)
	log.Fatal(serveErr)
}
3.4 链接拦截器(拦截器链)
就是把多个拦截器串联成一条链,注册到同一个服务端上;拦截器按传入顺序依次执行,第一个位于最外层
grpc.ChainUnaryInterceptor(
metricUnaryInterceptro,
loggingUnaryInterceptor,
)
grpc.ChainStreamInterceptor(
metricStreamInterceptro,
loggingStreamInterceptor,
)
4.服务端与客户端健壮性
4.1 服务端健康检查
// Protocol Buffers (proto3) definition of the Users service and its messages.
syntax = "proto3";
// Go package path for the code generated from this file.
option go_package = "projectRPCHealthCheck/service/users";
// Users exposes one unary RPC (GetUser) and one RPC whose request and
// response are both streams (GetHelp).
service Users {
rpc GetUser (UserGetRequest) returns (UserGetReply) {}
rpc GetHelp (stream UserHelpRequest) returns (stream UserHelpReply) {}
}
// UserGetRequest is the input of GetUser.
message UserGetRequest {
string email = 1;
string id = 2;
}
// User describes a single user record.
message User {
string id = 1;
string first_name = 2;
string last_name = 3;
int32 age = 4;
}
// UserGetReply is the output of GetUser and wraps one User.
message UserGetReply {
User user = 1;
}
// UserHelpRequest is one message sent by the client on the GetHelp stream.
message UserHelpRequest {
User user = 1;
string request = 2;
}
// UserHelpReply is one message sent by the server on the GetHelp stream.
message UserHelpReply {
string response = 1;
}
服务端的go文件
package main
import (
"context"
"errors"
"fmt"
"io"
"log"
"net"
"os"
"strings"
"google.golang.org/grpc"
healthz "google.golang.org/grpc/health"
healthsvc "google.golang.org/grpc/health/grpc_health_v1"
svc "projectRPCHealthCheck/service"
)
// userService is the server-side implementation of the Users gRPC service.
// It embeds svc.UnimplementedUsersServer from the generated package (not shown
// here — presumably it supplies fallbacks for RPCs this type does not
// implement; confirm against the generated code).
type userService struct {
svc.UnimplementedUsersServer
}
// GetUser handles the unary GetUser RPC. It logs the incoming email and id,
// splits the email on "@", and answers with a User whose first name is the
// part before the "@" and whose last name is the part after it. Emails that
// do not split into exactly two parts yield an "invalid email address" error.
// The age is a fixed placeholder value.
func (s *userService) GetUser(
	ctx context.Context,
	in *svc.UserGetRequest,
) (*svc.UserGetReply, error) {
	log.Printf(
		"Received request for user with Email: %s Id: %s\n",
		in.Email,
		in.Id,
	)

	parts := strings.Split(in.Email, "@")
	if len(parts) != 2 {
		return nil, errors.New("invalid email address")
	}

	user := &svc.User{
		Id:        in.Id,
		FirstName: parts[0],
		LastName:  parts[1],
		Age:       36,
	}
	return &svc.UserGetReply{User: user}, nil
}
// GetHelp handles the bidirectional GetHelp stream as an echo service: each
// received request is printed and its Request text is returned as the
// Response. It returns nil once the client closes its side (io.EOF) and
// returns any other receive or send error immediately.
func (s *userService) GetHelp(
	stream svc.Users_GetHelpServer,
) error {
	for {
		msg, err := stream.Recv()
		switch {
		case err == io.EOF:
			return nil
		case err != nil:
			return err
		}

		fmt.Printf("Request receieved: %s\n", msg.Request)
		echo := svc.UserHelpReply{Response: msg.Request}
		if sendErr := stream.Send(&echo); sendErr != nil {
			return sendErr
		}
	}
}
// registerServices registers two services on s: a fresh userService as the
// Users implementation, and h as the gRPC health-checking service.
func registerServices(s *grpc.Server, h *healthz.Server) {
	impl := &userService{}
	svc.RegisterUsersServer(s, impl)
	healthsvc.RegisterHealthServer(s, h)
}
// updateServiceHealth records the given serving status for the named service
// on the health server h.
func updateServiceHealth(
	h *healthz.Server,
	service string,
	status healthsvc.HealthCheckResponse_ServingStatus,
) {
	h.SetServingStatus(service, status)
}
// startServer serves gRPC requests on l and returns whatever s.Serve returns.
func startServer(s *grpc.Server, l net.Listener) error {
	serveErr := s.Serve(l)
	return serveErr
}
// main starts the Users gRPC server together with a health-checking service.
// The listen address comes from the LISTEN_ADDR environment variable and
// falls back to ":50051" when that is unset or empty. The Users service is
// marked SERVING before the server starts accepting connections.
func main() {
	addr := os.Getenv("LISTEN_ADDR")
	if addr == "" {
		addr = ":50051"
	}

	lis, err := net.Listen("tcp", addr)
	if err != nil {
		log.Fatal(err)
	}

	grpcServer := grpc.NewServer()
	healthServer := healthz.NewServer()
	registerServices(grpcServer, healthServer)
	updateServiceHealth(
		healthServer,
		svc.Users_ServiceDesc.ServiceName,
		healthsvc.HealthCheckResponse_SERVING,
	)

	serveErr := startServer(grpcServer, lis)
	log.Fatal(serveErr)
}
对这段代码进行测试
package main
import (
"context"
"errors"
healthz "google.golang.org/grpc/health"
"log"
"net"
svc "projectRPCHealthCheck/service"
users "projectRPCHealthCheck/service"
"testing"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
healthsvc "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"
"google.golang.org/grpc/test/bufconn"
)
// h is the health server shared by the tests in this file. It is (re)assigned
// by startTestGrpcServer and read by tests that change a service's status.
var h *healthz.Server
// startTestGrpcServer spins up an in-memory gRPC server for tests and returns
// the bufconn listener clients should dial. It replaces the package-level
// health server h, registers the Users and health services, marks Users as
// SERVING, and serves in a background goroutine.
func startTestGrpcServer() *bufconn.Listener {
	h = healthz.NewServer()
	lis := bufconn.Listen(10)
	srv := grpc.NewServer()

	registerServices(srv, h)
	updateServiceHealth(
		h,
		svc.Users_ServiceDesc.ServiceName,
		healthsvc.HealthCheckResponse_SERVING,
	)

	go func() {
		serveErr := startServer(srv, lis)
		log.Fatal(serveErr)
	}()
	return lis
}
// getHealthSvcClient dials the in-memory listener l and returns a client for
// the gRPC health service. The target address is empty because every
// connection is produced by l.Dial rather than by the network.
func getHealthSvcClient(l *bufconn.Listener) (healthsvc.HealthClient, error) {
	dial := func(context.Context, string) (net.Conn, error) {
		return l.Dial()
	}

	conn, err := grpc.DialContext(
		context.Background(),
		"",
		grpc.WithInsecure(),
		grpc.WithContextDialer(dial),
	)
	if err != nil {
		return nil, err
	}
	return healthsvc.NewHealthClient(conn), nil
}
// TestHealthService sends a health check with an empty service name and
// expects the reported status to be SERVING.
func TestHealthService(t *testing.T) {
	lis := startTestGrpcServer()
	hc, err := getHealthSvcClient(lis)
	if err != nil {
		t.Fatal(err)
	}

	req := &healthsvc.HealthCheckRequest{}
	resp, err := hc.Check(context.Background(), req)
	if err != nil {
		t.Fatal(err)
	}

	if got := resp.Status.String(); got != "SERVING" {
		t.Fatalf(
			"Expected health: SERVING, Got: %s",
			got,
		)
	}
}
// TestHealthServiceUsers sends a health check for the "Users" service and
// expects the reported status to be SERVING.
func TestHealthServiceUsers(t *testing.T) {
	lis := startTestGrpcServer()
	hc, err := getHealthSvcClient(lis)
	if err != nil {
		t.Fatal(err)
	}

	req := &healthsvc.HealthCheckRequest{Service: "Users"}
	resp, err := hc.Check(context.Background(), req)
	if err != nil {
		t.Fatal(err)
	}

	if got := resp.Status.String(); got != "SERVING" {
		t.Fatalf(
			"Expected health: SERVING, Got: %s",
			got,
		)
	}
}
// TestHealthServiceUnknown checks that a health check for a service name the
// test server never set a status for ("Repo") fails, and that the failure is
// a NotFound status with the message "unknown service".
func TestHealthServiceUnknown(t *testing.T) {
	l := startTestGrpcServer()
	healthClient, err := getHealthSvcClient(l)
	if err != nil {
		t.Fatal(err)
	}

	_, err = healthClient.Check(
		context.Background(),
		&healthsvc.HealthCheckRequest{
			Service: "Repo",
		},
	)
	if err == nil {
		t.Fatalf("Expected non-nil error, Got nil error")
	}

	expectedError := status.Errorf(
		codes.NotFound, "unknown service",
	)
	if !errors.Is(err, expectedError) {
		// Report the expected value first and the actual value second so the
		// failure message matches its own labels.
		t.Fatalf(
			"Expected error %v, Got: %v",
			expectedError,
			err,
		)
	}
}
// TestHealthServiceWatch opens a Watch stream for the "Users" service and
// checks two consecutive updates: the first must report SERVING, and after
// the test flips the status on the shared health server the second must
// report NOT_SERVING.
func TestHealthServiceWatch(t *testing.T) {
	lis := startTestGrpcServer()
	hc, err := getHealthSvcClient(lis)
	if err != nil {
		t.Fatal(err)
	}

	req := &healthsvc.HealthCheckRequest{Service: "Users"}
	watcher, err := hc.Watch(context.Background(), req)
	if err != nil {
		t.Fatal(err)
	}

	// First update: the status set when the test server was started.
	update, err := watcher.Recv()
	if err != nil {
		t.Fatalf("Error in Watch: %#v\n", err)
	}
	if update.Status != healthsvc.HealthCheckResponse_SERVING {
		t.Errorf("Expected SERVING, Got: %#v", update.Status.String())
	}

	// Flip the status and expect the stream to deliver the change.
	updateServiceHealth(
		h,
		"Users",
		healthsvc.HealthCheckResponse_NOT_SERVING,
	)

	update, err = watcher.Recv()
	if err != nil {
		t.Fatalf("Error in Watch: %#v\n", err)
	}
	if update.Status != healthsvc.HealthCheckResponse_NOT_SERVING {
		t.Errorf("Expected NOT_SERVING, Got: %#v", update.Status.String())
	}
}
// TestUserService1 calls GetUser on the in-memory test server with the email
// "jane@doe.com" and expects the returned user's first name to be "jane".
func TestUserService1(t *testing.T) {
	lis := startTestGrpcServer()
	dial := func(context.Context, string) (net.Conn, error) {
		return lis.Dial()
	}

	conn, err := grpc.DialContext(
		context.Background(),
		"",
		grpc.WithInsecure(),
		grpc.WithContextDialer(dial),
	)
	if err != nil {
		t.Fatal(err)
	}

	req := &users.UserGetRequest{
		Email: "jane@doe.com",
		Id:    "foo-bar",
	}
	resp, err := users.NewUsersClient(conn).GetUser(context.Background(), req)
	if err != nil {
		t.Fatal(err)
	}

	if got := resp.User.FirstName; got != "jane" {
		t.Errorf(
			"Expected FirstName to be: jane, Got: %s",
			got,
		)
	}
}
4.2 服务端处理运行时错误
如果是流式服务
// panicStreamInterceptor is a stream server interceptor that converts a panic
// raised by the wrapped handler into an Internal status error instead of
// letting it propagate. The recovered value is logged.
//
// The result is named so the deferred function can overwrite it after a
// panic has been recovered.
func panicStreamInterceptor(
	srv interface{},
	stream grpc.ServerStream,
	info *grpc.StreamServerInfo,
	handler grpc.StreamHandler,
) (err error) {
	defer func() {
		recovered := recover()
		if recovered == nil {
			return
		}
		log.Printf("Panic recovered: %v\n", recovered)
		err = status.Error(
			codes.Internal,
			"Unexpected error happened",
		)
	}()

	return handler(srv, wrappedServerStream{ServerStream: stream})
}
如果是单一服务
// panicUnaryInterceptor is a unary server interceptor that converts a panic
// raised by the wrapped handler into an Internal status error instead of
// letting it propagate. The recovered value is logged.
//
// The results are named so the deferred function can overwrite err after a
// panic has been recovered.
func panicUnaryInterceptor(
	ctx context.Context,
	req interface{},
	info *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler,
) (resp interface{}, err error) {
	defer func() {
		recovered := recover()
		if recovered == nil {
			return
		}
		log.Printf("Panic recovered: %v\n", recovered)
		err = status.Error(
			codes.Internal,
			"Unexpected error happened",
		)
	}()

	return handler(ctx, req)
}
4.3 服务端终止请求处理
流式
// timeoutStreamInterceptor is a stream server interceptor that hands the
// handler a wrappedServerStream configured with a 500ms RecvMsgTimeout.
// How that timeout is enforced is up to wrappedServerStream, which is defined
// elsewhere. The handler's error is returned unchanged.
func timeoutStreamInterceptor(
	srv interface{},
	stream grpc.ServerStream,
	info *grpc.StreamServerInfo,
	handler grpc.StreamHandler,
) error {
	wrapped := wrappedServerStream{
		RecvMsgTimeout: 500 * time.Millisecond,
		ServerStream:   stream,
	}
	return handler(srv, wrapped)
}
单一
// timeoutUnaryInterceptor is a unary server interceptor that gives the wrapped
// handler at most 300ms to finish.
//
// The handler runs in its own goroutine with a context that is cancelled when
// the deadline passes. If the handler finishes first, its response and error
// are returned as-is. If the context is done first, the interceptor returns a
// nil response and a DeadlineExceeded status error naming the method.
//
// The handler's outcome is delivered over a buffered channel: the goroutine
// can always complete its send and exit even after the interceptor has
// already returned on timeout, and the response/error values are never shared
// between goroutines.
//
// NOTE(review): because the handler runs in a separate goroutine, a panic in
// it cannot be recovered by a deferred recover in an outer interceptor.
func timeoutUnaryInterceptor(
	ctx context.Context,
	req interface{},
	info *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler,
) (interface{}, error) {
	ctxWithTimeout, cancel := context.WithTimeout(ctx, 300*time.Millisecond)
	defer cancel()

	type outcome struct {
		resp interface{}
		err  error
	}
	// Capacity 1 lets the handler goroutine finish even when nobody receives.
	done := make(chan outcome, 1)
	go func() {
		resp, err := handler(ctxWithTimeout, req)
		done <- outcome{resp: resp, err: err}
	}()

	select {
	case <-ctxWithTimeout.Done():
		return nil, status.Error(
			codes.DeadlineExceeded,
			fmt.Sprintf("%s: Deadline exceeded", info.FullMethod),
		)
	case res := <-done:
		return res.resp, res.err
	}
}
4.4 客户端提高连接配置
为建立连接增加超时机制:用带超时的 context 配合若干连接选项(DialOption)来配置连接行为
// setupGrpcConn dials addr over an insecure connection, allowing at most ten
// seconds for the dial context. It returns the connection, the cancel
// function of that context, and any dial error. The cancel function is
// returned in every case, so the caller is responsible for invoking it.
func setupGrpcConn(addr string) (*grpc.ClientConn, context.CancelFunc, error) {
	log.Printf("Connecting to server on %s\n", addr)

	dialCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	opts := []grpc.DialOption{
		grpc.WithInsecure(),
		grpc.WithBlock(),
		grpc.FailOnNonTempDialError(true),
		grpc.WithReturnConnectionError(),
	}

	conn, err := grpc.DialContext(dialCtx, addr, opts...)
	return conn, cancel, err
}
4.5 为方法调用设置超时
做法与上面建立连接时相同:用 context.WithTimeout 创建带超时的 context,并把它传给具体的 RPC 方法调用即可