Hadoop RPC 网络通信框架原理剖析
YARN RPC 服务端的工作大致可以分为四个阶段:
第一个阶段: Server 初始化和启动
在 Server 初始化的时候,会初始化 Listener 组件(内部启动了一个 AcceptSelector 绑定了相应的端口,用来处理客户端的 OP_ACCEPT 事件),内部还初始化了一组 Reader 线程,其实就是启动了 ReaderSelector,用来处理 OP_READ 事件。还启动一个 Responder 线程用来处理响应。
在 Server 调用 start() 方法启动的时候,启动 Listener 线程 和 Responder 线程。然后还初始化了一组 Handler 线程,专门用来处理存储在 callQueue 的 RpcCall 请求。
第二个阶段: 接收请求,封装 RpcCall
代码入口就是 Listener 的 run() 方法接收来自各个客户端的 RPC 请求,并将他们封装成 RpcCall 类放到一个共享队列 callQueue 中。再详细点,该过程可以分为建立链接和接受请求两个阶段,分 别由 Listener 和 Reader 两种线程完成。
具体来讲,首先 Listener 接受客户端的请求,然后通过轮询的方式从 Reader 中获取一个线程用来处理 OP_READ 事件读取 RPC 请求数据,如果对应客户端有数据写过来,则该 Reader 负责读取数据,然后封装成 RpcCall 加入到 callQueue 队列中等待 Handler 来执行下一步 处理。
第三个阶段: 处理 RpcCall 请求
Handler 从 callQueue 中获取 RpcCall 来执行处理,并执行对应的 Rpc 函数调用,将得到的结果返回给客户端。
第四个阶段: 返回结果
在 Server 内部只有一个 Responder 线程。它内部的 WriteSelector 负责监听 OP_WRITE 事件。如果 Responder 并不能一次性将结果写出去,就注册 OP_WRITE 事件分多次异步写。
Hadoop RPC 源码分析
RPC 客户端源码分析
客户端请求三层调用关系
Hadoop RPC 客户端的工作机制
当我们获取一个某个通信协议的代理对象的时候,我们可以通过这个代理对象和 Server 进行通信,请求 Server 端的服务 客户端获取的代理对象内部,是构建一个 Invoker,也就是一个 InvocationHandler,当发送 RPC 请求的时候,实际上,就会回调了 Invoker.invoke() 方法 Invoker 的内部封装了 Client,当我们真的通过代理对象调用某个方法的时候,实际上,底层是通过 Client.call() 发送一个 RPC 请求给服务端,再获取到结果 Call 方法的具体实现,分为四步走:
第一步: 构建 Call 对象
第二步: 构建 Connection 链接对象,实际上内部维护了一个 Socket 客户端
第三步: 通过 Connection 发送 RPC 请求到服务端,并且把获取到的结果设置到 Call 对象的 rpcResponse 成员变量上
第四步: 从 Call 上获取 RPC 执行结果
RPC 服务端源码分析
Key – Connection – Channel 一一对应, Connection 又会封装在 Call 之中。调用过程如下如所示:
HDFS RPC 通信协议整理
ClientProtocol(Client -> NameNode) Client 和 NameNode 之间的通信协议,DFSClient 通过这个协议可以操纵HDFS的目录命名空间、打开与关闭文件流,比如向 NameNode 发送 RPC 请求创建文件夹,删除文件等等。
DatanodeProtocol(NameNode -> DataNode) NameNode 和 DataNode之间的通信协议,DataNode 通过此协议向 NameNode 注册,心跳,块汇报等。
ClientDatanodeProtocol(Client -> DataNode) Client 和 DataNode之间的通信协议,用来进行块恢复。
InterDatanodeProtocol(DataNode -> DataNode) DataNode 和 DataNode 之间的通信协议,该协议用于Datanode 进程之间进行通信。
NamenodeProtocol(NameNode -> SecondaryNameNode) NameNode 和 SecondaryNameNode 之间的通信协议,负责进行 checkpoint 相关工作的交互。
DataTransferProtocol(DataNode -> DataNode) DataNode 和 DataNode 之间的通信协议,负责进行 DataNode 相互之间的数据传输的通信协议。
YARN RPC 通信协议整理
ApplicationClientProtocol(Client -> RM) clients 与 RM 之间的协议, JobClient 通过该 RPC 协议提交应用程序、 查询应用程序状态等。
ResourceTracker(NM -> RM) NM 与 RM 之间的协议, NM 通过该 RPC 协议向 RM 注册, 并定时发送心跳信息汇报当前节点的资源使用情况和 Container 运行情况。
ApplicationMasterProtocol(AM -> RM) AM 与 RM 之间的协议, AM 通过该 RPC 协议向 RM 注册和撤销自己, 并为各个任务申请资源。
ContainerManagementProtocol(AM -> NM) AM 与 NM 之间的协议, AM 通过该 RPC 要求 NM 启动或者停止 Container, 获取各个 Container 的使用状态等信息。
ResourceManagerAdministrationProtocol(RM Admin -> RM) Admin 与 RM 之间的通信协议, Admin 通过该 RPC 协议更新系统配置文件, 例如节点黑白名单等。
HAServiceProtocol(Active RM HA Framework Standby RM) Active RM 和 Standby RM 之间的通信协议,提供状态监控和 fail over 的 HA 服务。
TaskUmbilicalProtocol(YarnChild -> MRAppMaster) YarnChild 和 MRAppMaster 之间的通信协议,用于 MRAppMaster 监控跟踪 YarnChild 的运行状态,YarnChild 向 MRAppMaster 拉取 Task 任务信息。
MRClientProtocol(JobClient -> ApplicationMaster) JobClient 和 ApplicationMaster 之间的通信协议。用于客户端拉取 Application 的执行状态,以及 Application 返回执行结果给 客户端。