本文介绍: broker.id=0 ●21行,broker全局唯一编号每个broker不能重复,因此要在其他机器配置 broker.id=1、broker.id=2。log.segment.bytes=1073741824 #110行,一个segment文件最大大小默认为 1G,超出新建一个新的segment文件log.retention.hours=168 #103行,segment文件数据文件)保留的最长时间单位为小时,默认为7天,超时将被删除

kafka 3.4.1

elk+filebeat+kafka

实现日志收集

httpd1 mysql1 topic 2.7 3.0

关闭防火墙

systemctl stop firewalld

systemctl disable firewalld

setenforce 0

安装 JDK

yum instally java-1.8.0-openjdk java-1.8.0-openjdkdevel

javaversion

安装 Zookeeper

cd /opt

tar -zxvf apachezookeeper-3.5.7-bin.tar.gz

mv apachezookeeper-3.5.7-bin /opt/zookeeper

修改配置文件

cd /opt/zookeeper/conf/

cp zoo_sample.cfg zoo.cfg

tickTime=2000 #通信心跳时间,Zookeeper服务器客户端心跳时间单位毫秒

initLimit=10 #Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量),这里表示为10*2s

syncLimit=5 #Leader和Follower之间同步通信超时时间这里表示如果超过52s,Leader认为Follwer死掉,并从服务器列表中删除Follwer

dataDir=/opt/zookeeper/data修改指定保存Zookeeper中的数据目录,目录需要单独创建

dataLogDir=/opt/zookeeper/logs ●添加,指定存放日志的目录,目录需要单独创建

clientPort=2181 #客户端连接端口

#添加集群信息

server.1=192.168.176.70:3188:3288

server.2=192.168.176.71:3188:3288

server.3=192.168.176.72:3188:3288

每个节点创建数据目录和日志目录

mkdir /opt/zookeeper/data

mkdir /opt/zookeeper/logs

每个节点dataDir指定的目录下创建一个 myid文件

echo 1 > /opt/zookeeper/data/myid

echo 2 > /opt/zookeeper/data/myid

echo 3 > /opt/zookeeper/data/myid

配置 Zookeeper 启动脚本

vim /etc/init.d/zookeeper

#!/bin/bash

#chkconfig:2345 20 90

#description:Zookeeper Service Control Script

ZK_HOME=’/opt/zookeeper

case $1 in

start)

echo “———- zookeeper 启动 ————“

$ZK_HOME/bin/zkServer.sh start

;;

stop)

echo “———- zookeeper 停止 ————“

$ZK_HOME/bin/zkServer.sh stop

;;

restart)

echo “———- zookeeper 重启 ————“

$ZK_HOME/bin/zkServer.sh restart

;;

status)

echo “———- zookeeper 状态 ————“

$ZK_HOME/bin/zkServer.sh status

;;

*)

echo “Usage: $0 {start|stop|restart|status}”

esac

设置开机自启

chmod +x /etc/init.d/zookeeper

chkconfigadd zookeeper

分别启动 Zookeeper

service zookeeper start

查看当前状态

service zookeeper status

部署 kafka 集群

安装

cd /opt/

tar zxvf kafka_2.13-2.7.1.tgz

mv kafka_2.13-2.7.1 kafka

修改配置文件

cd kafka/config/

cp server.properties server.properties.bak

vim server.properties

改经纪人id

如果修改id 28行可以不改

broker.id=0 ●21行,broker全局唯一编号每个broker不能重复,因此要在其他机器上配置 broker.id=1、broker.id=2

listeners=PLAINTEXT://192.168.80.10:9092 ●31行,指定监听的IP和端口,如果修改每个broker的IP需区分开来,也可保持默认配置不用修改

num.network.threads=3 #42行,broker 处理网络请求线程数量,一般情况下不需要去修改

num.io.threads=8 #45行,用来处理磁盘IO的线程数量,数值应该大于硬盘

socket.send.buffer.bytes=102400 #48行,发送接字缓冲区大小

socket.receive.buffer.bytes=102400 #51行,接收接字缓冲区大小

socket.request.max.bytes=104857600 #54行,请求套接字缓冲区大小

log.dirs=/var/log/kafka #60行,kafka运行日志存放路径,也是数据存放路径

num.partitions=1 #65行,topic当前broker上的默认分区个数,会被topic创建时的指定参数覆盖

num.recovery.threads.per.data.dir=1 #69行,用来恢复清理data下数据线程数量

log.retention.hours=168 #103行,segment文件数据文件)保留的最长时间单位为小时,默认为7天,超时将被删除

log.segment.bytes=1073741824 #110行,一个segment文件最大大小默认为 1G,超出新建一个新的segment文件

#Kafka 以日志文件的形式维护其数据,而这些日志文件被分割多个日志段。当一个日志段达到指定的大小时,就会创建一个新的日志段。

配置连接Zookeeper集群地址

zookeeper.connect=192.168.176.70:2181,192.168.176.71:2181,192.168.176.72:2181

kafka默认不允许删除主题

一台配置一下

修改环境变量日志段是主题分区日志文件的一部分

vim /etc/profile

export KAFKA_HOME=/opt/kafka

export PATH=$PATH:$KAFKA_HOME/bin

配置 Zookeeper 启动脚本

vim /etc/init.d/kafka

设置开机自启

chmod +x /etc/init.d/kafka

chkconfig —add kafka

分别启动 Kafka

service kafka start

Kafka 命令行操作

创建topic

kafka-topics.shcreatebootstrapserver 192.168.176.70:9092,192.168.176.71:9092,192.168.176.72:9092 –replication-factor 2 —partitions 3 —topic cc

查看当前服务器中的所有 topic

kafka-topics.shlistbootstrapserver 192.168.176.70:9092,192.168.176.71:9092,192.168.176.72:9092

查看某个 topic 的详情

kafka-topics.sh —describe —bootstrapserver 192.168.176.70:9092,192.168.176.71:9092,192.168.176.72:9092

发布消息

kafka-consoleproducer.sh –broker-list 192.168.176.70:9092,192.168.176.71:9092,192.168.176.72:9092 –topic cc

消费消息

kafka-consoleconsumer.sh —bootstrapserver 192.168.176.70:9092,192.168.176.71:9092,192.168.176.72:9092 –topic cc

kafka-consoleconsumer.sh —bootstrapserver 192.168.176.70:9092,192.168.176.71:9092,192.168.176.72:9092 –topic cc –from-beginning

创建单个主题

kafka-topics.sh —createbootstrapserver 192.168.176.70:9092 –replication-factor 2 —partitions 3 –topic test1

修改分区数

kafka-topics.sh —bootstrap-server 192.168.176.70:9092,192.168.176.71:9092,192.168.176.72:9092 —alter –topic test1 —partitions 6

删除 topic

kafka-topics.sh —deletebootstrap-server 192.168.176.70:9092,192.168.176.71:9092,192.168.176.72:9092 –topic test1

安装 Filebeat

tar zxvf filebeat-6.7.2-linux-x86_64.tar.gz

mv filebeat-6.7.2-linux-x86_64/ filebeat

安装logstash

#上传软件包 logstash-6.7.2.rpm 到/opt目录下

cd /opt

rpm -ivh logstash-6.7.2.rpm

systemctl start logstash.service

systemctl enable logstash.service

ln -s /usr/share/logstash/bin/logstash /usr/local/bin/

安装nginx或者httpd

yum -y install epelrelease

yum -y install nginx

yum -y install httpd

部署 Zookeeper+Kafka 集群

2.部署 Filebeat

cd /usr/local/filebeat

vim filebeat.yml

type: log

enabled: true

paths:

– /var/log/nginx/access.log

– /var/log/nginx/error.log

tags: [“nginx“]

fields:

service_name: 192.168.176.10_nginx

log_type: nginx

from: 192.168.176.10

output.kafka:

enabled: true

hosts: [“192.168.233.70:9092″,”192.168.233.71:9092″,”192.168.233.72:9092”]

topic: “nginx”

底下output 注释

nohup ./filebeat -e -c filebeat.yml >filebeat.out &

tail -f filebeat.out

部署 ELK,在 Logstash 组件所在节点新建一个 Logstash 配置文件

cd /etc/logstash/conf.d/

vim kafka.conf

input {

kafka {

bootstrap_servers => “192.168.176.70:9092,192.168.176.71:9092,192.168.176.72:9092”

topics => “nginx”

type => “nginx_kafka”

codec => “json

auto_offset_reset => “earliest”

decorate_events => true

}

}

output {

if “nginx” in [tags] {

elasticsearch {

hosts => [“192.168.176.10:9200″,”192.168.176.50:9200”]

index => “%{[fields][service_name]}-%{+YYYY.MM.dd}”

}

}

stdout { codec => rubydebug }

}

logstash -f kafka.confpath.data /opt/test1 &

httpd服务

yum -y install httpd

安装 Filebeat

tar zxvf filebeat-6.7.2-linux-x86_64.tar.gz

mv filebeat-6.7.2-linux-x86_64/ filebeat

安装logstash

#上传软件包 logstash-6.7.2.rpm 到/opt目录下

cd /opt

rpm -ivh logstash-6.7.2.rpm

systemctl start logstash.service

systemctl enable logstash.service

ln -s /usr/share/logstash/bin/logstash /usr/local/bin/

部署 Zookeeper+Kafka 集群

2.部署 Filebeat

cd /usr/local/filebeat

vim filebeat.yml

底下output 注释

nohup ./filebeat -e -c filebeat.yml >filebeat.out &

tail -f filebeat.out

在 Logstash 组件所在节点上新建一个 Logstash 配置文件

cd /etc/logstash/conf.d/

vim kafkahttpd.conf

logstash -f kafkahttpd.confpath.data /opt/test1 &

原文地址:https://blog.csdn.net/weixin_51694382/article/details/134739741

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_23962.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱suwngjj01@126.com进行投诉反馈,一经查实,立即删除!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注