一、状态概念
在流处理中有很多操作只依赖独立的事件即可,有的操作不仅需要依赖当前事件还要依赖前序事件。这样flink的算子就分为无状态算子和有状态算子。
- 无状态算子:算子对事件处理是独立的,每个事件是没有任何关联的,每个事件可以被单独处理和操作。例如:map、fliter等操作
- 有状态算子:算子对事件处理还依赖于之前事件的处理结果。因此还需要维护和保存算子的中间处理结果。例如聚合、去重等操作。
flink对算子的状态保存不仅可以实现更多高级特性,还可以通过状态的保存和恢复来实现容错。
二、状态实现
1.状态触发
- 窗口函数:在windows触发之前需要收集或者聚合事件,此时需要保存事件或者中间结果。
- 转化函数:会使用key/value格式的状态接口来存储状态。
- 转化函数:实现CheckpointedFunction接口,接受到barrier时触发checkpoint机制。
2.状态存储实现
Flink内置了两个状态存储实现HashMapStateBackend和EmbeddedRocksDBStateBackend,默认使用HashMapStateBackend。
2.1 HashMapStateBackend
状态数据以Java对象的形式存储在JVM堆中,状态快照可以持久化到文件系统或者Job Manager的堆中。
2.2 EmbeddedRocksDBStateBackend
状态数据被序列化成字节数组的方式存储在RocksDB数据库中,数据库默认将数据存储在TaskManager的数据目录。采用异步方式生成状态快照。
2.3 状态存储对比
flink程序选择状态存储实现就是在性能与可拓展性之间的权衡。
HashMapStateBackend由于状态的读写都是在java的heap上操作因此速度快,但状态大小受限于集群中可用内存。
EmbeddedRocksDBStateBackend可以根据可用disk进行空间拓展,并且支持增量状态快照。但由于读写都需要反序列化因此比HashMapStateBackend慢一个数量级。
3.设置状态存储实现
3.1 单个作业设置
StreamExecutionEnvironment 可以对每个 Job 的 State Backend 进行设置,如下所示
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
3.2 全局设置
在 flink–conf.yaml 可以通过键 state.backend.type 设置默认的 State Backend。
# 用于存储 operator state 快照的 State Backend
state.backend: hashmap
# 存储快照的目录
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
三、容错机制
flink容错机制就是针对算子和数据流进行持久化快照。当程序发生故障时,flink可以根据快照将应用程序的状态完整的恢复并处理。
1.状态快照
快照时flink作业状态一致性景象。快照包括每个数据源的指针以及作业每个状态运算符的状态数据,状态数据是指处理了数据源偏移位置之后的所有事件后生成的状态。
快照分为CheckPoint和SavePoint两种:
2.状态快照生成
最简单的想法就是 同一个时间点 所有节点停止处理 然后将状态和上游通道中的事件保存下来。但是由于是分布式节点协调同一个时间点比较难,完全停止也影响实时性。
根据批处理经验,可以将实时流处理一段流之后,停止读取,等待所有节点处理完毕之后,这样只需要保存实时流的位置和所有节点的状态即可。这样如果想要重放只需要将所有节点状态恢复,并按照原有位置读取实时流即可。
按照上边思路,上游节点保存完状态后需要等待最后一个节点处理完成才能处理新数据。如果处理流程越长,保存快照的时间就会越长,最后导致计算资源利用率低。其实仔细思考一下如果当前节点保存完快照,完全可以继续处理接下来的事件,因为并不会影响本节点已经生成的状态快照。
Flink采用的就是类似的想法进行快照的处理,首先会在事件流中插入barrier将实时流逻辑上分割成多个有限流。当算子接受到barrier时需要执行快照生成,然后将barrier广播给下游算子。当需要恢复时,重新部署服务,从快照中把每个算子的状态恢复,然后从barrier处重新读取源数据即可。
3.Checkpoint Barrier
JobManager会告知TaskManager需要进行状态快照生成时,TaskManager会记录Sources的偏移量,并且将Checkpoint barrier插入到事件流中。
其中状态快照n将包含偏移量以及每个算子的状态,状态是指此算子消费了barrier n之前所有事件、但是不包含barrier n之后的任何事件生成的状态。
当处理拓扑图中的每个算子接受到barrier时,会进行快照生成,然后上报JobManager。当所有算子都完成快照生成并上报后,就标志着本次快照生成完成。
4.Aligned Checkpointing
如果算子有多个输入,那么每个输入的barrier不可能同时抵达算子,这种情况需要等待所有输入的barrier都到达才能进行快照生成,也就是需要barrier对齐。
- 当算子从一个输入中收到barrier时,就会停止处理这个输入的后续数据,将后续数据缓存到input buffer中。
- 当算子收到了最慢输入流中的barrier时,就会向下游广播此barrier。
- 算子执行快照生成操作,然后重新开始处理buffer中缓存起来的输入流,然后再从输入管道中获取数据处理。
5.Unaligned Checkpointing
如果作业中有的数据流处理较慢,会导致barrier传递变慢,一次完整的快照生成会耗时很多。而且会导致反压,因此引入了非对齐barrier。
- 当算子其中一个输入流中的barrier到达输入缓冲区时,算子停止进行逻辑处理。
- 算子会立刻讲这个barrier输出到输出缓冲区最前端。
- 算子将当前算子状态、当前输出缓冲区和所有输入流barrier还没有被算子处理的数据进行快照。由于其他输入流的barrier还没有到达算子,所以快照处理是异步处理。
当非对齐barrier恢复现场时需要将算子状态、输出缓存、未对齐的输入流都需要恢复到对应链路中,这三者合并起来就能保证当时状态的还原。
总结
主要介绍了flink算子分为有状态算子和无状态算子。状态存储实现一个是直接保存对象,另一个保存字节数组。容错机制主要是将实时流逻辑分段进行所有算子的状态保存和恢复来实现容错。
参考链接
1.Flink State Backend
2.Flink Stateful Flow
原文地址:https://blog.csdn.net/wlphlj/article/details/134728812
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.7code.cn/show_41864.html
如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!