环境准备
组件名 | 版本 |
---|---|
flink客户端 | 1.14.4-2.12 |
hadoop集群 | 3.1.4 |
hive客户端 | 3.1.2 |
iceberg | iceberg–flink-runtime-1.14-0.13.2.jar |
iceberg–hive依赖 | iceberg-hive-runtime-0.13.2.jar |
sqlclient启动前准备
sqlclient启动有两种方式,per-job、session。
session模式需先启动一个session,启动方式如下:
/home/hadoop/flink/bin/yarn-session.sh
-t /home/hadoop/flink/sqlplugins
-s 2 -jm 5120 -tm 5120 -qu default -nm iceberg_test1 -d
per-job模式需在flink客户端的flink–conf.yaml文件中添加如下参数:
execution.target: yarn-per-job
注意:
flink-conf.yaml文件中还设置了其他内容如下
classloader.resolve-order: parent-first
classloader.check-leaked-classloader: false
#kerberos相关配置
security.kerberos.login.use-ticket-cache: true
security.kerberos.login.keytab: /bigdata/apps/test/core.keytab
security.kerberos.login.principal: hadoop
security.kerberos.login.contexts: Client
启动sqlclient
-- yarn session模式
/home/hadoop/flink/bin/sql-client.sh embedded
-s appId
-l /home/hadoop/flink/sqlplugins
-i /home/hadoop/flink/script/init.sql
-f /home/hadoop/flink/script/insert.sql
shell
-- yarn per-job模式
/home/hadoop/flink/bin/sql-client.sh embedded
-l /home/hadoop/flink/sqlplugins
-i /home/hadoop/flink/script/init.sql
-f /home/hadoop/flink/script/insert.sql
shell
init.sql
set 'sql-client.verbose'='true';
SET 'execution.checkpointing.interval' = '60s';
CREATE CATALOG ice_catalog WITH (
'type' = 'iceberg',
'catalog-type' = 'hive',
'uri' = 'thrift://hdp02.bonc.com:9083',
'warehouse' = 'hdfs://beh001/tmp/',
'hive-conf-dir' = '/home/hadoop/flink/confdir',
'hadoop-conf-dir' = '/home/hadoop/flink/confdir'
);
CREATE DATABASE IF NOT EXISTS ice_catalog.ice_db;
CREATE TABLE IF NOT EXISTS ice_catalog.ice_db.ice_tb (
deal_date string,
chnl_id string,
chnl_name string,
region_code string,
city_code string,
chnl_third_class string,
chnl_second_class string,
chnl_first_class string,
chnl_area_class string,
chnl_eff_flag string,
oper_id string,
oper_name string,
self_term_code string,
air_term_code string,
oper_eff_flag string,
item_cls_type string,
item_cls_desc string,
item_grp_type string,
item_grp_desc string,
user_chnl_id string,
user_chnl_name string,
user_region_code string,
user_city_code string,
item_value1 decimal(14,2),
item_value2 decimal(14,2),
PRIMARY KEY (chnl_id ,oper_id) NOT ENFORCED
) WITH (
'write.upsert.enabled' = 'true',
'write.metadata.previous-versions-max' = '10',
'write.metadata.delete-after-commit.enabled' = 'true',
'commit.manifest.min-count-to-merge' = '1',
'engine.hive.enabled' = 'true',
'table.dynamic-table-options.enabled' = 'true',
'format-version' = '2'
);
CREATE TABLE csvSource (
deal_date string COMMENT '处理日期',
chnl_id string COMMENT '渠道ID',
chnl_name string COMMENT '渠道名称',
region_code string COMMENT '归属地市代码',
city_code string COMMENT '归属区县代码',
chnl_third_class string COMMENT '渠道三级类型',
chnl_second_class string COMMENT '渠道二级类型',
chnl_first_class string COMMENT '渠道一级类型',
chnl_area_class string COMMENT '渠道地域属性',
chnl_eff_flag string COMMENT '渠道有效标志',
oper_id string COMMENT '工号ID',
oper_name string COMMENT '工号姓名',
self_term_code string COMMENT '自助终端标志',
air_term_code string COMMENT '空中充值标志',
oper_eff_flag string COMMENT '工号有效标志',
item_cls_type string COMMENT '指标大类代码',
item_cls_desc string COMMENT '指标大类名称',
item_grp_type string COMMENT '指标细项代码',
item_grp_desc string COMMENT '指标细项名称',
user_chnl_id string COMMENT '用户渠道ID',
user_chnl_name string COMMENT '用户渠道名称',
user_region_code string COMMENT '用户归属地市代码',
user_city_code string COMMENT '用户归属区县代码',
item_value1 decimal(14,2) COMMENT '指标值1',
item_value2 decimal(14,2) COMMENT '指标值2'
) WITH (
'connector' = 'filesystem',
'path' = 'hdfs://beh001/tmp/originData/csvSource.txt',
'format' = 'csv',
'csv.field-delimiter' = ','
);
insert.sql
insert into
ice_catalog.ice_db.ice_tb
select
deal_date ,
chnl_id ,
chnl_name ,
region_code ,
city_code ,
chnl_third_class ,
chnl_second_class ,
chnl_first_class ,
chnl_area_class ,
chnl_eff_flag ,
oper_id ,
oper_name ,
self_term_code ,
air_term_code ,
oper_eff_flag ,
item_cls_type ,
item_cls_desc ,
item_grp_type ,
item_grp_desc ,
user_chnl_id ,
user_chnl_name ,
user_region_code ,
user_city_code ,
item_value1,
item_value2
from
csvSource;
原文地址:https://blog.csdn.net/sxau_zhangtao/article/details/134547500
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.7code.cn/show_2357.html
如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。