部署环境
备注:资源有限,仅做kafka集群,其他服务做单点测试
日志获取
部署kafka集群
1、上传下载的jar至/data/soft目录
2、解压相关文件至指定目录,修改文件夹名称
[root@yhcs_1 soft]# tar -xvf jdk-19_linux-x64_bin.tar.gz -C /opt
[root@yhcs_1 soft]# tar -xvf apache-zookeeper-3.8.0-bin.tar.gz -C /data
[root@yhcs_1 soft]# tar -xvf kafka_2.13-3.4.0.tgz -C /data
3、配置JDK(zookeeper依赖java1.8+)
[root@yhcs_1 data]# cat /etc/profile
...
#JDK
export JAVA_HOME=/opt/jdk-19.0.2
export JAVA_BIN=/opt/jdk-19.0.2/bin
export PATH=$PATH:$JAVA_HOME/bin
export JAVA_HOME JAVA_BIN PATH
[root@yhcs_1 data]# source /etc/profile
4、配置zookeeper
[root@yhcs_1 conf]# cat zoo.cfg
tickTime=2000
#通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒
initLimit=5
#Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量),这里表示为5*2s
syncLimit=2
#Leader和Follower之间同步通信的超时时间,这里表示如果超过2*2s,Leader认为Follwer死掉,并从服务器列表中删除Follwer
dataDir=/data/zookeeper-3.8.0/data
#修改,指定保存Zookeeper中的数据的目录,目录需要单独创建
dataLogDir=/data/zookeeper-3.8.0/logs
#添加,指定存放日志的目录,目录需要单独创建
clientPort=2181
#客户端连接端口
#添加集群信息
server.1=192.168.1.200:2888:3888
server.2=192.168.1.210:2888:3888
server.3=192.168.1.220:2888:3888
#创建配置文件涉及的目录
[root@yhcs_1 conf]# mkdir -p /data/zookeeper-3.8.0/{data,logs}
#设置myid数值与上面配置文件server.1、server.2、server.3对应
[root@yhcs_1 conf]# echo 1 >/data/zookeeper-3.8.0/data/myid
[root@yhcs_2 conf]# echo 2 >/data/zookeeper-3.8.0/data/myid
[root@yhcs_3 conf]# echo 3 >/data/zookeeper-3.8.0/data/myid
#编写zookeeper启动脚本
[root@yhcs_1 data]# cat /etc/init.d/zookeeper
#!/bin/bash
#chkconfig:2345 20 90
#description:Zookeeper Service Control Script
ZK_HOME='/data/zookeeper-3.8.0'
JAVA_HOME=/opt/jdk-19.0.2
JAVA_BIN=/opt/jdk-19.0.2/bin
PATH=$PATH:$JAVA_HOME/bin
case $1 in
start)
echo "---------- zookeeper 启动 ------------"
$ZK_HOME/bin/zkServer.sh start
;;
stop)
echo "---------- zookeeper 停止 ------------"
$ZK_HOME/bin/zkServer.sh stop
;;
restart)
echo "---------- zookeeper 重启 ------------"
$ZK_HOME/bin/zkServer.sh restart
;;
status)
echo "---------- zookeeper 状态 ------------"
$ZK_HOME/bin/zkServer.sh status
;;
*)
echo "Usage: $0 {start|stop|restart|status}"
esac
[root@yhcs_1 data]# chmod +x /etc/init.d/zookeeper
[root@yhcs_1 data]# chkconfig --add zookeeper
#启动zookeeper
[root@yhcs_1 data]# systemctl start zookeeper
[root@yhcs_1 data]# systemctl status zookeeper
5、配置kafka
[root@yhcs_1 data]# cat /data/kafka_2.13-3.4.0/config/server.properties
#broker的全局唯一编号,每个broker不能重复
broker.id=1
delete.topic.enable=true
port=9092
host.name=192.168.1.200
#指定监听的IP和端口,如果修改每个broker的IP需区分开来,也可保持默认配置不用修改
listeners=PLAINTEXT://:9092
advertised.host.name=192.168.1.200
#broker 处理网络请求的线程数量,一般情况下不需要去修改
work.threads=3
#用来处理磁盘IO的线程数量,数值应该大于硬盘数
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#日志目录需先创建,不会自动创建。 日志存放目录、数据存放目录
log.dirs=/data/kafka_2.13-3.4.0/logs
#topic在当前broker上的默认分区个数,会被topic创建时的指定参数覆盖
num.partitions=3
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=2
transaction.state.log.min.isr=1
#segment文件(数据文件)保留的最长时间,单位为小时,默认为7天,超时将被删除
log.retention.hours=168
#一个segment文件最大的大小,默认为 1G,超出将新建一个新的segment文件
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.1.200:2181,192.168.1.210:2181,192.168.1.220:2181
zookeeper.connection.timeout.ms=6000
session.timeout.ms=10s
max.poll.records=100
max.poll.interval.ms=600000
group.initial.rebalance.delay.ms=0
#创建配置文件涉及目录
[root@yhcs_1 data]# mkdir -p /data/kafka_2.13-3.4.0/logs
#编写kafka启动脚本
[root@yhcs_1 data]# cat /etc/init.d/kafka
#配置 kafka 启动脚本
#!/bin/bash
#chkconfig:2345 60 20
#description:Kafka Service Control Script
KAFKA_HOME='/data/kafka_2.13-3.4.0'
JAVA_HOME=/opt/jdk-19.0.2
JAVA_BIN=/opt/jdk-19.0.2/bin
PATH=$PATH:$JAVA_HOME/bin
case $1 in
start)
echo "---------- Kafka 启动 ------------"
${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)
echo "---------- Kafka 停止 ------------"
${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)
$0 stop
$0 start
;;
status)
echo "---------- Kafka 状态 ------------"
count=$(ps -ef | grep kafka | egrep -cv "grep|$$")
if [ "$count" -eq 0 ];then
echo "kafka is not running"
else
echo "kafka is running"
fi
;;
*)
echo "Usage: $0 {start|stop|restart|status}"
esac
[root@yhcs_1 data]# chmod +x /etc/init.d/kafka
[root@yhcs_1 data]# chkconfig --add kafka
#启动kafka
[root@yhcs_1 data]# systemctl start kafka
[root@yhcs_1 data]# systemctl status kafka
#kafka集群测试
#最新的kafka创建主题topic方式:
./kafka-topics.sh --create --bootstrap-server 192.168.1.200:9092 --replication-factor 2 --partitions 3 --topic test
./kafka-topics.sh --create --bootstrap-server 192.168.1.200:9092,192.168.1.210:9092,192.168.1.220:9092 --replication-factor 2 --partitions 3 --topic test1
#查看当前服务器中的所有 topic
./kafka-topics.sh --list --bootstrap-server 192.168.1.200:9092
./kafka-topics.sh --list --bootstrap-server 192.168.1.200:9092,192.168.1.210:9092,192.168.1.220:9092
#查看某个 topic 的详情
./kafka-topics.sh --describe --bootstrap-server 192.168.1.200:9092
#发布消息
./kafka-console-producer.sh --broker-list 192.168.1.200:9092,192.168.1.210:9092,192.168.1.220:9092 --topic test
#消费消息
./kafka-console-consumer.sh --bootstrap-server 192.168.1.200:9092,192.168.1.210:9092,192.168.1.220:9092 --topic test --from-beginning
--from-beginning:会把主题中以往所有的数据都读取出来
#修改分区数
./kafka-topics.sh --bootstrap-server 192.168.1.200:9092,192.168.1.210:9092,192.168.1.220:9092 --alter --topic test --partitions 6
#删除 topic
./kafka-topics.sh --delete --bootstrap-server 192.168.1.200:9092,192.168.1.210:9092,192.168.1.220:9092 --topic test
部署filebeat
#解压filebeat
[root@yhcs_1 soft]# tar -xvf filebeat-8.6.2-linux-x86_64.tar.gz -C /data
#配置filebeat
[root@yhcs_1 filebeat-8.6.2]# cat filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/messages
output.kafka:
enabled: true
hosts: ["192.168.1.200:9092","192.168.1.210:9092","192.168.1.220:9092"]
topic: "demo-topic"
#启动filebeat
[root@yhcs_1 filebeat-8.6.2]# nohup ./filebeat -e -c filebeat.yml -d "publish" 2>&1 &
部署elasticsearch
#解压es
[root@yhcs_3 soft]# tar -xvf elasticsearch-8.6.2-linux-x86_64.tar.gz -C /data
#配置es
[root@yhcs_3 config]# egrep -v "^$|#" elasticsearch.yml
node.name: node-1
path.data: /data/elasticsearch-8.6.2/data
path.logs: /data/elasticsearch-8.6.2/logs
network.host: 192.168.1.220
http.port: 9200
cluster.initial_master_nodes: ["node-1"]
ingest.geoip.downloader.enabled: false
xpack.security.enabled: false
discovery.seed_hosts: ["192.168.1.220"]
#创建涉及目录
[root@yhcs_3 config]# mkdir -p /data/elasticsearch-8.6.2/data
#创建es用户(es、kibana需要使用非root用户)
[root@yhcs_3 config]# useradd esroot
[root@yhcs_3 config]# passwd esroot_@321
[root@yhcs_3 config]# chown -R esroot:esroot elasticsearch-8.6.2/
#修改其他配置
[root@yhcs_3 config]# vi /etc/security/limits.conf
#在文件末尾中增加下面内容
esroot soft nofile 65536
esroot hard nofile 65536
[root@yhcs_3 config]# vi /etc/security/limits.d/20-nproc.conf
#在文件末尾中增加下面内容
esroot soft nofile 65536
esroot hard nofile 65536
* hard nproc 4096
#注:*代表Linux所有用户名称
[root@yhcs_3 config]# vi /etc/sysctl.conf
#在文件中增加下面内容
vm.max_map_count=655360
#重新加载,输入下面命令
sysctl -p
#启动es
[root@yhcs_3 soft]# su esroot
[esroot@yhcs_3 soft]$ cd /data/elasticsearch-8.6.2/bin/
[esroot@yhcs_3 bin]$ nohup ./elasticsearch -d 2>&1 &
#访问es
http://192.168.1.220:9200/
部署kibana
#解压kibana
[root@yhcs_3 soft]# tar -xvf kibana-8.6.2-linux-x86_64.tar.gz -C /data
#配置kibana
[root@yhcs_3 config]# egrep -v "^$|#" kibana.yml
server.port: 5601
server.host: "192.168.1.220"
elasticsearch.hosts: ["http://192.168.1.220:9200"]
logging.appenders.default:
type: file
fileName: /data/kibana-8.6.2/logs/kibana.log
layout:
type: json
pid.file: /data/kibana-8.6.2/run/kibana.pid
telemetry.enabled: false
i18n.locale: "zh-CN"
#创建涉及目录
[root@yhcs_3 config]# mkdir -p /data/kibana-8.6.2/{logs,run}
#启动kibana
[root@yhcs_3 soft]# su esroot
[esroot@yhcs_3 soft]$ cd /data/kibana-8.6.2/bin/
[esroot@yhcs_3 bin]$ nohup ./kibana2>&1 &
#访问kibana(这个时候还没有数据)
http://192.168.1.220:5601/
部署logstash
#解压kibana
[root@yhcs_2 soft]# tar -xvf logstash-8.6.2-linux-x86_64.tar.gz -C /data
#配置logstash
[root@yhcs_2 config]# cat /data/logstash-8.6.2/config/logstash-sample.conf
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input {
kafka {
bootstrap_servers => "192.168.1.200:9092,192.168.1.210:9092,192.168.1.220:9092"
topics => "demo-topic"
group_id => "test"
}
}
output {
elasticsearch {
hosts => ["http://192.168.1.220:9200"]
index => "test_index"
#user => "elastic"
#password => "changeme"
}
}
#启动logstash
[root@yhcs_2 config]# cd /data/logstash-8.6.2
[root@yhcs_2 bin]# nohup ./bin/logstash -f ./config/logstash-sample.conf --config.reload.automatic --path.data=/data/logstash-8.6.2/ >/dev/null 2>&1 &
#访问kibana(整个链路走通就可以在kibana看到采集的日志信息)
http://192.168.1.220:5601/