当前位置: 首页 > article >正文

RPC 集群,gRPC 广播和组播

一、集群抽象:cluster

它是指我们在调用远程的时候,尝试解决:

1、failover:即引入重试功能,但是重试的时候会换一个新节点

2、failfast: 立刻失败,不需要重试

3、广播:将请求发送到所有的节点上

4、组播:组播和分组功能不太一样,组播是指请求发送到一组节点上,而不是只发送到一个单一节点上

注意:failover

二、 gRPC 的 Interceptor 分成好几种:

UnaryClientInterceptor: 用于拦截 gRPC unary 请求

StreamClientIntercepror 用于拦截 gRPC 的 stream 请求。

三、gRPC 广播:注册中心获取所有节点

思路:

利用拦截器捕获调用

利用注册中心来获取所有的服务实例

在拦截器内遍历所有的服务端实例

四、gRPC 限流:

利用服务端拦截器调用,进行限流逻辑

五、gRPC 广播实现

gRPC 广播利用客户端拦截器实现,步骤也非常简单,以下几步:

1、利用注册中心获取所有节点

2、利用filter 过滤节点

3、grpc.Dial 循环调用节点,发起tcp 请求

注意:reflect.TypeOf(reply).Elem() 以及 reflect.New(typ).Interface() 生成一个新的Reply 避免覆盖, filter 是nil 就是广播,非nil 就是组播

对节点进行过滤

type MyIntercaptor struct {
   register registry.Register
   method   string
   filter   Filter
}
​
type SetOptions func(optins *MyIntercaptor)
​
func NewMyInterceptor(register registry.Register, method string, options ...SetOptions) *MyIntercaptor {
   t := &MyIntercaptor{
      register: register,
      method:   method,
      filter: func(g1 string, ctx context.Context) bool {
         return true
      },
   }
   for _, opt := range options {
      opt(t)
   }
   return t
}
​
func WithMyInterSetFilter(filter Filter) SetOptions {
   return func(option *MyIntercaptor) {
      option.filter = filter
   }
}
​
func (m *MyIntercaptor) Intercaptor() grpc.UnaryClientInterceptor {
   return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
      list, er := m.register.ListServices(ctx, m.method)
      if er != nil {
         return er
      }
      // 是否是广播
      ok, resp := IsBroadCast(ctx)
      defer func() {
         close(resp)
      }()
      if !ok {
         return invoker(ctx, method, req, reply, cc, opts...)
      }
      var err errgroup.Group
      for _, li := range list {
         if !m.filter(li.Group, ctx) {
            continue
         }
         if li.Addr == "" {
            continue
         }
         typ := reflect.TypeOf(reply).Elem()
         addr := li.Addr
         // 并发调用
         err.Go(func() error {
            dial, er := grpc.Dial(addr, grpc.WithInsecure())
            if er != nil {
               resp <- Resp{
                  Err: er,
               }
               return nil
            }
            rep := reflect.New(typ).Interface()
            // 发送方法请求
            er = invoker(ctx, method, req, rep, dial, opts...)
            resp <- Resp{
               Err:   er,
               Reply: rep,
            }
            return nil
         })
​
      }
      return err.Wait()
   }
}
​
type Filter func(g1 string, ctx context.Context) bool
​
func NewFilter() Filter {
   return func(g1 string, ctx context.Context) bool {
      group, ok := ctx.Value("group").(string)
      return ok && group == g1
   }
}
​
type broadcastKey struct {
}
​
func UseBroadcastKey(ctx context.Context) (context.Context, chan Resp) {
   ch := make(chan Resp)
   return context.WithValue(ctx, broadcastKey{}, ch), ch
}
​
func IsBroadCast(ctx context.Context) (bool, chan Resp) {
   resp, ok := ctx.Value(broadcastKey{}).(chan Resp)
   return ok, resp
}
​
type Resp struct {
   Err   error
   Reply any
}


http://www.kler.cn/a/161965.html

相关文章:

  • opencv常用api
  • 鸿蒙HarmonyOS 地图不显示解决方案
  • C++编程:利用环形缓冲区优化 TCP 发送流程,避免 Short Write 问题
  • Appium配置2024.11.12
  • 探索Pillow库:Python图像处理的瑞士军刀
  • 32位、64位、x86与x64:深入解析计算机架构
  • ELK架构监控MySQL慢日志
  • git-vscode
  • ubuntu20.04设置开机自启动jar(依赖其他服务)
  • 简单介绍一些其他的树
  • 阿里云 ACR 制品中心 AI/大数据镜像专场上新推荐榜
  • 【教程】逻辑回归怎么做多分类
  • 转转闲鱼链接后台搭建教程+完整版源码
  • 上海市青少年算法2022年10月月赛(乙组)
  • 【BUG】SpringBoot项目Long类型数据返回前端精度丢失问题
  • 论文分享 | 基于机载单目视觉的四旋翼无人机群内相对定位
  • 数据库管理-第120期 初探Halo数据库(202301201)
  • vue的props
  • git 本地有改动,远程也有改动,且文件是自动生成的配置文件
  • 【vuex】
  • 探索Vue小程序框架的底层原理
  • WPF Mvvm模式下面如何将事件映射到ViewModel层
  • lambda技巧之—如何在有多个判断分支的情况下,还能优雅的使用auto ?
  • Gee教程5.中间件
  • 微信小程序动态加载图表[echart]
  • 假设检验(三)(单侧假设检验)