本文介绍: shell 创建生产者对象:kafka–console–producer.sh —bootstrap–server hadoop102:9092 —topic first。一个主题的一个分区只能被一个消费者组中的一个消费者消费。一个消费者组中的一个消费者可以消费一个主题中的多个分区。主要用于生成模拟数据,也需要导入相关依赖。消费者对象:KafkaConsumenr。使用文件数据源前,需要先添加相关依赖。
DataStream API
文件数据源
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
public class Flink02_FileSource {
public static void main(String[] args) throw Exception {
//1.创建运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//默认是最大并行度
env.setParallelism(1);
//file source
FileSource.FileSourceBuilder<String> fileSourceBuilder = FileSource
.<String>forRecordStreamFormat(new TextLineInputFormat("utf-8"), new Path("input/word.txt"));
FileSource<String> fileSource = fileSourceBuilder.build();
//source 算子
DataStreamSource<String> ds = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");
ds.print();
env.execute();
}
}
DataGen数据源
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-datagen</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
public class Flink04_DataGenSource {
public static void main(String[] args) {
//1.创建运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//默认是最大并行度
env.setParallelism(1);
DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
new GeneratorFunction<Long, String>() {
@Override
public String map(Long value) throws Exception {
return UUID.randomUUID() + "->" + value;
}
},
100,
RateLimiterStrategy.perSecond(1),
Types.STRING
);
DataStreamSource<String> dataGenDs = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "dataGenDs");
dataGenDs.print();
try {
env.execute();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
Kafka消费者
-
消费原则:
一个主题的一个分区只能被一个消费者组中的一个消费者消费
一个消费者组中的一个消费者可以消费一个主题中的多个分区 -
消费数据存在的问题
-
shell 创建生产者对象:kafka–console–producer.sh —bootstrap–server hadoop102:9092 —topic first
public class Flink03_KafkaSource {
public static void main(String[] args) {
//1.创建运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//默认是最大并行度
env.setParallelism(1);
KafkaSource<String> stringKafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("hadoop102:9092,hadoop103:9092")
.setGroupId("flink")
.setTopics("first")
//优先使用消费者组记录的Offset进行消费,如果offset不存在,根据策略进行重置
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
.setValueOnlyDeserializer(new SimpleStringSchema())
//如果还有别的配置需要指定,统一使用通用方法
// .setProperty("isolation.level", "read_committed")
.build();
DataStreamSource<String> kafkaDS = env.fromSource(stringKafkaSource, WatermarkStrategy.noWatermarks(), "kafkaDS");
kafkaDS.print();
try {
env.execute();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
原文地址:https://blog.csdn.net/qq_44273739/article/details/134814641
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.7code.cn/show_49390.html
如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。