当前位置: 首页 > article >正文

Envoy控制面实践

简介

Envoy 是一款由 Lyft 开源的,使用 C++ 编写的 L7 代理和通信总线,目前是 CNCF 旗下的开源项目且已经毕业,代码托管在 GitHub 上,它也是 Istio 服务网格中默认的数据平面。关于 Envoy 的详情请阅读 Envoy 中文文档。Envoy 本身无法构成一个完整的 Service Mesh,但是它可以作为 service mesh 中的应用间流量的代理,负责 service mesh 中的数据层。

Envoy架构中的一些重要概念:

  • Downstream:下游主机,指连接到Envoy的主机,这些主机用来发送请求并接受响应。

  • Upstream:上游主机,指接收来自Envoy连接和请求的主机,并返回响应。

  • Listener:服务或程序的监听器, Envoy暴露一个或多个监听器监听下游主机的请求,当监听到请求时,通过Filter Chain把对请求的处理全部抽象为Filter, 例如ReadFilter、WriteFilter、HttpFilter等。

  • Cluster:服务提供集群,指Envoy连接的一组逻辑相同的上游主机。Envoy通过服务发现功能来发现集群内的成员,通过负载均衡功能将流量路由到集群的各个成员。

  • xDS:xDS中的x是一个代词,类似云计算里的XaaS可以指代IaaS、PaaS、SaaS等。DS为Discovery Service,即发现服务的意思。xDS包括CDS(cluster discovery service)、RDS(route discovery service)、EDS(endpoint discovery service)、ADS(aggregated discovery service),其中ADS称为聚合的发现服务,是对CDS、RDS、LDS、EDS服务的统一封装,解决CDS、RDS、LDS、EDS信息更新顺序依赖的问题,从而保证以一定的顺序同步各类配置信息。以上Endpoint、Cluster、Route的概念介绍如下:

  • Endpoint:一个具体的“应用实例”,类似于Kubernetes中的一个Pod;

  • Cluster:可以理解“应用集群”,对应提供相同服务的一个或多个Endpoint, 类似Kubernetes中Service概念,即一个Service提供多个相同服务的Pod;

  • Route:当我们做金丝雀发布部署时,同一个服务会有多个版本,这时需要Route规则规定请求如何路由到其中的某个版本上。

Envoy正常的工作流程为Host A(下游主机)发送请求至上游主机(Host B、Host C、Host D等),Envoy通过Listener监听到有下游主机的请求,收到请求后的Envoy将所有请求流量劫持至Envoy内部,并将请求内容抽象为Filter Chains路由至某个上游主机中从而实现路由转发及负载均衡能力。

xDS

Envoy为了实现流量代理能力通常需要一个统一的配置文件来记录信息以便启动时加载,在Envoy中启动配置文件有静态配置和动态配置两种方式。静态配置是将配置信息写入文件中,启动时直接加载,动态配置通过xDS实现一个Envoy的服务端(可以理解为以API接口对外实现服务发现能力)。

xDS模块的功能是通过Envoy API V1(基于HTTP)或V2(基于gRPC)实现一个服务端将配置信息暴露给上游主机,等待上游主机的拉取。

xDS API 在envoy中被称为 Data plane API 。其代码保存在 https://github.com/envoyproxy/envoy/tree/master/api/envoy/api/v2,用户可以根据proto文件自行生成相对应语言的GRPC代码文件。

Envoy 官方提供了两份 xDS Server 的实现,分别是:

  • go-control-plane 基于Golang 的 xDS Server 实现代码

  • java-control-plane 基于 Java 的 xDS Server 实现代码

另外,官方还把 api 的定义代码从 Envoy 的源码库中提取出来,放在了 https://github.com/envoyproxy/data-plane-api

实践

要实现一个简单的控制面,一般就是基于xDS api来管理动态配置的能力,如下几步

1. 实现callback

type callbacks struct {
   test.Callbacks
   simpleCache cache.SnapshotCache
   hash        cache.NodeHash
   datasource  service.Datasource
   config      *Config
}

func (c *callbacks) OnStreamRequest(id int64, request *discovery.DiscoveryRequest) error {
   nodeId := c.hash.ID(request.Node)
   logrus.Debugf("node: %s on stream request version_info: %s resource_names: %s type_url: %s response_nonce: %s error: %+v", nodeId, request.VersionInfo, request.ResourceNames, request.TypeUrl, request.ResponseNonce, request.ErrorDetail)
   if !containsString(c.simpleCache.GetStatusKeys(), nodeId) {
      c.SetSnapshot(nodeId)
   }
   return c.Callbacks.OnStreamRequest(id, request)
}

根据业务需要,实现相应的回调方法。

2. 创建grpc服务

server := server.NewServer(ctx, simpleCache, cb)

var grpcOptions []grpc.ServerOption
grpcOptions = append(grpcOptions,
   grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams),
   grpc.KeepaliveParams(keepalive.ServerParameters{
      Time:    grpcKeepaliveTime,
      Timeout: grpcKeepaliveTimeout,
   }),
   grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
      MinTime:             grpcKeepaliveMinTime,
      PermitWithoutStream: true,
   }),
)
grpcServer := grpc.NewServer(grpcOptions...)

lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
   logrus.Fatal(err)
}

discoverygrpc.RegisterAggregatedDiscoveryServiceServer(grpcServer, server)
endpointservice.RegisterEndpointDiscoveryServiceServer(grpcServer, server)
clusterservice.RegisterClusterDiscoveryServiceServer(grpcServer, server)
routeservice.RegisterRouteDiscoveryServiceServer(grpcServer, server)
listenerservice.RegisterListenerDiscoveryServiceServer(grpcServer, server)
secretservice.RegisterSecretDiscoveryServiceServer(grpcServer, server)
runtimeservice.RegisterRuntimeDiscoveryServiceServer(grpcServer, server)

logrus.Infof("management server listening on %d\n", port)
	if err = grpcServer.Serve(lis); err != nil {
		logrus.Println(err)
	}

上面的流程大致就是:

  • 构造一个业务server实例,将回调实例传入

  • 构造一个grpc服务器实例

  • 向grpc注册业务服务,也就是各个xDS api的实现服务

  • 启动一个grpc服务器

3. 构造快照

snap, _ := cache.NewSnapshot(time.Now().Format(time.RFC3339),
   map[resource.Type][]types.Resource{
      resource.ClusterType:  clusters,
      resource.RouteType:    {makeRoute(RouteName, routes)},
      resource.ListenerType: {makeHTTPListener(ListenerName, RouteName, listenerPort)},
   },
)

根据xDS 数据格式,构造所需的业务数据,也就是最后存储在envoy中的动态配置

4. 返回快照

if err := snapshot.Consistent(); err != nil {
   logrus.Errorf("snapshot inconsistency: %+v\n%+v", snapshot, err)
   return
}

if err := c.simpleCache.SetSnapshot(context.Background(), nodeId, snapshot); err != nil {
   logrus.Errorf("snapshot error %q for %+v", err, snapshot)
   return
}

将业务数据动态注入envoy配置中


http://www.kler.cn/news/17366.html

相关文章:

  • 漫画 | Linux之父:财务自由以后,我失眠了!
  • 华为OD机试 - 整理扑克牌(Python)
  • [计算机图形学]光场,颜色与感知(前瞻预习/复习回顾)
  • 对比 LVS 负载均衡群集的 NAT 模式和 DR 模式,基于 CentOS 7 构建 LVS-DR 群集
  • springboot 集成 shardingSphere 加mybatisplus 自带增加 分页查询 和源代码包 分库分表 单库 分表 使用雪花算法id
  • node.js 处理路径问题
  • VR与AR:哪个有更大的潜力改变未来?
  • 今天面了个字节跳动拿35K出来的,真是砂纸擦屁股,给我露了一手啊
  • Skywalking
  • gtest之高级主题
  • Spring常用注解总结
  • PAT A1024 Palindromic Number
  • Java对象的创建方式以及对象的引用
  • 【Elsevier】中科院2区TOP, 高被引119篇, 稳定检索22年, 1周可见刊,5月15截稿~
  • Simulink 自动代码生成电机控制:弱磁控制从仿真到硬件开发板验证实验
  • 豪取BAT!超详细暑期实习算法面经(非科班无论文)
  • 如何监控一个程序的运行情况,然后视情况将进程杀死并重启
  • redis使用总结
  • 对传递函数的零极点、频率响应、稳定性的理解
  • Vue3 如何全局使用按钮截流指令
  • 复古决战快速施法穿墙秒怪分析流程及安全防护
  • 网络基础设施 拥塞控制
  • 基于JavaWeb实现的寻码网文章资讯管理系统
  • 动态页面配置
  • 我有一个方法判断你有没有编程天赋
  • ElasticSearch学习随笔之分词算法
  • 第17章 信息系统安全管理
  • IAST工具是如何工作的?主动和被动IAST有什么区别?
  • 信号完整性分析基础知识之传输线和反射(三):仿真和测试反射波形
  • 开放式基金净值估算数据 API 数据接口