1. Time时间机制

时间概念

事件时间在实际应用中更为广泛,从Flink 1.12版本开始,Flink已经将事件时间作为默认的时间语义

Flink 可以根据不同的时间概念处理数据。

2. Window(窗口计算

收集窗口时间内的数据,对窗口收集数据进行聚合运算这就是窗口机制

窗口生命周期

创建属于该窗口的第一个元素到达时就会创建该窗口,窗口事先定义好就是固定的,但是窗口创建时间不固定【窗口开始时间以水印所携带的时间戳作为标准

销毁:窗口结束时间之后,就会销毁当前窗口

Flink窗口分类以及窗口 API

Watermark处理乱序数据

3. State状态机制)

什么是Flink状态

状态其实是个变量这个变量保存数据流历史数据, 如果有新的数据流进来,会读取状态变量,将新的数据和历史一起计算

状态分类

托管状态(Managed State)和原始状态(Raw State

托管状态就是由 Flink 统一管理的,状态存储访问故障恢复和重组等一系列问题都由Flink实现,直接使用API

原始状态则是自定义的,相当于就是开辟了一块内存需要自己管理实现状态的序列化故障恢复

通常采用 Flink 托管状态来实现需求

算子状态(Operator State)和按键分区状态(Keyed State)

可以托管状态分为两类算子状态和按键分区状态。

keyBy 将DataStream转换为KeyedStream,KeyedStream是特殊的DataStream。

KeyedState只能应用于KeyedStream,因此KeyedState的计算只能放在KeyBy之后

基于状态(KeyedState)计算实现词频统计

代码实现

事先定义一个实体类

public class WordCount {

    private String word;

    private Integer count;
    // setter&getter&toString方法
}  

Flink程序基本流程

/**
 * description: 基于状态(KeyedState)计算实现词频统计
 */
public class WordCountWithStateful {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> lines =
                env.socketTextStream("127.0.0.1",8888,"n");

        lines.flatMap((String input, Collector<WordCount> output)-> {
                            String[] words = input.split(" ");
                            for(String word:words) {
                                output.collect(new WordCount(word,1));
                            }
                        }
                ).returns(WordCount.class)
            	// keyBy之后,每个key都有对应的状态,同一个key只能操作自己对应的状态
                .keyBy(WordCount::getWord)
                // 状态计算
                .flatMap(new WordCountStateFunc())
                .print();
        
        env.execute();

    }
}

计算函数

public class WordCountStateFunc extends RichFlatMapFunction<WordCount, WordCount> {

    /**
     * 状态变量
     */
    private ValueState<WordCount> keyedState;

    /**
     * description: open方法中状态变量的初始化
     */
    @Override
    public void open(Configuration parameters) throws Exception {

        ValueStateDescriptor<WordCount> valueStateDescriptor =
                // valueState描述
                new ValueStateDescriptor<>(
                        // 描述器的名称
                        "wordcountState",

                        /* 
                         * 描述器的数据类型:
                         *
                         * Flink有自己一套数据类型包含了JAVA和Scala的所有数据类型
                         * 这些数据类型都是TypeInformation对象子类。
                         * TypeInformation对象统一了所有数据类型序列化实现
                         */
                        TypeInformation.of(WordCount.class)
                );

        keyedState = getRuntimeContext().getState(valueStateDescriptor);
    }


    /**
     * description: keyedState计算逻辑
     */
    @Override
    public void flatMap(WordCount input, Collector<WordCount> output) throws Exception {

        // 读取状态
        WordCount lastKeyedState = keyedState.value();

        // 更新状态
        if (lastKeyedState == null) {
            // 状态还未赋值的情况 更新状态
            keyedState.update(input);
            // 返回原数据
            output.collect(input);

        } else {
            // 状态存在旧的状态数据的情况
            Integer count = lastKeyedState.getCount() + input.getCount();
            WordCount newWordCount = new WordCount(input.getWord(), count);

            // 更新状态
            keyedState.update(newWordCount);
            // 返回新的数据
            output.collect(newWordCount);
        }

    }

}

keyedState状态计算步骤

  1. 继承Rich函数
  2. 重写Open方法,对状态变量进行初始化
  3. 状态计算逻辑

为什么要进行有状态的计算 ?

如果Flink发生了异常退出checkpoint机制可以读取保存的状态,进行恢复

广播流、广播状态

有时希望算子并行任务都保持同一份“全局”状态,用来统一配置规则设定。这时所有分区的所有数据都会访问到同一个状态,状态就像被“广播”到所有分区一样,这种特殊算子状态,就叫作广播状态(BroadcastState)。【可以动态修改配置

编码步骤

  1. 构建事件
  2. 构建广播
  3. 事件流和广播连接
  4. 连接后的流进行处理

状态后端

Flink中,状态的存储访问以及维护,都是由一个可插拔的组件决定的,这个组件就叫作状态后端(state backend)。状态后端主要负责管理本地状态的存储方式位置

  1. Memory State Backend 【java内存HashMap
  2. FS State Backend 【HDFS】
  3. RocksDB State Backend 【可持久化的key value存储引擎

选择正确的状态后端

HashMapStateBackend 是内存计算读写速度非常快;但是,状态的大小集群可用内存的限制,如果应用的状态随着时间不停地增长,就会耗尽内存资源

RocksDB 是硬盘存储可以根据可用的磁盘空间进行扩展,所以它非常适合于超级海量状态的存储。不过由于每个状态的读写需要序列化/反序列化,而且可能需要直接从磁盘读取数据,这就会导致性能的降低,平均读写性能要比HashMapStateBackend慢一个数量级

空间和时间的抉择

4. Checkpoint(容错机制

什么是Checkpoint(检查点)

Checkpoint能生成快照(Snapshot)
若Flink程序崩溃,重新运行程序时可以有选择从这些快照进行恢复
Checkpoint是Flink可靠性的基石

Checkpoint和State的区别

State指某个算子的数据状态(中间状态),Checkpoint所有算子的数据状态(全局快照
State保存在堆内存,Checkpoint持久保存

Checkpoint分布式快照流程(重点)

水用挡板挡停让水静止,进行快照存储;Checkpoint机制也是如此,Checkpoint Barrier类似挡板

步骤

Source子任务收到了Checkpoint请求,该算子会对自己的数据状态保存快照自己的下一个算子发送Checkpoint Barrier
下一个算子只有收到上一个算子广播过来的Checkpoint Barrier,才进行快照保存

步骤

Sink算子已经收到了所有上游的Checkpoint Barrier时,进行以下2步操作
1.保存自己的数据状态,2.并直接通知检查点协调器
检查点协调器在收集所有的task通知,就认为这次的Checkpoint全局完成了,从而进行持久操作

Checkpoint如何保证数据的一致性(重点)

至少一次(at-leastonce)

发生故障,可能会有重复数据

精确一次(exactly-once)

发生故障,能保证不丢失数据,也没有重复数据

读取最近一次存放快照,数据重放重新计算,Checkpoint机制保证exactly-once

Checkpoint Barrier对齐机制

Barrie对齐机制保证了Checkpoint数据状态的精确一致

下游算子上面对应多个上游算子,下游算子必须要等到上游算子所有的Checkpoint Barrier到齐之后,下游算子才会进行快照的输入。(会把先到的Checkpoint Barrier数据先缓存起来,直到所有的Checkpoint Barrier全部到达,该算子才会进行快照操作

什么savepoint(保存点)

基于checkpoint机制的快照

Checkpoint和Savepoint区别
Checkpoint是自动容错恢复机制,Savepoint某个时间点的全局状态镜像
Checkpoint是Flink系统行为,Savepoint是用户触发
Checkpoint默认程序删除,Savepoint会一直保存

原文地址:https://blog.csdn.net/qq_43417581/article/details/134758426

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_30468.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注