当前位置: 首页 > article >正文

YarnClient发送和接收请求源码解析

YarnClient发送和接收请求流程

在这里插入图片描述

Yarn是通过RPC协议通信的,协议类型可以通过查看RpcKind类得知,总共有三种类型:

RPC_BUILTIN ((short) 1),         // Used for built in calls by tests
RPC_WRITABLE ((short) 2),        // Use WritableRpcEngine 
RPC_PROTOCOL_BUFFER ((short) 3);

其中Hadoop和Yarn组件大部分是通过Protobuf(Protocol Buffers)协议进行通信。

Protobuf 是一种由 Google 开发的二进制序列化格式和相关的技术,它用于高效地序列化和反序列化结构化数据,通常用于网络通信、数据存储等场景。

Protobuf 在许多领域都得到了广泛应用,特别是在分布式系统、RPC(Remote Procedure Call)框架和数据存储中,它提供了一种高效、简洁和可扩展的方式来序列化和交换数据,Protobuf 的主要优点包括:

  • 高效性:Protobuf 序列化后的二进制数据通常比其他序列化格式(比如超级常用的JSON)更小,并且序列化和反序列化的速度更快,这对于性能敏感的应用非常有益。
  • 简洁性:Protobuf 使用一种定义消息格式的语法,它允许定义字段类型、顺序和规则
  • 版本兼容性:Protobuf 支持向前和向后兼容的版本控制,使得在消息格式发生变化时可以更容易地处理不同版本的通信。
  • 语言无关性:Protobuf 定义的消息格式可以在多种编程语言中使用,这有助于跨语言的通信和数据交换
  • 自动生成代码:Protobuf 通常与相应的工具一起使用,可以自动生成代码,包括序列化/反序列化代码和相关的类

发送和接收请求流程

发送请求

Yarn发送的请求协议都继承GeneratedMessage类实现Message接口,它们都是YarnServiceProtos的内部类。

请求头的协议类型可以在RpcHeaderProtos里面查看,它们都是RpcHeaderProtos的内部类,例如RpcRequestHeaderProto。

请求的协议类型可以在ApplicationClientProtocol里查看,它们都继承了ApplicationClientProtocol内部类。

YarnClient请求头使用的是ProtobufRpcEngineProtos的内部类RequestHeaderProto。

请求的连接信息都存储在Client的内部类ConnectionId里面,包含票据、协议类、目标地址、是否需要认证、配置文件、还有连接参数等信息。

调用器ProtobufRpcEngine.Invoker类用于代理客户端发送请求,该实例存储着Client、Client.ConnectionId等信息,它真正是通过Client实例发送请求的,Client每次请求时都会创建一个Client#Call和Client#Connection,代表每次发送的请求和连接。

Call表示调用操作,里面包含重试次数、id、响应、是否完成、请求等进行连接操作需要的信息。

Connection代表连接,它有ConnectionId的所有参数以及Socket客户端、流管道、连接参数等信息。

发送请求流程:

SaslRpcClient#sendSaslMessage
Connection#setupIOstreams
使用调用器代理发送请求
ProtobufRpcEngine.Invoker#invoke
调用器通过方法名、协议类名、协议版本构建RPC请求头
ProtobufRpcEngine.Invoker#constructRpcRequestHeader
获取参数中的Message协议以及使用创建的请求头,创建封装RpcRequestWrapper
客户端发送封装的RPC请求
Client#call
使用请求创建Call
通过ConnectionId创建Connection,并建立连接
建立该Connection的IO通道
Connection#setupIOstreams
不断建立连接并读取socket中的数据
建立连接
Client.Connection#setupConnection
先创建Socket的IO通道
NetUtils.getInputStream
NetUtils.getOutputStream
如果ConnectionId中包含UGI信息,则建立安全连接
Connection#setupSaslConnection
创建SaslRpcClient并开始连接
SaslRpcClient#saslConnect
发送Sasl协议请求RpcSaslProto,该协议初始状态为NEGOTIATE状态
SaslRpcClient#sendSaslMessage
开始循环握手交换信息
读取响应信息头,判断响应都是否有错误标志
读取响应体,解析响应体为RpcSaslProto协议响应
根据响应的RpcSaslProto状态判断认证是否有问题
如果状态为协商状态NEGOTIATE
如果状态为质疑状态CHALLENGE,则需要评估响应的Token,然后创建Sasl回复再次协商
如果状态为成功状态SUCCESS,简单认证则完成认证,否则还需再次评估Token再完成认证
继续发送请求
SaslRpcClient#sendSaslMessage
返回协商一致的认证方法
认证成功后,先创建响应的连接管道,并建立连接环境
Client.Connection#writeConnectionContext
开始接收RPC响应
Client#run && Client#receiveRpcResponse
发送RPC远程过程调用请求
Connection#sendRpcRequest
最后返回响应数据
Client.Call#getRpcResponse
从响应的可认证方式中选择相应的认证方式
SaslRpcClient#selectSaslClient
如果认证方式为SIMPLE认证,则直接完成认证
否则,读取响应的token信息,验证token并生成响应信息,该响应信息将会再次发送给服务器进行协商
SaslClient#evaluateChallenge(如果在身份验证过程中收到来自服务器的质询,则调用此方法来准备提交给服务器的适当的下一个响应)
创建Sasl响应回复,此次回复的SASL状态为INITIATE状态,认证方式为所选择的方式,并附带上面生成的响应信息
SaslClient#evaluateChallenge

YarnClient发送请求案例:

YarnClientImpl#getApplications()
HadoopYarnProtoRPC#getProxy
ClientRMProxy#createRMProxy
YarnClientImpl#serviceStart
YarnClient服务初始化
YarnClientImpl#serviceStart
创建ApplicationClientProtocol代理实例
ClientRMProxy#createRMProxy
如果设置了高可用和重试机制,则会先创建RMFailoverProxy
创建RPC代理
RMProxy#getProxy
通过YarnRPC获取代理
HadoopYarnProtoRPC#getProxy
从配置文件中读取并实例化RpcClientFactory的实现类 (yarn.ipc.client.factory.class)
通过RpcClientFactory获取客户端
RpcClientFactoryPBImpl#getClient
从配置文件中读取相应的ApplicationClientProtocol实现类配置,默认为ApplicationClientProtocolPBClientImpl,并创建该实例
ProtobufRpcEngine创建ProtobufRpcEngine.Invoker代理ApplicationClientProtocolPB实例
ProtobufRpcEngine#getProxy
YarnClientImpl发送getApplications请求
YarnClientImpl#getApplications()
创建请求实例GetApplicationsRequest
GetApplicationsRequest#newInstance
ApplicationClientProtocolPBClientImpl代理客户端发送getApplications请求
ApplicationClientProtocolPBClientImpl#getApplications
ApplicationClientProtocolPBClientImpl#getApplications
创建GetApplicationsRequestProto协议,使用调用器代理发送请求
ProtobufRpcEngine.Invoker#invoke
创建并返回GetApplicationsResponsePBImpl响应

接收请求

Yarn使用org.apache.hadoop.ipc.Server作为服务器,因为Yarn有三种RPC类型,所以Server也有三种实现类,ProtobufRpcEngine、RPC和WritableRPCEngine、TestServer(用于测试)

服务器创建时会创建连接监听器、响应器、地址、hadoop配置文件、端口、处理器数量、请求调用队列、安全管理器、 连接管理器,指标服务等。

服务器的连接监听器 (org.apache.hadoop.ipc.Server.Listener) 用于接收连接,它几乎代表着服务器本身,它在创建时会创建一个nio服务器,用于接收和管理所有请求,它将接收的请求放入连接管理器中,然后交给Reader处理。

Reader (org.apache.hadoop.ipc.Server.Listener.Reader) 也将在Listener创建时创建的,Listener会创建多个Reader,每个Reader代表一个数据处理器,它用于读取连接监听器接收的请求信息,通过请求信息创建请求调用 (Call) 实例,并将Call实例放入请求调用队列中,请求调用队列将会由后面的Handler (服务器启动后创建) 处理。

Listener#readAndProcess
Reader创建和启动
创建连接器队列,用于存储连接监听器接收到的请求
创建Selector循环监听并处理连接器队列中的请求
Reader#doRunLoop
Reader#doRead
循环接收并处理请求中的数据
Listener#readAndProcess
先读取连接数据中的一些信息创建连接头,如数据长度、连接头(请求类型、版本、认证信息)等,然后开始处理请求
Server#channelRead
处理请求体数据
Server.Connection#processOneRpc
Server.Connection#processOneRpc
序列化获取RpcRequestHeaderProto
Server.Connection#decodeProtobufFromStream
校验RPC头
Server.Connection#checkRpcHeaders
开始处理RCP请求,反序列化创建RpcRequestWrapper,并创建相应的调用请求实例,然后存放到调用请求队列中
Server.Connection#processRpcRequest

服务器的响应器 (org.apache.hadoop.ipc.Server.Responder) 用于响应请求 ,返回请求结果或者异常。

服务器启动后,将会创建多个处理器 (org.apache.hadoop.ipc.Server.Handler) 轮询处理调用请求队列中的调用请求。

  • 处理器轮询调用Call实例

理器轮询调用Call实例

Handler#run
判断调用请求中是否包含UGI信息,如果包含ugi,则在调用之前需要使用ugi进行认证
处理调用请求
Server#call
由于大部分Yarn 组件使用通信组件都是 RPC 组件,且调用器通常都是ProtobufRPCEngine,这里就再解析一下ProtobufRPCEngine处理请求原理。
RPC 服务器处理请求
RPC.Server#call
获取 RPC 请求调用器 (RpcInvoker)
RPCInvoker 请求调用器处理请求
RpcInvoker#call
ProtobufRpcEngine.Server.ProtoBufRpcInvoker#call
获取协议的实现类,实现类也是一个阻塞服务(BlockingService)
ProtobufRpcEngine.Server.ProtoBufRpcInvoker#getProtocolImpl
通过BlockingService获取需要调用的方法
Descriptors.ServiceDescriptor#findMethodByName
BlockingService开始调用方法
BlockingService#callBlockingMethod
创建响应实例
Server#setupResponse
响应请求
Responder#doRespond

http://www.kler.cn/a/287050.html

相关文章:

  • 2025寒假备战蓝桥杯01---朴素二分查找的学习
  • 1170 Safari Park (25)
  • GS论文阅读--GeoTexDensifier
  • Dockerfile另一种使用普通用户启动的方式
  • gitlab使用多数据库
  • WPS计算机二级•高效操作技巧
  • 深度学习复盘与论文复现G 项目维护
  • NTFS硬盘支持工具Paragon NTFS for Mac 15.4.44 中文破解版
  • 2024.9.1 Python,跳跃游戏,贪心算法,回溯算法复原 IP 地址,关于回溯过程中列表的[:]以及copy问题再讨论
  • Flowable之传阅功能实现
  • 今日算法:蓝桥杯基础题之“星期一”
  • easyExcel 单元格合并
  • C++开发基础之宏定义:入门、中级、高级用法示例解析
  • 计算机毕业设计选题推荐-体育馆场地预约系统-Java/Python项目实战
  • OpenHarmony开发:应用分层架构设计
  • 汽车免拆诊断案例 | 2012 款大众速腾车发动机偶尔抖动
  • 【Python机器学习】NLP词中的数学——齐普夫定律
  • vue点击事件
  • Mac怎么安装谷歌浏览器
  • 算法-汇总区间(228)
  • FFmpeg源码:append_packet_chunked、av_get_packet函数分析
  • Android自定义View实现彩虹进度条(带动画)
  • 完美解决Jenkins重启后自动杀掉衍生进程(子进程)问题
  • ​哈哈题库​邀请书
  • [Day 68] 區塊鏈與人工智能的聯動應用:理論、技術與實踐
  • 如何在 CentOS 6 上安装 Nagios