Flink 1.18 SQL demo
Replacing flink-table-planner with flink-table-planner-loader
pom.xml
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-uber -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-uber</artifactId>
<version>1.18.0</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>1.18.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>1.18.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.18.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>1.18.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-csv -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.18.0</version>
</dependency>
<!-- The official docs list flink-connector-kafka, but on Flink on K8s classes were missing; adding the flink-sql-connector-kafka jar fixed it. Keep only one of the two. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka</artifactId>
<version>3.0.2-1.18</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>3.0.2-1.18</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-table-planner_2.12</artifactId>-->
<!-- <version>1.18.0</version>-->
<!-- </dependency>-->
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner-loader-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader</artifactId>
<version>1.18.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- Replace this with the main class of your job -->
<mainClass>com.cn.App</mainClass>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
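After the swap, application code does not change: the planner is discovered at runtime through the loader's isolated classpath. Below is a minimal local smoke test for that discovery (the class name and probe query are illustrative, and the provided-scope dependencies must be on the classpath, e.g. via the IDE's "include provided scope" run option):
package com.cn;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
public class PlannerSmokeTest {
    public static void main(String[] args) {
        // create() fails with a planner/executor discovery error if neither
        // flink-table-planner-loader nor flink-table-planner is on the classpath.
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tableEnv.executeSql("SELECT 1 AS probe").print();
    }
}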
demo
package com.cn;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* @Classname App
* @Description Flink 1.18 SQL demo: read from one Kafka topic and write to another
* @Date 2024/1/12 11:26
* @Created by typezhou
*/
public class App {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
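// Checkpoint every second; with checkpointing enabled, the Kafka source also commits its offsets on each completed checkpoint.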
env.enableCheckpointing(1000L);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
String str = "CREATE TABLE KafkaTable (\n" +
" `user_id` STRING,\n" +
" `ts` TIMESTAMP(3) METADATA FROM 'timestamp'\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'aaaa',\n" +
" 'properties.bootstrap.servers' = '172.xx.xx.xx:9092,172.xx.86.xx:9092,172.xx.xx.xx:9092',\n" +
" 'properties.group.id' = 'testGrou1p',\n" +
" 'scan.startup.mode' = 'latest-offset',\n" +
" 'format' = 'csv'\n" +
")";
tableEnv.executeSql(str);
Table tableResult = tableEnv.sqlQuery("SELECT user_id FROM KafkaTable GROUP BY user_id");
// DataStream<ResultBean> tuple2DataStream = tableEnv.toDataStream(tableResult, ResultBean.class);
// SingleOutputStreamOperator<ResultBean> map = tuple2DataStream.map(new MapFunction<ResultBean, ResultBean>() {
// @Override
// public ResultBean map(ResultBean s) throws Exception {
// Thread.sleep(3000L);
// return s;
// }
// });
// tuple2DataStream.print();
String sqlPri = "CREATE TABLE print_table (\n" +
" `user_id` STRING \n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'bbbb',\n" +
" 'properties.bootstrap.servers' = '172.xx.xx.xx:9092,172.xx.86.xx:9092,172.xx.xx.xx:9092',\n" +
" 'format' = 'csv'\n" +
")";
tableEnv.executeSql(sqlPri);
tableEnv.executeSql("insert into print_table SELECT user_id FROM KafkaTable");
}
}
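Note that the commented-out toDataStream block would fail on tableResult: a GROUP BY aggregate produces an updating table, and toDataStream only accepts insert-only tables. A minimal sketch of consuming the changelog instead (it assumes the env/tableEnv setup and the KafkaTable DDL from App are already in place; the job name is illustrative):
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;
// Inside main, after tableEnv.executeSql(str) has registered KafkaTable:
Table grouped = tableEnv.sqlQuery("SELECT user_id, COUNT(*) AS cnt FROM KafkaTable GROUP BY user_id");
// toChangelogStream emits +I/-U/+U rows, which toDataStream would reject for an updating table.
DataStream<Row> changelog = tableEnv.toChangelogStream(grouped);
changelog.print();
env.execute("changelog-demo");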
Original post: https://blog.csdn.net/newbrid007/article/details/135605378