本文介绍: WITH提供了一种编写辅助语句以用于更大的查询的方法。这些语句通常被称为公共表表达式(CTE),可以被视为定义仅针对一个查询存在的临时视图。json.fail-on–missing–field:在json缺失字段时是否报错。json.ignore–parse–errors:在解析json失败时是否报错。一般无法保证json格式,所以以上两个配置是比较重要的。注意:针对双流中的每条记录都发触发。mysql 建表语句。
1. 准备工作
生成数据
{“ts“: “20201011”,“id“: 8,“price_amt“:211}
{“id“: 8,“coupon_price_amt“:100}
docker exec -it 192d1369463a bash
bash-5.1# cd /opt/kafka_2.12-2.5.0/bin
bash-5.1# ./kafka-console-producer.sh --broker-list localhost:9092 --topic case_kafka_mysql
>{"ts": "20201011","id": 8,"price_amt":211}
docker exec -it 192d1369463a bash
bash-5.1# cd /opt/kafka_2.12-2.5.0/bin
bash-5.1# ./kafka-console-producer.sh --broker-list localhost:9092 --topic flink_test_2
>{"id": 8,"coupon_price_amt":100}
创建数据表
CREATE TABLE `sync_test_2` (
`id` bigint(11) NOT NULL AUTO_INCREMENT,
`ts` varchar(64) DEFAULT NULL,
`total_gmv` bigint(11) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `uidx` (`ts`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;
CREATE TABLE `sync_test_22` (
`id` bigint(11) NOT NULL AUTO_INCREMENT,
`ts` varchar(64) DEFAULT NULL,
`coupon_ratio` double DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `uidx` (`ts`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;
2. 创建数据表
创建数据源表
create table flink_test_2_1 (
id BIGINT,
ts VARCHAR,
price_amt BIGINT,
proctime AS PROCTIME ()
)
with (
'connector' = 'kafka',
'topic' = 'case_kafka_mysql',
'properties.bootstrap.servers' = '127.0.0.1:9092',
'properties.group.id' = 'flink_gp_test2-1',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true',
'properties.zookeeper.connect' = '127.0.0.1:2181/kafka'
);
create table flink_test_2_2 (
id BIGINT,
coupon_price_amt BIGINT,
proctime AS PROCTIME ()
)
with (
'connector' = 'kafka',
'topic' = 'flink_test_2',
'properties.bootstrap.servers' = '127.0.0.1:9092',
'properties.group.id' = 'flink_gp_test2-2',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true',
'properties.zookeeper.connect' = '127.0.0.1:2181/kafka'
);
json.fail-on-missing–field:在json缺失字段时是否报错
json.ignore–parse–errors:在解析json失败时是否报错
创建数据目标表
CREATE TABLE sync_test_2 (
ts string,
total_gmv bigint,
PRIMARY KEY (ts) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8',
'table-name' = 'sync_test_2',
'username' = 'root',
'password' = 'Admin'
);
CREATE TABLE sync_test_22 (
ts string,
coupon_ration bigint,
PRIMARY KEY (ts) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8',
'table-name' = 'sync_test_2',
'username' = 'root',
'password' = 'Admin'
);
3. 计算
说明 写入多个Sink语句时,需要以BEGIN STATEMENT SET;开头,以END;结尾。
BEGIN STATEMENT SET; --写入多个Sink时,必填。
INSERT INTO sync_test_2
SELECT
ts,
SUM(price_amt - coupon_price_amt) AS total_gmv
FROM
(
SELECT
a.ts as ts,
a.price_amt as price_amt,
b.coupon_price_amt as coupon_price_amt
FROM
flink_test_2_1 as a
LEFT JOIN flink_test_2_2 b on b.id = a.id
)
GROUP BY ts;
INSERT INTO sync_test_22
SELECT
ts,
sum(coupon_price_amt)/sum(amount) AS coupon_ration
FROM
(
SELECT
a.ts as ts,
a.price_amt as price_amt,
b.coupon_price_amt as coupon_price_amt
FROM
flink_test_2_1 as a
LEFT JOIN flink_test_2_2 b on b.id = a.id
)
GROUP BY ts;;
END; --写入多个Sink时,必填。
WITH子句
WITH提供了一种编写辅助语句以用于更大的查询的方法。这些语句通常被称为公共表表达式(CTE),可以被视为定义仅针对一个查询存在的临时视图。
BEGIN STATEMENT SET; --写入多个Sink时,必填。
with orders_with_coupon AS (
SELECT
a.ts as ts,
a.price_amt as price_amt,
b.coupon_price_amt as coupon_price_amt
FROM
flink_test_2_1 as a
LEFT JOIN flink_test_2_2 b on b.id = a.id
)
INSERT INTO sync_test_2
SELECT
ts,
SUM(price_amt - coupon_price_amt) AS total_gmv
FROM orders_with_coupon
GROUP BY ts;
INSERT INTO sync_test_22
SELECT
ts,
coupon_price_amt/price_amt AS coupon_ration
FROM orders_with_coupon
GROUP BY ts;;
END; --写入多个Sink时,必填。
原文地址:https://blog.csdn.net/victory0508/article/details/134809717
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.7code.cn/show_47854.html
如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。