2000字范文,分享全网优秀范文,学习好帮手!
2000字范文 > ELK部署(filebeat+kafka+logstash+elasticsearch+kibana)

ELK部署(filebeat+kafka+logstash+elasticsearch+kibana)

时间:2023-11-29 00:40:41

相关推荐

ELK部署(filebeat+kafka+logstash+elasticsearch+kibana)

部署环境

备注:资源有限,仅做kafka集群,其他服务做单点测试

日志获取

部署kafka集群

1、上传下载的jar至/data/soft目录

2、解压相关文件至指定目录,修改文件夹名称

[root@yhcs_1 soft]# tar -xvf jdk-19_linux-x64_bin.tar.gz -C /opt

[root@yhcs_1 soft]# tar -xvf apache-zookeeper-3.8.0-bin.tar.gz -C /data

[root@yhcs_1 soft]# tar -xvf kafka_2.13-3.4.0.tgz -C /data

3、配置JDK(zookeeper依赖java1.8+)

[root@yhcs_1 data]# cat /etc/profile

...

#JDK

export JAVA_HOME=/opt/jdk-19.0.2

export JAVA_BIN=/opt/jdk-19.0.2/bin

export PATH=$PATH:$JAVA_HOME/bin

export JAVA_HOME JAVA_BIN PATH

[root@yhcs_1 data]# source /etc/profile

4、配置zookeeper

[root@yhcs_1 conf]# cat zoo.cfg

tickTime=2000

#通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒

initLimit=5

#Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量),这里表示为5*2s

syncLimit=2

#Leader和Follower之间同步通信的超时时间,这里表示如果超过2*2s,Leader认为Follwer死掉,并从服务器列表中删除Follwer

dataDir=/data/zookeeper-3.8.0/data

#修改,指定保存Zookeeper中的数据的目录,目录需要单独创建

dataLogDir=/data/zookeeper-3.8.0/logs

#添加,指定存放日志的目录,目录需要单独创建

clientPort=2181

#客户端连接端口

#添加集群信息

server.1=192.168.1.200:2888:3888

server.2=192.168.1.210:2888:3888

server.3=192.168.1.220:2888:3888

#创建配置文件涉及的目录

[root@yhcs_1 conf]# mkdir -p /data/zookeeper-3.8.0/{data,logs}

#设置myid数值与上面配置文件server.1、server.2、server.3对应

[root@yhcs_1 conf]# echo 1 >/data/zookeeper-3.8.0/data/myid

[root@yhcs_2 conf]# echo 2 >/data/zookeeper-3.8.0/data/myid

[root@yhcs_3 conf]# echo 3 >/data/zookeeper-3.8.0/data/myid

#编写zookeeper启动脚本

[root@yhcs_1 data]# cat /etc/init.d/zookeeper

#!/bin/bash

#chkconfig:2345 20 90

#description:Zookeeper Service Control Script

ZK_HOME='/data/zookeeper-3.8.0'

JAVA_HOME=/opt/jdk-19.0.2

JAVA_BIN=/opt/jdk-19.0.2/bin

PATH=$PATH:$JAVA_HOME/bin

case $1 in

start)

echo "---------- zookeeper 启动 ------------"

$ZK_HOME/bin/zkServer.sh start

;;

stop)

echo "---------- zookeeper 停止 ------------"

$ZK_HOME/bin/zkServer.sh stop

;;

restart)

echo "---------- zookeeper 重启 ------------"

$ZK_HOME/bin/zkServer.sh restart

;;

status)

echo "---------- zookeeper 状态 ------------"

$ZK_HOME/bin/zkServer.sh status

;;

*)

echo "Usage: $0 {start|stop|restart|status}"

esac

[root@yhcs_1 data]# chmod +x /etc/init.d/zookeeper

[root@yhcs_1 data]# chkconfig --add zookeeper

#启动zookeeper

[root@yhcs_1 data]# systemctl start zookeeper

[root@yhcs_1 data]# systemctl status zookeeper

5、配置kafka

[root@yhcs_1 data]# cat /data/kafka_2.13-3.4.0/config/server.properties

#broker的全局唯一编号,每个broker不能重复

broker.id=1

delete.topic.enable=true

port=9092

host.name=192.168.1.200

#指定监听的IP和端口,如果修改每个broker的IP需区分开来,也可保持默认配置不用修改

listeners=PLAINTEXT://:9092

advertised.host.name=192.168.1.200

#broker 处理网络请求的线程数量,一般情况下不需要去修改

work.threads=3

#用来处理磁盘IO的线程数量,数值应该大于硬盘数

num.io.threads=8

#发送套接字的缓冲区大小

socket.send.buffer.bytes=102400

#接收套接字的缓冲区大小

socket.receive.buffer.bytes=102400

#请求套接字的缓冲区大小

socket.request.max.bytes=104857600

#日志目录需先创建,不会自动创建。 日志存放目录、数据存放目录

log.dirs=/data/kafka_2.13-3.4.0/logs

#topic在当前broker上的默认分区个数,会被topic创建时的指定参数覆盖

num.partitions=3

#用来恢复和清理data下数据的线程数量

num.recovery.threads.per.data.dir=1

offsets.topic.replication.factor=1

transaction.state.log.replication.factor=2

transaction.state.log.min.isr=1

#segment文件(数据文件)保留的最长时间,单位为小时,默认为7天,超时将被删除

log.retention.hours=168

#一个segment文件最大的大小,默认为 1G,超出将新建一个新的segment文件

log.segment.bytes=1073741824

log.retention.check.interval.ms=300000

zookeeper.connect=192.168.1.200:2181,192.168.1.210:2181,192.168.1.220:2181

zookeeper.connection.timeout.ms=6000

session.timeout.ms=10s

max.poll.records=100

max.poll.interval.ms=600000

group.initial.rebalance.delay.ms=0

#创建配置文件涉及目录

[root@yhcs_1 data]# mkdir -p /data/kafka_2.13-3.4.0/logs

#编写kafka启动脚本

[root@yhcs_1 data]# cat /etc/init.d/kafka

#配置 kafka 启动脚本

#!/bin/bash

#chkconfig:2345 60 20

#description:Kafka Service Control Script

KAFKA_HOME='/data/kafka_2.13-3.4.0'

JAVA_HOME=/opt/jdk-19.0.2

JAVA_BIN=/opt/jdk-19.0.2/bin

PATH=$PATH:$JAVA_HOME/bin

case $1 in

start)

echo "---------- Kafka 启动 ------------"

${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties

;;

stop)

echo "---------- Kafka 停止 ------------"

${KAFKA_HOME}/bin/kafka-server-stop.sh

;;

restart)

$0 stop

$0 start

;;

status)

echo "---------- Kafka 状态 ------------"

count=$(ps -ef | grep kafka | egrep -cv "grep|$$")

if [ "$count" -eq 0 ];then

echo "kafka is not running"

else

echo "kafka is running"

fi

;;

*)

echo "Usage: $0 {start|stop|restart|status}"

esac

[root@yhcs_1 data]# chmod +x /etc/init.d/kafka

[root@yhcs_1 data]# chkconfig --add kafka

#启动kafka

[root@yhcs_1 data]# systemctl start kafka

[root@yhcs_1 data]# systemctl status kafka

#kafka集群测试

#最新的kafka创建主题topic方式:

./kafka-topics.sh --create --bootstrap-server 192.168.1.200:9092 --replication-factor 2 --partitions 3 --topic test

./kafka-topics.sh --create --bootstrap-server 192.168.1.200:9092,192.168.1.210:9092,192.168.1.220:9092 --replication-factor 2 --partitions 3 --topic test1

#查看当前服务器中的所有 topic

./kafka-topics.sh --list --bootstrap-server 192.168.1.200:9092

./kafka-topics.sh --list --bootstrap-server 192.168.1.200:9092,192.168.1.210:9092,192.168.1.220:9092

#查看某个 topic 的详情

./kafka-topics.sh --describe --bootstrap-server 192.168.1.200:9092

#发布消息

./kafka-console-producer.sh --broker-list 192.168.1.200:9092,192.168.1.210:9092,192.168.1.220:9092 --topic test

#消费消息

./kafka-console-consumer.sh --bootstrap-server 192.168.1.200:9092,192.168.1.210:9092,192.168.1.220:9092 --topic test --from-beginning

--from-beginning:会把主题中以往所有的数据都读取出来

#修改分区数

./kafka-topics.sh --bootstrap-server 192.168.1.200:9092,192.168.1.210:9092,192.168.1.220:9092 --alter --topic test --partitions 6

#删除 topic

./kafka-topics.sh --delete --bootstrap-server 192.168.1.200:9092,192.168.1.210:9092,192.168.1.220:9092 --topic test

部署filebeat

#解压filebeat

[root@yhcs_1 soft]# tar -xvf filebeat-8.6.2-linux-x86_64.tar.gz -C /data

#配置filebeat

[root@yhcs_1 filebeat-8.6.2]# cat filebeat.yml

filebeat.inputs:

- type: log

enabled: true

paths:

- /var/log/messages

output.kafka:

enabled: true

hosts: ["192.168.1.200:9092","192.168.1.210:9092","192.168.1.220:9092"]

topic: "demo-topic"

#启动filebeat

[root@yhcs_1 filebeat-8.6.2]# nohup ./filebeat -e -c filebeat.yml -d "publish" 2>&1 &

部署elasticsearch

#解压es

[root@yhcs_3 soft]# tar -xvf elasticsearch-8.6.2-linux-x86_64.tar.gz -C /data

#配置es

[root@yhcs_3 config]# egrep -v "^$|#" elasticsearch.yml

node.name: node-1

path.data: /data/elasticsearch-8.6.2/data

path.logs: /data/elasticsearch-8.6.2/logs

network.host: 192.168.1.220

http.port: 9200

cluster.initial_master_nodes: ["node-1"]

ingest.geoip.downloader.enabled: false

xpack.security.enabled: false

discovery.seed_hosts: ["192.168.1.220"]

#创建涉及目录

[root@yhcs_3 config]# mkdir -p /data/elasticsearch-8.6.2/data

#创建es用户(es、kibana需要使用非root用户)

[root@yhcs_3 config]# useradd esroot

[root@yhcs_3 config]# passwd esroot_@321

[root@yhcs_3 config]# chown -R esroot:esroot elasticsearch-8.6.2/

#修改其他配置

[root@yhcs_3 config]# vi /etc/security/limits.conf

#在文件末尾中增加下面内容

esroot soft nofile 65536

esroot hard nofile 65536

[root@yhcs_3 config]# vi /etc/security/limits.d/20-nproc.conf

#在文件末尾中增加下面内容

esroot soft nofile 65536

esroot hard nofile 65536

* hard nproc 4096

#注:*代表Linux所有用户名称

[root@yhcs_3 config]# vi /etc/sysctl.conf

#在文件中增加下面内容

vm.max_map_count=655360

#重新加载,输入下面命令

sysctl -p

#启动es

[root@yhcs_3 soft]# su esroot

[esroot@yhcs_3 soft]$ cd /data/elasticsearch-8.6.2/bin/

[esroot@yhcs_3 bin]$ nohup ./elasticsearch -d 2>&1 &

#访问es

http://192.168.1.220:9200/

部署kibana

#解压kibana

[root@yhcs_3 soft]# tar -xvf kibana-8.6.2-linux-x86_64.tar.gz -C /data

#配置kibana

[root@yhcs_3 config]# egrep -v "^$|#" kibana.yml

server.port: 5601

server.host: "192.168.1.220"

elasticsearch.hosts: ["http://192.168.1.220:9200"]

logging.appenders.default:

type: file

fileName: /data/kibana-8.6.2/logs/kibana.log

layout:

type: json

pid.file: /data/kibana-8.6.2/run/kibana.pid

telemetry.enabled: false

i18n.locale: "zh-CN"

#创建涉及目录

[root@yhcs_3 config]# mkdir -p /data/kibana-8.6.2/{logs,run}

#启动kibana

[root@yhcs_3 soft]# su esroot

[esroot@yhcs_3 soft]$ cd /data/kibana-8.6.2/bin/

[esroot@yhcs_3 bin]$ nohup ./kibana2>&1 &

#访问kibana(这个时候还没有数据)

http://192.168.1.220:5601/

部署logstash

#解压kibana

[root@yhcs_2 soft]# tar -xvf logstash-8.6.2-linux-x86_64.tar.gz -C /data

#配置logstash

[root@yhcs_2 config]# cat /data/logstash-8.6.2/config/logstash-sample.conf

# Sample Logstash configuration for creating a simple

# Beats -> Logstash -> Elasticsearch pipeline.

input {

kafka {

bootstrap_servers => "192.168.1.200:9092,192.168.1.210:9092,192.168.1.220:9092"

topics => "demo-topic"

group_id => "test"

}

}

output {

elasticsearch {

hosts => ["http://192.168.1.220:9200"]

index => "test_index"

#user => "elastic"

#password => "changeme"

}

}

#启动logstash

[root@yhcs_2 config]# cd /data/logstash-8.6.2

[root@yhcs_2 bin]# nohup ./bin/logstash -f ./config/logstash-sample.conf --config.reload.automatic --path.data=/data/logstash-8.6.2/ >/dev/null 2>&1 &

#访问kibana(整个链路走通就可以在kibana看到采集的日志信息)

http://192.168.1.220:5601/

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。