RPC 集群,gRPC 广播和组播
一、集群抽象:cluster
它是指我们在调用远程的时候,尝试解决:
1、failover:即引入重试功能,但是重试的时候会换一个新节点
2、failfast: 立刻失败,不需要重试
3、广播:将请求发送到所有的节点上
4、组播:组播和分组功能不太一样,组播是指请求发送到一组节点上,而不是只发送到一个单一节点上
注意:failover
二、 gRPC 的 Interceptor 分成好几种:
UnaryClientInterceptor: 用于拦截 gRPC unary 请求
StreamClientIntercepror 用于拦截 gRPC 的 stream 请求。
三、gRPC 广播:注册中心获取所有节点
思路:
利用拦截器捕获调用
利用注册中心来获取所有的服务实例
在拦截器内遍历所有的服务端实例
四、gRPC 限流:
利用服务端拦截器调用,进行限流逻辑
五、gRPC 广播实现
gRPC 广播利用客户端拦截器实现,步骤也非常简单,以下几步:
1、利用注册中心获取所有节点
2、利用filter 过滤节点
3、grpc.Dial 循环调用节点,发起tcp 请求
注意:reflect.TypeOf(reply).Elem() 以及 reflect.New(typ).Interface() 生成一个新的Reply 避免覆盖, filter 是nil 就是广播,非nil 就是组播
对节点进行过滤
type MyIntercaptor struct {
register registry.Register
method string
filter Filter
}
type SetOptions func(optins *MyIntercaptor)
func NewMyInterceptor(register registry.Register, method string, options ...SetOptions) *MyIntercaptor {
t := &MyIntercaptor{
register: register,
method: method,
filter: func(g1 string, ctx context.Context) bool {
return true
},
}
for _, opt := range options {
opt(t)
}
return t
}
func WithMyInterSetFilter(filter Filter) SetOptions {
return func(option *MyIntercaptor) {
option.filter = filter
}
}
func (m *MyIntercaptor) Intercaptor() grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
list, er := m.register.ListServices(ctx, m.method)
if er != nil {
return er
}
// 是否是广播
ok, resp := IsBroadCast(ctx)
defer func() {
close(resp)
}()
if !ok {
return invoker(ctx, method, req, reply, cc, opts...)
}
var err errgroup.Group
for _, li := range list {
if !m.filter(li.Group, ctx) {
continue
}
if li.Addr == "" {
continue
}
typ := reflect.TypeOf(reply).Elem()
addr := li.Addr
// 并发调用
err.Go(func() error {
dial, er := grpc.Dial(addr, grpc.WithInsecure())
if er != nil {
resp <- Resp{
Err: er,
}
return nil
}
rep := reflect.New(typ).Interface()
// 发送方法请求
er = invoker(ctx, method, req, rep, dial, opts...)
resp <- Resp{
Err: er,
Reply: rep,
}
return nil
})
}
return err.Wait()
}
}
type Filter func(g1 string, ctx context.Context) bool
func NewFilter() Filter {
return func(g1 string, ctx context.Context) bool {
group, ok := ctx.Value("group").(string)
return ok && group == g1
}
}
type broadcastKey struct {
}
func UseBroadcastKey(ctx context.Context) (context.Context, chan Resp) {
ch := make(chan Resp)
return context.WithValue(ctx, broadcastKey{}, ch), ch
}
func IsBroadCast(ctx context.Context) (bool, chan Resp) {
resp, ok := ctx.Value(broadcastKey{}).(chan Resp)
return ok, resp
}
type Resp struct {
Err error
Reply any
}