本文介绍: Redis环境说明 redis6使用docker部署redis6.x 看个人主页docker相关文章编码实战数据源static {list.add(“spring boot2.x课程”);list.add(“微服务SpringCloud课程”);list.add(“RabbitMQ消息队列”);list.add(“Kafka课程”);list.add(“小滴课堂面试专题第一季”);list.add(“Flink流式技术课程”);list.add(“工业级微服务项目大课训练营”);
🚀 作者 :“大数据小禅”
🚀 文章简介 :Flink 商品销量统计-实战Bahir Connetor实战存储 数据到Redis6.X
🚀 欢迎小伙伴们 点赞👍、收藏⭐、留言💬
Flink怎么操作Redis
-
Flink怎么操作redis?
- 方式一:自定义sink
- 方式二:使用connector
-
Redis Sink 核心是RedisMapper 是一个接口,使用时要编写自己的redis操作类实现这个接口中的三个方法
- getCommandDescription 选择对应的数据结构和key名称配置
- getKeyFromData 获取key
- getValueFromData 获取value
-
使用
- 添加依赖
<dependency> <groupId>org.apache.bahir</groupId> <artifactId>flink-connector-redis_2.11</artifactId> <version>1.0</version> </dependency>
-
编码
public class MyRedisSink implements RedisMapper<Tuple2<String, Integer>> { @Override public RedisCommandDescription getCommandDescription() { return new RedisCommandDescription(RedisCommand.HSET, "VIDEO_ORDER_COUNTER"); } @Override public String getKeyFromData(Tuple2<String, Integer> value) { return value.f0; } @Override public String getValueFromData(Tuple2<String, Integer> value) { return value.f1.toString(); } }
Flink 商品销量统计-转换-分组-聚合-存储自定义的Redis Sink实战
-
Redis环境说明 redis6
-
使用docker部署redis6.x 看个人主页docker相关文章
docker run -d -p 6379:6379 redis
-
-
编码实战
数据源
public class VideoOrderSource extends RichParallelSourceFunction<VideoOrder> {
private volatile Boolean flag = true;
private Random random = new Random();
private static List<String> list = new ArrayList<>();
static {
list.add("spring boot2.x课程");
list.add("微服务SpringCloud课程");
list.add("RabbitMQ消息队列");
list.add("Kafka课程");
list.add("小滴课堂面试专题第一季");
list.add("Flink流式技术课程");
list.add("工业级微服务项目大课训练营");
list.add("Linux课程");
}
/**
* run 方法调用前 用于初始化连接
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
System.out.println("-----open-----");
}
/**
* 用于清理之前
* @throws Exception
*/
@Override
public void close() throws Exception {
System.out.println("-----close-----");
}
/**
* 产生数据的逻辑
* @param ctx
* @throws Exception
*/
@Override
public void run(SourceContext<VideoOrder> ctx) throws Exception {
while (flag){
Thread.sleep(1000);
String id = UUID.randomUUID().toString();
int userId = random.nextInt(10);
int money = random.nextInt(100);
int videoNum = random.nextInt(list.size());
String title = list.get(videoNum);
VideoOrder videoOrder = new VideoOrder(id,title,money,userId,new Date());
ctx.collect(videoOrder);
}
}
/**
* 控制任务取消
*/
@Override
public void cancel() {
flag = false;
}
}
保存的格式与存取的方法
public class VideoOrderCounterSink implements RedisMapper<Tuple2<String, Integer>> {
/***
* 选择需要用到的命令,和key名称
* @return
*/
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET, "VIDEO_ORDER_COUNTER");
}
/**
* 获取对应的key或者filed
*
* @param data
* @return
*/
@Override
public String getKeyFromData(Tuple2<String, Integer> data) {
System.out.println("getKeyFromData=" + data.f0);
return data.f0;
}
/**
* 获取对应的值
*
* @param data
* @return
*/
@Override
public String getValueFromData(Tuple2<String, Integer> data) {
System.out.println("getValueFromData=" + data.f1.toString());
return data.f1.toString();
}
}
落地
public class Flink07RedisSinkApp {
/**
* source
* transformation
* sink
*
* @param args
*/
public static void main(String[] args) throws Exception {
//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(1);
//数据源 source
// DataStream<VideoOrder> ds = env.fromElements(
// new VideoOrder("21312","java",32,5,new Date()),
// new VideoOrder("314","java",32,5,new Date()),
// new VideoOrder("542","springboot",32,5,new Date()),
// new VideoOrder("42","redis",32,5,new Date()),
// new VideoOrder("4252","java",32,5,new Date()),
// new VideoOrder("42","springboot",32,5,new Date()),
// new VideoOrder("554232","flink",32,5,new Date()),
// new VideoOrder("23323","java",32,5,new Date())
// );
DataStream<VideoOrder> ds = env.addSource(new VideoOrderSource());
//transformation
DataStream<Tuple2<String,Integer>> mapDS = ds.map(new MapFunction<VideoOrder, Tuple2<String,Integer>>() {
@Override
public Tuple2<String, Integer> map(VideoOrder value) throws Exception {
return new Tuple2<>(value.getTitle(),1);
}
});
// DataStream<Tuple2<String,Integer>> mapDS = ds.flatMap(new FlatMapFunction<VideoOrder, Tuple2<String,Integer>>() {
// @Override
// public void flatMap(VideoOrder value, Collector<Tuple2<String, Integer>> out) throws Exception {
// out.collect(new Tuple2<>(value.getTitle(),1));
// }
// });
//分组
KeyedStream<Tuple2<String,Integer>,String> keyByDS = mapDS.keyBy(new KeySelector<Tuple2<String,Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
});
//统计每组有多少个
DataStream<Tuple2<String,Integer>> sumDS = keyByDS.sum(1);
//控制台打印
sumDS.print();
//单机redis
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(6379).build();
sumDS.addSink(new RedisSink<>(conf,new VideoOrderCounterSink()));
//DataStream需要调用execute,可以取个名称
env.execute("custom redis sink job");
}
}
原文地址:https://blog.csdn.net/weixin_45574790/article/details/132857544
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.7code.cn/show_51245.html
如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。