本文介绍: 可以根据要写入redis的不同数据类型进行调整。具体版本根据实际情况确定。

一、添加Redis Connector依赖

具体版本根据实际情况确定

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId&gt;flink-connector-redis_2.11</artifactId>
    <version>1.1.5</version>
</dependency>

二、启动redis

参见大数据-玩转数据-Redis 安装与使用

三、编写代码

package com.lyh.flink06;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class SinkRedis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        DataStreamSource<Integer> dataStreamSource = env.fromElements(1, 2, 3, 4, 5, 6);
        KeyedStream<Integer, Integer> keyedStream = dataStreamSource.keyBy(new KeySelector<Integer, Integer>() {
            @Override
            public Integer getKey(Integer key) throws Exception {
                return key.intValue();
            }
        });
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                .setHost("hadoop100")
                .setPort(6379)
                .setMaxTotal(100)
                .setMaxIdle(10)
                .setMinIdle(2)
                .setTimeout(10*1000)
                .setDatabase(0)
                .setPassword("redis")
                .build();


        keyedStream.addSink(new RedisSink<>(conf, new RedisMapper<Integer>() {
            @Override
            public RedisCommandDescription getCommandDescription() {
                return new RedisCommandDescription(RedisCommand.SET);
            }

            @Override
            public String getKeyFromData(Integer integer) {
                return integer.toString();
            }

            @Override
            public String getValueFromData(Integer integer) {
                return integer.toString();
            }
        }));
        env.execute();
    }
}

可以根据要写入redis的不同数据类型进行调整

四、查询结果

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

原文地址:https://blog.csdn.net/s_unbo/article/details/132252712

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_25870.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注