一、状态概念

在流处理中有很多操作依赖独立事件即可,有的操作不仅需要依赖当前事件还要依赖前序事件。这样flink算子分为状态算子和有状态算子

flink对算子的状态保存不仅可以实现更多高级特性,还可以通过状态的保存和恢复来实现容错

二、状态实现

1.状态触发

Flink程序保存状态的场景主要有以下三种

2.状态存储实现

Flink内置两个状态存储实现HashMapStateBackend和EmbeddedRocksDBStateBackend,默认使用HashMapStateBackend。

2.1 HashMapStateBackend

状态数据以Java对象的形式存储在JVM堆中,状态快照可以持久化到文件系统或者Job Manager的堆中。

2.2 EmbeddedRocksDBStateBackend

状态数据序列化成字节数组方式存储在RocksDB数据库中,数据库默认数据存储在TaskManager数据目录采用异步方式生成状态快照

2.3 状态存储对比

flink程序选择状态存储实现就是性能与可拓展之间的权衡。
HashMapStateBackend由于状态的读写都是在javaheap上操作因此速度快,但状态大小受限于集群中可用内存
EmbeddedRocksDBStateBackend可以根据可用disk进行空间拓展,并且支持增量状态快照。但由于读写都需要反序列化因此比HashMapStateBackend一个数量级。

3.设置状态存储实现

3.1 单个作业设置

StreamExecutionEnvironment 可以对每个 Job 的 State Backend 进行设置如下所示

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());

3.2 全局设置

flinkconf.yaml 可以通过state.backend.type 设置默认的 State Backend。

# 用于存储 operator state 快照的 State Backend
state.backend: hashmap
# 存储快照目录
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

三、容错机制

flink容错机制就是针对算子和数据流进行持久快照。当程序发生故障时,flink可以根据快照将应用程序的状态完整的恢复并处理。

1.状态快照

快照时flink作业状态一致性景象。快照包括每个数据源指针以及作业每个状态运算符的状态数据,状态数据是指处理了数据源偏移位置之后的所有事件生成的状态。
快照分为CheckPoint和SavePoint两种:

2.状态快照生成

简单的想法就是 同一个时间点 所有节点停止处理 然后将状态和上游通道中的事件保存下来。但是由于是分布式节点协调同一个时间比较难,完全停止也影响实时性
根据批处理经验,可以将实时流处理一段流之后,停止读取等待所有节点处理完毕之后,这样只需要保存实时流的位置和所有节点的状态即可。这样如果想要重放只需要将所有节点状态恢复,并按照原有位置读取实时即可
按照上边思路,上游节点保存完状态后需要等待最后一个节点处理完成才能处理新数据。如果处理流程越长,保存快照的时间就会越长,最后导致计算资源利用率低。其实仔细思考一下如果当前节点保存完快照,完全可以继续处理接下来的事件,因为并不会影响本节点已经生成的状态快照。

Flink采用就是类似的想法进行快照的处理,首先会在事件流中插入barrier将实时流逻辑分割多个限流。当算子接受到barrier时需要执行快照生成然后将barrier广播给下游算子。当需要恢复时,重新部署服务,从快照中把每个算子的状态恢复,然后从barrier处重新读取源数据即可

3.Checkpoint Barrier

JobManager会告知TaskManager需要进行状态快照生成时,TaskManager会记录Sources的偏移量,并且将Checkpoint barrier插入到事件流中。
在这里插入图片描述
其中状态快照n将包含偏移量以及每个算子的状态,状态是指此算子消费了barrier n之前所有事件、但是不包含barrier n之后的任何事件生成的状态。

当处理拓扑图中的每个算子接受到barrier时,会进行快照生成,然后上报JobManager。当所有算子都完成快照生成并上报后,就标志着本次快照生成完成。
在这里插入图片描述

4.Aligned Checkpointing

如果算子有多个输入,那么每个输入的barrier不可能同时抵达算子,这种情况需要等待所有输入的barrier都到达才能进行快照生成,也就是需要barrier对齐
在这里插入图片描述

  1. 当算子从一个输入收到barrier时,就会停止处理这个输入的后续数据,将后续数据缓存input buffer中。
  2. 当算子收到了最慢输入流中的barrier时,就会向下游广播此barrier。
  3. 算子执行快照生成操作,然后重新开始处理buffer缓存起来的输入流,然后再从输入管道获取数据处理。

5.Unaligned Checkpointing

如果作业中有的数据流处理较慢,会导致barrier传递变慢一次完整的快照生成会耗时很多。而且会导致反压,因此引入了非对齐barrier。
在这里插入图片描述

  1. 当算子其中一个输入流中的barrier到达输入缓冲区时,算子停止进行逻辑处理。
  2. 算子会立刻讲这个barrier输出输出缓冲区前端
  3. 算子将当前算子状态、当前输出缓冲区和所有输入流barrier还没有被算子处理的数据进行快照。由于其他输入流的barrier还没有到达算子,所以快照处理是异步处理。

当非对齐barrier恢复现场时需要将算子状态、输出缓存、未对齐的输入流都需要恢复到对应链路中,这三者合并起来就能保证当时状态的还原

总结

主要介绍flink算子分为有状态算子和无状态算子。状态存储实现一个直接保存对象,另一个保存字节数组。容错机制主要是将实时流逻辑分段进行所有算子的状态保存和恢复来实现容错。


参考链接

1.Flink State Backend
2.Flink Stateful Flow

原文地址:https://blog.csdn.net/wlphlj/article/details/134728812

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_41864.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注