本文介绍: 其他类型的列可以在物理列之间声明,但不会影响最终的物理模式。列的数据类型是从给定表达式自动派生的,不必手动声明。但是,在表的模式中声明数据列是可选的。成为表模式的一部分,可以像常规列一样进行转换存储,为方便起见,如果列名应用作标识元数据键,则可以省略。主键可以和列的定义一起声明,也可以独立声明为表的限制属性,不管是哪种方式主键都不可以重复定义,否则。创建的目标表是非原子性的,如果在向表中插入数据时发生错误,该表不会被自动删除每个数据字段都由基于字符串的键标识,并具有文档化的数据类型

目录

执行 CREATE 语句

Python脚本

Java代码

SQL语句

列定义

物理/常规列

元数据列

计算列

WATERMARK

PRIMARY KEY

PARTITIONED BY

AS select_statement


Flink SQL是为了简化计算模型、降低您使用Flink门槛而设计一套符合标准SQL语义的开发语言

执行 CREATE 语句

Python脚本
table_env = TableEnvironment.create(...)

# 对已经注册的表进行 SQL 查询

# 注册名为 “Orders” 的表

table_env.execute_sql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");

# 在表上执行 SQL 查询,并把得到的结果作为一个新的表

result = table_env.sql_query(

  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");

# 对已注册的表进行 INSERT 操作

# 注册 TableSink

table_env.execute_sql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")

# 在表上执行 INSERT 语句并向 TableSink 发出结果

table_env 

    .execute_sql("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

Java代码
Environmentsettings settings = Environmentsettings.newInstance()...

TableEnvironment tableEnv = TableEnvironment.create(settings);



// 对已注册的表进行 SQL 查询

// 注册名为 “Orders” 的表

tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");

// 在表上执行 SQL 查询,并把得到的结果作为一个新的表

Table result = tableEnv.sqlQuery(

  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");



// 对已注册的表进行 INSERT 操作

// 注册 TableSink

tableEnv.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)");

// 在表上执行 INSERT 语句并向 TableSink 发出结果

tableEnv.executeSql(

  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");

SQL语句
Flink SQL> CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...);

[INFO] Table has been created.



Flink SQL> CREATE TABLE RubberOrders (product STRING, amount INT) WITH (...);

[INFO] Table has been created.



Flink SQL> INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%';

[INFO] Submitting SQL update statement to the cluster...

定义

物理/常规列

物理列是数据库中已知的常规列。它们定义物理数据字段名称类型顺序。因此,物理列表示从外部系统读取和写入的有效数据。其他类型的列可以在物理列之间声明,但不会影响最终的物理模式。

以下语句创建一个仅包含常规列的表:

CREATE TABLE MyTable (

  `user_id` BIGINT,

  `name` STRING

) WITH (

 

);

元数据列

元数据列是SQL标准的扩展,元数据列由metadata关键字指示。例如,可以使用元数据列从Kafka记录读取时间戳,并将时间写入Kafka,以进行基于时间操作连接器和格式文档列出了每个组件的可用元数据字段。但是,在表的模式中声明元数据列是可选的。

以下语句创建一个表,其中包含引用元数据字段时间戳的附加元数据列:

CREATE TABLE MyTable (

  `user_id` BIGINT,

  `name` STRING,

  `record_time` TIMESTAMP_LTZ(3) METADATA FROM timestamp    reads and writes a Kafka record’s timestamp

) WITH (

  connector = kafka

 

);

每个元数据字段都由基于字符串的键标识,并具有文档化的数据类型。例如,Kafka连接器公开了一个元数据字段,该字段具有键时间戳和数据类型timestamp_LTZ3),可用于读取和写入记录。

在上面的示例中,元数据列record_time成为表模式的一部分,可以像常规列一样进行转换存储,为方便起见,如果列名应用作标识元数据键,则可以省略FROM子句

CREATE TABLE MyTable (

  `user_id` BIGINT,

  `name` STRING,

  `timestamp` TIMESTAMP_LTZ(3) METADATA    — use column name as metadata key

) WITH (

  connector = kafka

 

);

计算

计算列是使用语法column_name AS Computed_column_expression生成虚拟列。

计算计算可以引用同一表中声明的其他列的表达式。可以访问物理列和元数据列。列本身不物理存储在表中。列的数据类型是从给定表达式自动派生的,不必手动声明。

例如,计算列可以定义

CREATE TABLE MyTable (

  `user_id` BIGINT,

  `price` DOUBLE,

  `quantity` DOUBLE,

  `cost` AS price * quanitity,  — evaluate expression and supply the result to queries

) WITH (

  connector = kafka

 

);

Flink中通常使用计算列来定义CREATETABLE语句中的时间属性

使用系统PROCTIME()函数,可以通过proc AS PROCIME()轻松定义processing time属性

Event time属性时间戳可以在WATERMARK声明之前预处理。例如,如果原始字段不是TIMESTAMP3类型嵌套JSON字符串中,则可以使用计算列。

WATERMARK

WATERMARK 定义了表的Event time属性,其形式为 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression 

rowtime_column_name 把一个现有的列定义为一个为表标记event time属性。该列的类型必须为 TIMESTAMP(3),且是 schema 中的顶层列,它也可以是一个计算列。

watermark_strategy_expression 定义了 watermark 的生成策略。它允许使用包括计算列在内的任意非查询表达式来计算 watermark 表达式返回类型必须是 TIMESTAMP(3),表示了从 Epoch 以来的经过的时间 返回 watermark 只有当其不为空且其值大于之前发出的本地 watermark 时才会被发出(以保证 watermark 递增)。每条记录的 watermark 生成表达式计算都会由框架完成。 框架会定期发出所生成的最大的 watermark ,如果当前 watermark 仍然与前一个 watermark 相同、为空、或返回 watermark 的值小于最后一个发出的 watermark ,则新的 watermark 不会被发出。 Watermark 根据 pipeline.auto-watermark-interval 中所配置间隔发出。 watermark 间隔 0ms ,那么每条记录都会产生一个 watermark,且 watermark 会在不为空并大于上一个发出的 watermark 时发出。

使用事件时间语义时,表必须包含事件时间属性 watermark 策略。

Flink 提供了几种常用的 watermark 策略。

发出到目前为止已观察到的最大时间戳的 watermark ,时间戳大于最大时间戳的行被认为没有迟到。

  • 递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND

发出到目前为止已观察到的最大时间戳减 1 watermark ,时间戳大于或等于最大时间戳的行被认为没有迟到。

  • 有界乱序时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit

发出到目前为止已观察到的最大时间戳减去指定延迟 watermark ,例如, WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND 是一个 5 延迟 watermark 策略。

CREATE TABLE Orders (

    `user` BIGINT,

    product STRING,

    order_time TIMESTAMP(3),

    WATERMARK FOR order_time AS order_time INTERVAL ‘5’ SECOND

) WITH (……);

PRIMARY KEY

主键用作 Flink 优化的一种提示信息主键限制表明一张表或视图的某个(些)列是唯一的并且不包含 Null 值。 主键声明的列都是非 nullable 的。因此主键可以被用作表行级别的唯一标识。

主键可以和列的定义一起声明,也可以独立声明为表的限制属性,不管是哪种方式主键都不可以重复定义,否则 Flink 报错

有效性检查

SQL 标准主键限制可以有两种模式:ENFORCED 或者 NOT ENFORCED 它申明了是否输入/出数据会做合法性检查(是否唯一)。Flink 存储数据因此只支持 NOT ENFORCED 模式,即不做检查用户需要自己保证唯一性。

Flink 假设声明了主键的列都是不包含 Null 值的,Connector 处理数据时需要自己保证语义正确

Notes:  CREATE TABLE 语句中,创建主键会修改列的 nullable 属性,主键声明的列默认都是非 Nullable 的。

PARTITIONED BY

根据指定的列对已经创建的表进行分区。若表使用 filesystem sink ,则将会为每个分区创建一个目录

AS select_statement

表也可以通过一个 CTAS 语句中的查询结果来创建并填充数据,CTAS 是一种简单、快捷的创建表并插入数据的方法

CTAS 有两个部分,SELECT 部分可以是 Flink SQL 支持的任何 SELECT 查询 CREATE 部分从 SELECT 查询获取信息,并创建目标表。  CREATE TABLE 类似,CTAS 要求必须在目标表的 WITH 子句中指定必填的表属性。

CTAS 的建表操作需要依赖目标 Catalog。比如,Hive Catalog 自动 Hive 中创建物理表。但是基于内存 Catalog 只会将表的元信息注册在执行 SQL Client 内存中。

CREATE TABLE my_ctas_table

WITH (

    connector = kafka,

   

)

AS SELECT id, name, age FROM source_table WHERE mod(id, 10) = 0;

注意 CTAS 有如下约束

  • 暂不支持创建临时表。
  • 暂不支持指定列信息。
  • 暂不支持指定 Watermark。
  • 暂不支持创建分区表。
  • 暂不支持主键约束

注意 目前,CTAS 创建的目标表是非原子性的,如果在向表中插入数据时发生错误,该表不会被自动删除

原文地址:https://blog.csdn.net/victory0508/article/details/134651604

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。

如若转载,请注明出处:http://www.7code.cn/show_4663.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注