ELK + Filebeat + Kafka Log Collection (Local nginx)

Deploying Zookeeper

1. Lab Environment

#Prepare three servers for the Zookeeper cluster
20.0.0.10
20.0.0.20
20.0.0.30

2. Installation Preparation

#Disable the firewall
systemctl stop firewalld
systemctl disable firewalld
setenforce 0

#Install the JDK
yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
java -version

#Upload the apache-zookeeper-3.5.7-bin.tar.gz archive to /opt

3. Installing Zookeeper

#Run the following on all three servers
cd /opt
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz
mv apache-zookeeper-3.5.7-bin /opt/zookeeper

#Edit the configuration file
cd /opt/zookeeper/conf/
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg
--Line 2--
tickTime=2000
#Heartbeat interval between Zookeeper servers and clients, in milliseconds

--5--
initLimit=10
#Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量),这里表示为10*2s

--Line 8--
syncLimit=5
#Timeout for Leader-Follower synchronization; if a Follower does not respond within 5 * 2s = 10s, the Leader considers it dead and removes it from the server list

--Line 12, modify--
dataDir=/opt/zookeeper/data
#Directory for Zookeeper's data; it must be created separately

--Add--
dataLogDir=/opt/zookeeper/logs
#Directory for Zookeeper's logs; it must be created separately

--Line 15--
clientPort=2181
#Client connection port

--Append the cluster information at the end--
server.1=20.0.0.10:3188:3288
server.2=20.0.0.20:3188:3288
server.3=20.0.0.30:3188:3288
#Format: server.<myid>=<IP>:<port>:<port>; the first port (3188) carries Follower-to-Leader traffic, the second (3288) is used for Leader election

#Create the data and log directories on every node
mkdir /opt/zookeeper/data
mkdir /opt/zookeeper/logs

#In the dataDir directory of each node, create a myid file; assign 1, 2, and 3 to the different nodes
echo 1 > /opt/zookeeper/data/myid    #on 20.0.0.10
echo 2 > /opt/zookeeper/data/myid    #on 20.0.0.20
echo 3 > /opt/zookeeper/data/myid    #on 20.0.0.30

#Configure the Zookeeper startup script
vim /etc/init.d/zookeeper

#!/bin/bash
#chkconfig:2345 20 90
#description:Zookeeper Service Control Script
ZK_HOME='/opt/zookeeper'
case $1 in
start)
	echo "---------- zookeeper 启动 ------------"
	$ZK_HOME/bin/zkServer.sh start
;;
stop)
	echo "---------- zookeeper 停止 ------------"
	$ZK_HOME/bin/zkServer.sh stop
;;
restart)
	echo "---------- zookeeper 重启 ------------"
	$ZK_HOME/bin/zkServer.sh restart
;;
status)
	echo "---------- zookeeper 状态 ------------"
	$ZK_HOME/bin/zkServer.sh status
;;
*)
    echo "Usage: $0 {start|stop|restart|status}"
esac

#Enable start on boot
chmod +x /etc/init.d/zookeeper
chkconfig --add zookeeper

#Start Zookeeper on each node
service zookeeper start

#Check the current status
service zookeeper status
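
In a healthy three-node ensemble, one node is elected Leader and the other two run as Followers, so the status output differs per node. The sketch below is roughly what the status command prints on a follower node (the exact wording varies with the Zookeeper version):

ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Mode: follower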

Deploying Kafka (version 3.4.1)

1. Installing Kafka

cd /opt
--Upload kafka_2.13-3.4.1.tgz--
tar -xf kafka_2.13-3.4.1.tgz
mv kafka_2.13-3.4.1 kafka
cd kafka/config/
cp server.properties server.properties.bak
vim server.properties
--Line 24--
broker.id=1
#Globally unique broker ID that must not repeat; set broker.id=2 and broker.id=3 on the other two machines

--Line 34--
listeners=PLAINTEXT://20.0.0.10:9092
#Use each server's own IP here (.10, .20, .30 respectively); no hostname mapping is required

--Line 62--
log.dirs=/var/log/kafka
#Path for Kafka's runtime logs, which also serves as the data storage path

--Line 125--
zookeeper.connect=20.0.0.10:2181,20.0.0.20:2181,20.0.0.30:2181
#Addresses for connecting to the Zookeeper cluster

#Update the global environment variables
vim /etc/profile
--Add--
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin

source /etc/profile
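
To confirm the PATH change took effect, a quick sanity check (not part of the original steps) is to run a Kafka tool from any directory:

kafka-topics.sh --version
#Should report 3.4.1 once /etc/profile has been sourced
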
#Configure the Kafka startup script
vim /etc/init.d/kafka

#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/opt/kafka'
case $1 in
start)
	echo "---------- Kafka 启动 ------------"
	${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)
	echo "---------- Kafka 停止 ------------"
	${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)
	$0 stop
	$0 start
;;
status)
	echo "---------- Kafka 状态 ------------"
	count=$(ps -ef | grep kafka | egrep -cv "grep|$$")
	if [ "$count" -eq 0 ];then
        echo "kafka is not running"
    else
        echo "kafka is running"
    fi
;;
*)
    echo "Usage: $0 {start|stop|restart|status}"
esac

#Enable start on boot
chmod +x /etc/init.d/kafka
chkconfig --add kafka

#Start Kafka on each node
service kafka start
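
To verify that each broker actually came up, one simple check (assuming the ss utility is present) is that port 9092 is listening:

ss -tlnp | grep 9092
#A LISTEN entry on 9092 means the Kafka process is up on this node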

2. Command-Line Tests

#Create a topic
kafka-topics.sh --create --bootstrap-server 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092 --replication-factor 2 --partitions 3 --topic test1

#List all topics in the cluster
kafka-topics.sh --list --bootstrap-server 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092

#Publish messages
kafka-console-producer.sh --bootstrap-server 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092 --topic test1

#Consume messages
kafka-console-consumer.sh --bootstrap-server 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092 --topic test1 --from-beginning

#Change the number of partitions (the count can only be increased)
kafka-topics.sh --bootstrap-server 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092 --alter --topic test1 --partitions 6

#Delete a topic
kafka-topics.sh --delete --bootstrap-server 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092 --topic test1
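
kafka-topics.sh also offers --describe, which is useful for confirming the partition count and replica placement after the --alter above:

#View a topic's details
kafka-topics.sh --describe --bootstrap-server 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092 --topic test1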

Deploying Filebeat

1. Installing Filebeat

#On 20.0.0.10 (this walkthrough assumes the ELK stack is already deployed, with Logstash on 20.0.0.10 and Elasticsearch on 20.0.0.20/20.0.0.30)
cd /opt/
--Upload filebeat-6.7.2-linux-x86_64.tar.gz--
tar -xf filebeat-6.7.2-linux-x86_64.tar.gz
mv filebeat-6.7.2-linux-x86_64 filebeat
vim /etc/logstash/logstash.yml
--Line 64--
path.config: /opt/log
#Directory Logstash scans for pipeline configuration files

systemctl restart logstash

2. Time Synchronization

#On all nodes
yum -y install ntpdate
ntpdate ntp.aliyun.com 
date

3. Configuring Filebeat

#Grant access to the nginx log files (777 is used here for convenience; read-only permissions would be safer in production)
cd /var/log/nginx/
chmod 777 access.log error.log

#Configure Filebeat
cd /opt/filebeat
vim filebeat.yml
filebeat.inputs:

- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
    - /var/log/nginx/error.log
  tags: ["nginx"]
  fields:
    service_name: 20.0.0.10_nginx
    log_type: nginx
    from: 20.0.0.10

output.kafka:
  enabled: true
  hosts: ["20.0.0.10:9092","20.0.0.20:9092","20.0.0.30:9092"]
  topic: "nginx"


--------------Elasticsearch output-------------------
(comment out this entire section)

----------------Logstash output---------------------
(comment out this entire section)

#Start Filebeat in the background (output is written to filebeat.out)
nohup ./filebeat -e -c filebeat.yml > filebeat.out &
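
Before wiring up Logstash, it is worth confirming that events are actually reaching the nginx topic; a simple spot check is to consume a few messages from any broker:

kafka-console-consumer.sh --bootstrap-server 20.0.0.10:9092 --topic nginx --from-beginning --max-messages 5
#--max-messages stops the consumer after five events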

4. Configuring Logstash

cd /opt/log/
vim kafka.conf

input {
    kafka {
        bootstrap_servers => "20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092"
        topics => ["nginx"]
        type => "nginx_kafka"
        codec => "json"
        auto_offset_reset => "earliest"
        decorate_events => true
    }
}

output {
  if "nginx" in [tags] {
    elasticsearch {
      hosts => ["20.0.0.20:9200","20.0.0.30:9200"]
      index => "nginx_access-%{+YYYY.MM.dd}"
    }
  }

  stdout { codec => rubydebug }
}

#Run the pipeline; --path.data gives this instance its own data directory so it can coexist with other Logstash instances
logstash -f kafka.conf --path.data /opt/test1
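
Once events are flowing, the daily index should appear in Elasticsearch; a quick check (assuming curl is available) against either ES node:

curl -s 'http://20.0.0.20:9200/_cat/indices?v' | grep nginx_access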

Log Collection (Remote Apache + MySQL)

Deploying Filebeat

1. Installing and Configuring Filebeat

#Collect the MySQL and Apache logs from server 20.0.0.81
cd /opt/
--Upload filebeat-6.7.2-linux-x86_64.tar.gz--
tar -xf filebeat-6.7.2-linux-x86_64.tar.gz
mv filebeat-6.7.2-linux-x86_64 filebeat
cd filebeat/
vim filebeat.yml
 
filebeat.inputs:

- type: log
  enabled: true
  paths:
    - /etc/httpd/logs/access_log
    - /etc/httpd/logs/error_log
  tags: ["httpd_81"]
  fields:
    service_name: 20.0.0.81_httpd
    log_type: httpd
    from: 20.0.0.81
 
- type: log
  enabled: true
  paths:
    - /usr/local/mysql/data/mysql_general.log
  tags: ["mysql_81"]
  fields:
    service_name: 20.0.0.81_mysql
    log_type: mysql
    from: 20.0.0.81

output.kafka:
  enabled: true
  hosts: ["20.0.0.10:9092","20.0.0.20:9092","20.0.0.30:9092"]
  topic: "httpdmysql"
 
--------------Elasticsearch output-------------------
(comment out this entire section)

----------------Logstash output---------------------
(comment out this entire section)

#Start Filebeat in the background (output is written to filebeat.out)
nohup ./filebeat -e -c filebeat.yml > filebeat.out &
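
As with the nginx topic earlier, delivery can be spot-checked by consuming a few events from the httpdmysql topic:

kafka-console-consumer.sh --bootstrap-server 20.0.0.10:9092 --topic httpdmysql --from-beginning --max-messages 5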

2. Configuring Logstash

#On 20.0.0.10:
cd /opt/log/
vim 81_a+m.conf 
 
input {
    kafka {
        bootstrap_servers => "20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092"
        topics => ["httpdmysql"]
        type => "httpd+mysql_kafka"
        codec => "json"
        auto_offset_reset => "earliest"
        decorate_events => true
    }
}

output {
  if "httpd_81" in [tags] {
    elasticsearch {
      hosts => ["20.0.0.20:9200","20.0.0.30:9200"]
      index => "httpd_access-%{+YYYY.MM.dd}"
    }
  }

  if "mysql_81" in [tags] {
    elasticsearch {
      hosts => ["20.0.0.20:9200","20.0.0.30:9200"]
      index => "mysql_access-%{+YYYY.MM.dd}"
    }
  }

  stdout { codec => rubydebug }
}

logstash -f 81_a+m.conf --path.data /opt/test2
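
After this pipeline has processed some events, both daily indices should be visible in Elasticsearch:

curl -s 'http://20.0.0.20:9200/_cat/indices?v' | egrep 'httpd_access|mysql_access'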

Original article: https://blog.csdn.net/pupcarrot/article/details/134708899
