本文介绍: 我这里是为了防止sink的文件过于零碎, 但因为使用的memory channel, 缓存时间过长容易丢数据。默认启动时-Xmx20m, 过于小了, 加大堆内存可以直接放开。下载JindoSDK(连接OSS依赖), 下载地址。2. 进阶配置, 根据自己情况按需配置。3. Flume JVM参数。修改Hadoop配置,
安装环境
配置Hadoop
下载JindoSDK(连接OSS依赖), 下载地址Github
解压后配置环境变量
export JINDOSDK_HOME=/usr/lib/jindosdk-x.x.x
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:${JINDOSDK_HOME}/lib/*
<property>
<name>fs.oss.credentials.provider</name>
<value>com.aliyun.jindodata.oss.auth.SimpleCredentialsProvider</value>
</property>
<property>
<name>fs.oss.accessKeyId</name>
<value>xxxx</value>
</property>
<property>
<name>fs.oss.accessKeySecret</name>
<value>xxxx</value>
</property>
<property>
<name>fs.oss.endpoint</name>
<value>xxxxx</value>
</property>
<property>
<name>fs.AbstractFileSystem.oss.impl</name>
<value>com.aliyun.jindodata.oss.JindoOSS</value>
</property>
<property>
<name>fs.oss.impl</name>
<value>com.aliyun.jindodata.oss.JindoOssFileSystem</value>
</property>
配置Flume
此部分全文最关键, 请仔细看
- 基础配置部分, Flume配置
a1.sources = source1
a1.sinks = k1
a1.channels = c1
a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.channels = c1
a1.sources.source1.kafka.bootstrap.servers = xxx
a1.sources.source1.kafka.topics = test
a1.sources.source1.kafka.consumer.group.id = flume-sink-group # 消费者组, 云组件需要先在管理后台创建
a1.sources.source1.kafka.consumer.auto.offset.reset = earliest # 从头消费Kafka里数据
a1.sinks.k1.channel = c1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = oss://xxx/test/%Y%m%d # 自动按天分文件夹
a1.sinks.k1.hdfs.fileType=DataStream
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
可参考使用Flume同步EMR Kafka集群的数据至OSS-HDFS服务
2. 进阶配置, 根据自己情况按需配置
a1.sinks.k1.hdfs.rollInterval = 600 # 5分钟切换一个新文件
a1.sinks.k1.hdfs.rollSize = 134217728 # 或者文件大小达到128M则切换新文件
a1.sinks.k1.hdfs.rollCount = 0 # 写入多少条数据切换新文件, 0为不限制
我这里是为了防止sink的文件过于零碎, 但因为使用的memory channel, 缓存时间过长容易丢数据
3. Flume JVM参数
默认启动时-Xmx20m, 过于小了, 加大堆内存可以直接放开flume-env.sh
内JAVA_OPTS
的注释
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
XX启动!
bin/zookeeper-server-start.sh config/zookeeper.properties # 启动zookeeper
bin/kafka-server-start.sh config/server.properties # 启动kafak服务
bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name a1 # 启动flume
bin/kafka-console-producer.sh --topic flume-test --bootstrap-server localhost:9092 # 启动一个生产者写测试数据
原文地址:https://blog.csdn.net/u012355401/article/details/134606088
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.7code.cn/show_15155.html
如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。