1. 准备工作

什么是Kafka源表

Kafka分布式、高吞吐、可扩展消息队列服务,广泛用于日志收集监控数据聚合流式数据处理在线离线分析大数据领域。

docker部署zookeeper
docker pull wurstmeister/zookeeper

docker run -d --restart=always 
--log-driver json-file 
--log-opt max-size=100m 
--log-opt max-file=2  
--name zookeeper 
-p 2181:2181 
-v /etc/localtime:/etc/localtime wurstmeister/zookeeper
Docker部署kafka
docker pull wurstmeister/kafka:2.12-2.5.0

docker run -d --restart=always 
--log-driver json-file 
--log-opt max-size=100m 
--log-opt max-file=2 
--name kafka 
-p 9092:9092 
-e KAFKA_BROKER_ID=0 
-e KAFKA_ZOOKEEPER_CONNECT=192.168.0.6/kafka 
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.0.6:9092 
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 
-v /etc/localtime:/etc/localtime wurstmeister/kafka:2.12-2.5.0

说明

-e KAFKA_BROKER_ID=0  kafka集群中,每个kafka都有一个BROKER_ID来区分自己

-e KAFKA_ZOOKEEPER_CONNECT=192.168.244.132:2181/kafka 配置zookeeper管理kafka路径172.16.0.13:2181/kafka

-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.244.132:9092  把kafka的地址端口注册zookeeper,如果是远程访问要改成外网IP,类如Java程序访问出现无法连接

-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置kafka的监听端口

v /etc/localtime:/etc/localtime 容器时间同步虚拟机时间

生成数据

$ docker execit kafka bash

进入 /opt/kafka_2.12-2.5.0/bin/目录

$ cd /opt/kafka_2.12-2.5.0/bin/

运行kafka生产者发送消息

$ ./kafka-consoleproducer.shbrokerlist localhost:9092 —topic flink_test

> {“ts“: “2020-10-09 12:23:34″,”oid“: 7,”price_amt”:20}

> {“ts“: “2020-10-19 10:13:25″,”oid“: 8,”price_amt”:20}

> {“ts“: “2020-12-19 14:33:35″,”oid“: 9,”price_amt”:20}

> {“ts“: “2020-12-29 21:56:01”,”oid“: 10,”price_amt”:20}

> {“ts“: “2020-12-29 21:56:01”,”oid“: 11,”price_amt”:30}

> {“ts“: “2020-12-40 23:08:15”,”oid”: 12,”price_amt”:100}

2. 构建kafka数据源

DROP TABLE IF EXISTS case_kafka_mysql;

create table case_kafka_mysql (
  id BIGINT,
  ts VARCHAR,
  price_amt BIGINT,
  proctime AS PROCTIME ()
)
 with (
  'connector' = 'kafka',
  'topic' = 'flink_test',
  'properties.bootstrap.servers' = '127.0.0.1:9092',
  'properties.group.id' = 'flink_gp_test1',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
 );

在实际使用中请根据实际情况配置字段名WITH参数

相关With连接选项

Option

Required

Comment

Description

connector

Y

源表类型

kafka

topic

Y

topic名称

topic

topic-pattern

N

匹配读取topic名称正则表达式

topictopic-pattern两个选项只能指定其中一个

properties.bootstrap.servers

Y

Kafka Broker地址

kafka连接地址

properties.group.id

Y

Kafka消费组ID

groupid

format

Y

Flink Kafka Connector在反序列化来自Kafka消息value使用格式

csv, json, avro, debezium-json, canaljson

scan.startup.mode

N

Kafka读取数据启动位点

取值如下

earliest-offset:从Kafka最早分区开始读取。

latestoffset:从Kafka最新位点开始读取。

groupoffsets默认值:根据Group读取。如果指定group为首次使用,则必须将properties.auto.offset.reset设置 earliestlatest指定首次启动位置

timestamp:从Kafka定时间点读取。

需要WITH数中指定scan.startup.timestamp-millis参数。

specificoffsets:从Kafka指定分区指定偏移量读取。需要WITH数中指定scan.startup.specificoffsets参数。

scan.startup.specificoffsets

N

specificoffsets启动模式下,指定每个分区启动偏移量

设置消费模式为:specificoffsets时,需写出消费offset,例:partition:0,offset:42;partition:1,offset:300′

scan.startup.timestamp-millis

N

timestamp启动模式下,指定启动位点时间戳。

设置消费模式为:timestamp时,指定时间, 单位为毫秒。

sink.partitioner

N

注意:

1. 如果您还需要直接配置Connector使用Kafka Consumer可以Kafka Consumer配置参数前添加properties前缀,并将该Kafka Consumer配置信息追加WITH参数。例如Kafka集群需要SASLSimple Authentication and Security Layer认证

CREATE TABLE kafkaTable (
    ...
) WITH (
    ...
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.mechanism' = 'PLAIN',
    'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME" password="PASSWORD";'
);

说明 Per-Job集群支持Kafka认证

2.Kafka源表只在checkpoint成功后将当前消费位点提交Kafka集群。如果您的checkpoint间隔设置较长,您在Kafka集群侧观察到的消费位点会有延迟。在进行checkpoint时,Kafka源表会将当前读取进度存储状态中,并不依赖于提交集群上的位点进行故障恢复提交位点仅仅是为了在Kafka侧能够监控到读取进度,位点提交失败不会对数据正确性产生任何影响

网络连接排查

如果Flink作业在启动时出现Timed out waiting for a node assignment错误,一般是FlinkKafka之间的网络连通问题导致的。Kafka客户端服务端建立连接的过程如下

客户端使用您指定的bootstrap.servers地址连接Kafka服务端Kafka服务端根据配置客户端返回集群中各台broker的元信息,包括各台broker的连接地址。

客户端使用一步broker返回的连接地址连接各台broker进行读取或写入。如果Kafka服务端没有正确配置客户端在第一步收到的连接地址有误,即使bootstrap.servers配置的地址可以连接上,也无法正常读取或写入数据。该问题经常在FlinkKafka之间存在代理端口转发、专线等网络转发机制时发生。

3. Kafka连接器

Table APISQL编写Flink程序中,可以创建表的时候WITH子句指定连接器connector),这样就可以连接到外部系统行数交互了。

Table Source负责外部系统读取数据转换成表,Table Sink负责结果写入外部系统

Flink 1.13以及以后版本API调用中,已经不去区分Table SourceTable Sink我们只要建立到外部系统的连接并创建表就可以Flink 自动会从程序处理逻辑解析出它们的用途。

Flink Table APISQL支持了各种不同的连接器。当然,最简单的其实就是连接到控制台打印输出

CREATE TABLE ResultTable (
    user STRING,
    cnt BIGINT
WITH (
    'connector' = 'print'
);

这里需要WITH定义connectorprint可以了。而对于其它的外部系统,则需要增加一些配置项。

Kafka SQL 连接器可以 Kafka 主题topic读取数据转换成表,也可以将表数据写入 Kafka 主题。换句话说,创建表的时候指定连接器 Kafka,则这个表既可以作为输入表,也可以作为输出表。

1. 引入依赖

需要下载对应 jar 包放到 lib 目录下,注意一定是pat jar

Flink 为各种连接器提供了一系列表格table formats),比如 CSVJSON AvroParquet 等等。

这些表格定义底层存储的二进制数据和表的列之间的转换方式,相当于表的序列化工具

对于 Kafka 而言,CSVJSONAvro 等主要格式都是支持的, 根据 Kafka 连接器中配置的格式,我们可能需要引入对应依赖支持比如flink-csv等。

由于SQL客户端中已经内置 CSVJSON 支持,因此使用时无需专门引入;而对于 没有内置支持的格式(比如 Avro),则仍然要下载相应的 jar 包。

为此,需要配置flink集群环境

配置conf/flink.yaml各项

lib增加jar文件flink-sqlconnector-kafka_2.11-1.14.5.jarkafka-clients-2.5.0.jar

standalone方式启动集群

bin/stopcluster.sh

bin/startcluster.sh

2. 创建连接到 Kafka 的表

创建一个连接到 Kafka 表,需要在 CREATE TABLE DDL 中在 WITH 子句里指定连接 器为 Kafka,并定义必要的配置参数

CREATE TABLE KafkaTable (
`user` STRING,
 `url` STRING,
 `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
 'connector' = 'kafka',
 'topic' = 'events',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'scan.startup.mode' = 'earliest-offset',
 'format' = 'csv'
)

这里定义 Kafka 连接器对应主题topic),Kafka 服务器消费者 ID消费者起始模式以及表格式。

需要特别说明的是,在KafkaTable字段中有一个ts,它的声明用到 METADATA FROM,这是表示一个数据metadata column),它是由 Kafka 连接器的元数据timestamp生成的。

这里的timestamp其实就是Kafka数据自带时间戳,我们把它直接作为元数据提取出来,转换成一个新的字段ts.

3. Upsert Kafka

正常情况下,Kafka作为保持数据顺序消息队列,读取和写入应该流式的数据,对应表中就是仅追加appendonly)模式。

如果我们想要将有更新操作(比如分组聚合)的结 果表写入 Kafka,就会因为Kafka无法识别撤回(retract)或更新插入upsert消息而导致异常

为了解决这个问题Flink专门增加了一个Upsert Kafka连接器。这个连接器支持以更新插入UPSERT)的方式Kafkatopic读写数据。

具体来说,Upsert Kafka连接器处理的是更新日志changlog)流。如果作为 TableSource,连接器会将读取到的topic中的数据(key, value),解释为对当前key的数据值的更新(UPDATE), 也就是查找动态表中 key 对应的一行数据,将 value 更新为最新的值;因为是 Upsert 操作,所 以如果没有 key 对应的行,那么也会执行插入INSERT操作。另外,如果遇到 value 为空 null),连接器就把这条数理解为对相应 key 一行删除DELETE)操作。

如果作为 TableSinkUpsert Kafka连接器会将有更新操作的结果表,转换成更新日志changelog)流。如果遇到插入(INSERT)或者更新后(UPDATE_AFTER)的数据,对应的是一个添加add消息,那么就直接正常写入 Kafka 主题;如果是删除DELETE)或者 更新前的数据,对应是一个撤回(retract消息,那么就把 value 为空null)的数据写入 Kafka 由于 Flink 是根据键(key)的值对数据进行分区的,这样就可以保证同一个 key 上的更新和删除消息都会落到同一个分区中。

下面是一个创建和使用 Upsert Kafka 表的例子

CREATE TABLE pageviews_per_region (
 user_region STRING,
 pv BIGINT,
 uv BIGINT,
 PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
 'connector' = 'upsert-kafka',
 'topic' = 'pageviews_per_region',
 'properties.bootstrap.servers' = '...',
 'key.format' = 'avro',
 'value.format' = 'avro'
);

CREATE TABLE pageviews (
 user_id BIGINT,
 page_id BIGINT,
 viewtime TIMESTAMP,
 user_region STRING,
 WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
 'connector' = 'kafka',
 'topic' = 'pageviews',
 'properties.bootstrap.servers' = '...',
 'format' = 'json'
);

-- 计算 pv、uv 并插入到 upsert-kafka 表中
INSERT INTO pageviews_per_region
SELECT
 user_region,
 COUNT(*),
 COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;

这里我们Kafkapageviews读取数据统计每个区域PV(全部浏览量)和UV(对用户去重),这是一个分组聚合的更新查询,得到的结果表会不停地更新数据。

为了将结果表写入Kafkapageviews_per_region 主题,我们定义了一个 Upsert Kafka 表,它的字段中需要用PRIMARY KEY来指定主键,并且在WITH子句中分别指定keyvalue序列化格式。

4.构建JDBC sink表

mysql构建数据表

JDBC 连接器允许使用 JDBC 驱动程序从任何关系数据库取数据并将数据写入数据。介绍如何设置 JDBC 连接器以针对关系数据库运行 SQL 查询

如果在 DDL 上定义了主键,则 JDBC sink 以 upsert 模式与外部系统交换 UPDATE/DELETE 消息,否则,它以 append 模式运行,不支持消费 UPDATE/DELETE 消息。

引入依赖

flink-connector-jdbc-1.11-1.14.5.jar

连接到指定的数据库还需要驱动程序依赖项。 以下是当前支持的驱动程序

Driver

Group Id

Artifact Id

MySQL

mysql

mysqlconnector-java

PostgreSQL

org.postgresql

postgresql

Derby

org.apache.derby

derby

JDBC 连接器和驱动程序目前不是 Flink 二进制发行版的一部分

CREATE TABLE sync_test_1 (
  `ts` varchar(64) NOT NULL,
  `total_gmv` bigint(11) DEFAULT NULL,
  PRIMARY KEY (`ts`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;
构建MySQL结果表
DROP TABLE IF EXISTS sync_test_1;

CREATE TABLE sync_test_1 (
ts string,
total_gmv bigint,
PRIMARY KEY (ts) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8',
'table-name' = 'sync_test_1',
'username' = 'root',
'password' = 'Admin'
);

说明

With参数

选项

是否

必须

默认

数据

类型

描述

connector

Y

String

结果表类型固定值为jdbc

url

Y

String

数据库url

tablename

Y

String

JDBC表名

driver

N

String

用于连接到此URL的JDBC驱动程序类名,如果未设置,它将自动从URL派生。

username

N

String

JDBC用户名。如果指定了用户名密码,则必须同时指定它们。

password

N

String

JDBC用户密码

connection.maxretrytimeout

N

60s

Duration

重试之间的最大超时超时应以秒为单位,且不应小于1秒

scan.partition.column

N

String

用于分区列名

scan.partition.num

N

Integer

分区数量

scan.partition.lower-bound

N

Integer

第一个分区的最小值

scan.partition.upper-bound

N

Integer

最后一个分区的最大值

scan.fetch-size

N

0

Integer

每次读取时应从数据库提取行数。如果指定的值为零,则忽略提示

scan.autocommit

N

true

Boolean

在JDBC驱动程序上设置自动提交标志,该标志确定是否事务自动提交每个语句。一些JDBC驱动程序,特别是Postgres,可能需要将其设置为false以便流式传输结果。

lookup.cache.maxrows

N

Integer

查找缓存最大行数,超过此值时,最旧的行将过期默认情况下禁用查找缓存

lookup.cache.ttl

N

Duration

查找缓存每行的最长生存时间,在此时间内,最旧的行将过期默认情况下禁用查找缓存

lookup.max-retries

N

3

Integer

查找数据库失败时的最大重试次数

sink.buffer-flush.maxrows

N

100

Integer

刷新前,缓存记录最大值。可以设置为零以禁用它。

sink.buffer-flush.interval

N

1s

Duration

flush数据的时间间隔数据在Flink缓存的时间超过该参数指定的时间后,异步线程flush数据到数据库中。默认值1秒。可以设置为0禁用它,即不再缓存记录,直接flush数据。

sink.max-retries

N

3

Integer

将记录写入数据库失败时的最大重试次数

sink.parallelism

N

Integer

定义JDBC sink操作的并行性。默认情况下,并行度由框架与上游运算相同的并行度来确定。

技术说明
1. 主键处理

Flink在将数据写入外部数据库时使用DDL中定义的主键。如果定义了主键,则连接器以upsert模式运行,否则,连接器以追加append)模式运行

upsert模式下,Flink会根据主键插入新行或更新现有行,Flink可以通过这种方式保证幂等性。

为保证输出结果符合预期,建议为表定义主键,并确保主键是底层数据库表的唯一键集或主键之一。

append模式下,Flink会将所有记录解释为INSERT消息,如果底层数据库发生主键或唯一约束违规,INSERT操作可能会失败。

2.分区扫描

为了加速并行 Source 任务实例中的数据读取,Flink 提供了 JDBC 表的分区扫描功能

如果指定了以下所有扫描分区选项,则必须全部指定。

它们描述了从多个任务并行读取时如何对表进行分区。

scan.partition.column 必须是相关表中数字日期或时间戳列。

请注意,scan.partition.lower-bound和scan.partition.upper-bound用于决定分区步长过滤表中的行。

如果是批处理作业,也可以在提交flink作业之前先获取最大值和最小值

scan.partition.column用于输入进行分区的列名

scan.partition.num:分区数。

scan.partition.lower-bound:第一个分区的最小值

scan.partition.upper-bound:最后一个分区的最大值。

3.查找缓存(Lookup Cache )

流式计算中,维表(dim_dic)是一个很常见概念,一般用于sqljoin中,对流式数据进行数补全,或在不复杂模型中对事实表做轻量化维度退化。

比如我们的source stream是来自日志订单数据,但日志中我们只是记录了订单商品的id,却没有其他的附加信息(如sku、促销活动、优惠券信息等),但我们把订单数据存入实时数仓进行数据分析时候,却需要同步获取sku名称、优惠券等其他的信息,这种问题我们可以在进行流处理的时候通过查询维表的方式对数据进行数补全

维表一般存储在外部存储中,如mysqlhbase(使用phoenix操作)、redis等等。

JDBC 连接器可以在时间连接中用作查找源(又名维度表)。目前,仅支持同步查找模式。

默认情况下,查找缓存未启用。您可以通过设置如下2个属性启用它。

lookup.cache.maxrows

lookup.cache.ttl

查找缓存用于提高临时连接 JDBC 连接器的性能

默认情况下,查找缓存未启用,因此所有请求发送到外部数据库。

启用查找缓存后,每个进程(即 TaskManager)都会持有一个缓存。

Flink 会先查找缓存,只有在缓存缺失时才会向外部数据库发送请求,并根据返回的行更新缓存。

当缓存达到最大缓存行lookup.cache.max-rows 或行超过最大存活时间lookup.cache.ttl 时,缓存中最旧的行将过期

缓存的行可能不是最新的,用户可以将 lookup.cache.ttl 调整为较小的值以获得更快的数据更新,但这可能会增加发送到数据库的请求数量。

所以这是吞吐量正确性之间的平衡。

4.幂等写入(Idempotent Writes)

如果在DDL中定义了主键,则JDBC接收器将使用upsert语义而不是普通的INSERT语句

Upsert语义是指如果底层数据库中存在唯一约束违规,则原子添加新行或更新现有行,这提供了幂等性。

如果出现故障,Flink 作业将从上一个成功的检查恢复并重新处理,这可能导致恢复期间重新处理消息。

强烈建议使用upsert模式,因为如果需要重新处理记录,它有助于避免违反约束重复数据。

除了故障恢复之外,随着时间的推移,源主题自然也可能包含多个具有相同主键的记录,这使得 upserts 是可取的。

5. 计算

INSERT INTO sync_test_1
SELECT ts,SUM(price_amt) AS total_gmv
FROM case_kafka_mysql
GROUP BY ts;

查看执行结果,验证

SELECT ts, total_gmv FROM sync_test_1;

继续生成数据:

$ ./kafka-consoleproducer.sh –broker-list localhost:9092 –topic flink_test

> {“ts”: “2020-10-09 12:23:34″,”id”: 7,”price_amt”:20}

> {“ts”: “2020-10-19 10:13:25″,”id”: 8,”price_amt”:20}

> {“ts”: “2020-12-19 14:33:35″,”id”: 9,”price_amt”:20}

> {“ts”: “2020-12-29 21:56:01″,”id”: 10,”price_amt”:20}

> {“ts”: “2020-12-29 21:56:01″,”id”: 11,”price_amt”:30}

继续查验结果:

SELECT ts, total_gmv FROM sync_test_1;
Top-N

Top-N查询按列排序的N个最小最大值。最小最大值集都被视为Top-N查询。当需要在某个条件下仅显示批处理/流式处理表中的N个最底层或N个最顶层记录时,Top-N查询非常有用。该结果集可用于一步分析。

Flink使用OVER窗口子句过滤条件组合表示Top-N查询。借助OVER window PARTITION BY子句的强大功能,Flink还支持每组Top-N。例如每个类别实时销售额最高的前五种产品批处理表和流表上的SQL支持Top-N查询

Flink SQL将根据顺序键对输入数据流进行排序,因此如果前N条记录已更改,则更改后的记录将作为收回/更新记录发送到下游。建议使用支持更新的存储作为Top-N查询接收器。此外,如果前N条记录需要存储在外部存储器中,则结果表应具有与top-N查询相同的唯一键。

Top-N查询的唯一键是分区列和rownum列的组合

SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY ts ORDER BY price_amt DESC) AS row_num
  FROM case_kafka_mysql)
WHERE row_num <= 5

如上所述,rownum字段将作为唯一键的一个字段写入结果表,这可能会导致大量记录写入结果表。例如,当排名9的记录(例如product-1001)被更新并且其排名被升级为1时,排名1~9的所有记录将作为更新消息输出到结果表。如果结果表接收到太多数据,它将成为SQL作业的瓶颈。

优化方法是在Top-N查询的外部SELECT子句省略rownum字段。这是合理的,因为前N条记录的数量通常不多,因此消费者可以自己快速排序记录。在上面的示例中,如果没有rownum字段,只需要将更改的记录(ID)发送到下游,这可以减少结果表的IO。

优化后:

SELECT ts, amount
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY ts ORDER BY price_amt DESC) AS row_num
  FROM case_kafka_mysql)
WHERE row_num <= 5

原文地址:https://blog.csdn.net/victory0508/article/details/134666392

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_25294.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注