本文介绍: 一步一个脚印,一天一道大数据面试题。Flink 是大数据实时处理计算框架。实时框架对检查点,错误恢复的功能要比离线的更复杂,所以一起来了解 Flink 的 Checkpoint 机制吧。
一步一个脚印,一天一道大数据面试题。
Flink 是大数据实时处理计算框架。实时框架对检查点,错误恢复的功能要比离线的更复杂,所以一起来了解 Flink 的 Checkpoint 机制吧。
Checkpoint 机制
触发 Checkpoint
通过设置时间或数据量阈值来触发 Checkpoint
生成 Barrier 屏障,写入快照
Flink 触发 Checkpoint 后,会从数据源 Source 算子开始分发 Barrier,算子收到后便开始停止处理数据,将目前的状态写入快照。
分发 Barrier 至下游
分发 Barrier 到下游算子,各个算子生成快照。直至所有算子完成写入 Checkpoint,Checkpoint 写入完成。
检查点恢复
恢复时,分为两部分。
1.从 Checkpoint 恢复数据,这部分数据是 Barrier 之前的数据和操作。
2.继续处理 Barrier 之后的数据。
代码演示
下面是一个 Java 版 Flink 算子 demo
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class FlinkCheckpointExample {
public static void main(String[] args) throws Exception {
// 设置执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从命令行参数获取输入路径和输出路径
ParameterTool params = ParameterTool.fromArgs(args);
String inputPath = params.get("input");
String outputPath = params.get("output");
// 开启 Checkpoint,并设置 Checkpoint 间隔
env.enableCheckpointing(5 * 1000); // 每 5 秒触发一次 Checkpoint
// 设置 Checkpoint 模式为 EXACTLY_ONCE(精确一次语义)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 创建数据流
DataStream<String> dataStream = env.readTextFile(inputPath);
// 对数据进行简单处理
DataStream<Integer> resultStream = dataStream.map(value -> Integer.parseInt(value) * 2);
// 输出结果到控制台
resultStream.print();
// 将结果写入文件
resultStream.writeAsText(outputPath);
// 执行任务
env.execute("Flink Checkpoint Example");
}
}
原文地址:https://blog.csdn.net/Jiweilai1/article/details/136015968
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.7code.cn/show_66141.html
如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。