2000字范文,分享全网优秀范文,学习好帮手!
2000字范文 > Filebeat+Kafka+Logstash+Elasticsearch+Kibana 构建日志分析系统

Filebeat+Kafka+Logstash+Elasticsearch+Kibana 构建日志分析系统

时间:2022-12-30 14:10:26

相关推荐

Filebeat+Kafka+Logstash+Elasticsearch+Kibana 构建日志分析系统

文章目录

一、前言二、背景信息三、操作流程四、准备工作1、Docker 环境2、Docker Compose 环境3、版本准备4、环境初始化5、服务安装6、服务设置五、配置 Filebeat六、配置 Logstash 管道七、查看 kafka 日志消费状态八、查看 ES 内容九、通过 Kibana 过滤日志数据1、创建 index-pattern2、查看日志十、小结

一、前言

随着时间的积累,日志数据会越来越多,当你需要查看并分析庞杂的日志数据时,可通过Filebeat+Kafka+Logstash+Elasticsearch采集日志数据到 Elasticsearch(简称ES)中,并通过 Kibana 进行可视化展示与分析。

本文介绍具体的实现方法。

二、背景信息

Kafka 是一种分布式、高吞吐、可扩展的消息队列服务,广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域,已成为大数据生态中不可或缺的部分。在实际应用场景中,为了满足大数据实时检索的需求,一般可以使用 Filebeat 采集日志数据,将 Kafka 作为 Filebeat 的输出端。Kafka 实时接收到 Filebeat 采集的数据后,以 Logstash 作为输出端输出。输出到 Logstash 中的数据在格式或内容上可能不能满足你的需求,此时可以通过 Logstash 的 filter 插件过滤数据。最后将满足需求的数据输出到 ES 中进行分布式检索,并通过 Kibana 进行数据分析与展示。

简单处理流程如下:

三、操作流程

准备工作: 完成环境准备包括创建对应服务安装 Filebeat 。配置 Filebeat:配置 Filebeat 的 input 为系统日志,outpu 为 Kafka,将日志数据采集到 Kafka 的指定 Topic 中。配置 Logstash 管道:配置 Logstash 管道的 input 为 Kafka,output 为ES,使用 Logstash 消费 Topic 中的数据并传输到ES 中。查看日志消费状态:在消息队列 Kafka 中查看日志数据的消费的状态,验证日志数据是否采集成功。通过 Kibana 过滤日志数据:在 Kibana 控制台的 Discover 页面,通过 Filter 过滤出 Kafka 相关的日志。

四、准备工作

CenterOS 7.6 版本,推荐 8G 以上内存。

1、Docker 环境

执行命令如下:

# 在 docker 节点执行# 腾讯云 docker hub 镜像# export REGISTRY_MIRROR="https://mirror."# DaoCloud 镜像# export REGISTRY_MIRROR="http://f1361db2.m.daocloud.io"# 阿里云 docker hub 镜像export REGISTRY_MIRROR=-# 安装 docker# 参考文档如下# /install/linux/docker-ce/centos/ # /install/linux/linux-postinstall/# 卸载旧版本yum remove -y docker \docker-client \docker-client-latest \docker-ce-cli \docker-common \docker-latest \docker-latest-logrotate \docker-logrotate \docker-selinux \docker-engine-selinux \docker-engine# 设置 yum repositoryyum install -y yum-utils \device-mapper-persistent-data \lvm2yum-config-manager --add-repo /docker-ce/linux/centos/docker-ce.repo# 安装并启动 dockeryum install -y docker-ce-19.03.11 docker-ce-cli-19.03.11 containerd.io-1.2.13mkdir /etc/docker || truecat > /etc/docker/daemon.json <<EOF{"registry-mirrors": ["${REGISTRY_MIRROR}"],"exec-opts": ["native.cgroupdriver=systemd"],"log-driver": "json-file","log-opts": {"max-size": "100m"},"storage-driver": "overlay2","storage-opts": ["overlay2.override_kernel_check=true"]}EOFmkdir -p /etc/systemd/system/docker.service.d# Restart Dockersystemctl daemon-reloadsystemctl enable dockersystemctl restart docker# 关闭 防火墙systemctl stop firewalldsystemctl disable firewalld# 关闭 SeLinuxsetenforce 0sed -i "s/SELINUX=enforcing/SELINUX=disabled/g" /etc/selinux/config# 关闭 swapswapoff -ayes | cp /etc/fstab /etc/fstab_bakcat /etc/fstab_bak |grep -v swap > /etc/fstab

验证下 docker info:

[root@vm-1]# docker infoClient:Debug Mode: falseServer:Containers: 16Running: 11Paused: 0Stopped: 5Images: 22Server Version: 19.03.11Storage Driver: overlay2Backing Filesystem: xfsSupports d_type: trueNative Overlay Diff: trueLogging Driver: json-fileCgroup Driver: systemdPlugins:Volume: localNetwork: bridge host ipvlan macvlan null overlayLog: awslogs fluentd gcplogs gelf journald json-file local logentries splunk syslogSwarm: inactiveRuntimes: runcDefault Runtime: runcInit Binary: docker-initcontainerd version: 7ad184331fa3e55e52b890ea95e65ba581ae3429runc version: dc9208a3303feef5b3839f4323d9beb36df0a9ddinit version: fec3683Security Options:seccompProfile: defaultKernel Version: 3.10.0-1127.el7.x86_64Operating System: CentOS Linux 7 (Core)OSType: linuxArchitecture: x86_64CPUs: 4Total Memory: 11.58GiBName: vm-autotest-serverID: KQ5B:KAG5:LLB5:CUD4:NQZX:4GHL:5XLY:FM7X:KRJ5:X3WK:42GV:QLONDocker Root Dir: /var/lib/dockerDebug Mode: falseRegistry: https://index.docker.io/v1/Labels:Experimental: falseInsecure Registries:172.16.62.179:5000127.0.0.0/8Registry Mirrors:-/Live Restore Enabled: false

2、Docker Compose 环境

Docker Compose是一个用于定义和运行多个 docker 容器应用的工具。使用 Compose 你可以用 YAML 文件来配置你的应用服务,然后使用一个命令,你就可以部署你配置的所有服务了。

# 下载 Docker Composesudo curl -L "/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose# 修改该文件的权限为可执行chmod +x /usr/local/bin/docker-compose# 验证信息docker-compose --version

3、版本准备

4、环境初始化

执行命令如下:

# 需要设置系统内核参数,否则 ES 会因为内存不足无法启动# 改变设置sysctl -w vm.max_map_count=262144# 使之立即生效sysctl -p# 创建 logstash 目录,并将 Logstash 的配置文件 logstash.conf 拷贝到该目录mkdir -p /mydata/logstash# 需要创建 elasticsearch/data 目录并设置权限,否则 ES 会因为无权限访问而启动失败mkdir -p /mydata/elasticsearch/data/chmod 777 /mydata/elasticsearch/data/

5、服务安装

docker-compose.yml文件内容为:

version: '3'services:elasticsearch:image: elasticsearch:7.6.2container_name: elasticsearchuser: rootenvironment:- "cluster.name=elasticsearch" #设置集群名称为elasticsearch- "discovery.type=single-node" #以单一节点模式启动volumes:- /mydata/elasticsearch/plugins:/usr/share/elasticsearch/plugins #插件文件挂载- /mydata/elasticsearch/data:/usr/share/elasticsearch/data #数据文件挂载- /etc/localtime:/etc/localtime:ro- /usr/share/zoneinfo:/usr/share/zoneinfoports:- 9200:9200- 9300:9300networks:- elasticlogstash:image: logstash:7.6.2container_name: logstashenvironment:- TZ=Asia/Shanghaivolumes:- /mydata/logstash/logstash.conf:/usr/share/logstash/pipeline/logstash.conf #挂载logstash的配置文件depends_on:- elasticsearch #kibana在elasticsearch启动之后再启动links:- elasticsearch:es #可以用es这个域名访问elasticsearch服务ports:- 5044:5044networks:- elastickibana:image: kibana:7.6.2container_name: kibanalinks:- elasticsearch:es #可以用es这个域名访问elasticsearch服务depends_on:- elasticsearch #kibana在elasticsearch启动之后再启动environment:- "elasticsearch.hosts=http://es:9200" #设置访问elasticsearch的地址- /etc/localtime:/etc/localtime:ro- /usr/share/zoneinfo:/usr/share/zoneinfoports:- 5601:5601networks:- elasticzookeeper:image: wurstmeister/zookeepercontainer_name: zookeepervolumes:- /mydata/zookeeper/data:/data- /mydata/zookeeper/log:/datalog- /etc/localtime:/etc/localtime:ro- /usr/share/zoneinfo:/usr/share/zoneinfonetworks:- elasticports:- "2181:2181"kafka:container_name: kafkaimage: wurstmeister/kafkadepends_on:- zookeepervolumes:- /var/run/docker.sock:/var/run/docker.sock- /mydata/kafka:/kafka- /etc/localtime:/etc/localtime:ro- /etc/localtime:/etc/localtime:rolinks:- zookeeperports:- "9092:9092"networks:- elasticenvironment:- KAFKA_LISTENERS=INTERNAL://kafka:9092, OUT://kafka:29092- KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka:9092, OUT://kafka:29092- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,OUT:PLAINTEXT- KAFKA_INTER_BROKER_LISTENER_NAME=OUT- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181- KAFKA_MESSAGE_MAX_BYTES=2000000- KAFKA_CREATE_TOPICS=logs:1:1networks:elastic:

将该文件上传的 linux 服务器上,执行docker-compose up命令即可启动所有服务。

[root@vm-1]# docker-compose -f docker-compose.yml up -d[root@vm-1]# docker-compose psName CommandStatePorts -----------------------------------------------------------------------------------------------------------elasticsearch /usr/local/bin/docker-entr ... Up0.0.0.0:9200->9200/tcp, 0.0.0.0:9300->9300/tcp kafka start-kafka.sh Up0.0.0.0:9092->9092/tcp kibana/usr/local/bin/dumb-init - ... Up0.0.0.0:5601->5601/tcp logstash /usr/local/bin/docker-entr ... Up0.0.0.0:5044->5044/tcp, 9600/tcp zookeeper /bin/sh -c /usr/sbin/sshd ... Up0.0.0.0:2181->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp[root@vm-autotest-server elk]#

filebeat 客户端安装方式:

curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.4.2-linux-x86_64.tar.gztar xzvf filebeat-7.4.2-linux-x86_64.tar.gzcd filebeat-7.4.2-linux-x86_64

6、服务设置

当所有依赖服务启动完成后,需要对以下服务进行一些设置。

# elasticsearch 需要安装中文分词器 IKAnalyzer,并重新启动。docker exec -it elasticsearch /bin/bash#此命令需要在容器中运行elasticsearch-plugin install /medcl/elasticsearch-analysis-ik/releases/download/v7.6.2/elasticsearch-analysis-ik-7.6.2.zipdocker restart elasticsearch# logstas h需要安装 json_lines 插件,并重新启动。docker exec -it logstash /bin/bashlogstash-plugin install logstash-codec-json_linesdocker restart logstash

五、配置 Filebeat

修改filebeat.yml文件内容

ilebeat.inputs:- type: logenabled: truepaths:- /var/log/nginx/*.logfilebeat.config.modules:path: ${path.config}/modules.d/*.ymlreload.enabled: falsesetup.template.settings:index.number_of_shards: 1setup.dashboards.enabled: falsesetup.kibana:host: "http://kafka:5601"output.kafka:hosts: ["kafka:9092"]topic: 'logs'codec.json:pretty: false

参数说明:

注意:

客户端 hosts 添加 kafka 对应 server 的 ip 地址 以及 filebeat 配置建议使用 ansible。

[root@vm-1# cat /etc/hosts172.16.62.179 kafka# 客户端启动服务[root@vm-1#./filebeat &

更多配置请参见:

有关 filebeat 的 log input 的配置介绍见官网文档:https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-log.html有关 filebeat output 到 kafka 的配置介绍见官方文档:https://www.elastic.co/guide/en/beats/filebeat/current/kafka-output.html

六、配置 Logstash 管道

修改 logstash.conf 内容:

input {# # 来源beats# beats {# 端口# port => "5044"# }kafka {bootstrap_servers => "kafka:29092"topics => ["logs"]group_id => "logstash"codec => json}}# 分析、过滤插件,可以多个# filter {# grok {# match => { "message" => "%{COMBINEDAPACHELOG}"}# }# geoip {# source => "clientip"# }# }output {# 选择elasticsearchelasticsearch {hosts => ["http://es:9200"]#index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"index => "logs-%{+YYYY.MM.dd}"}}

input 参数说明:

output 参数说明:

注意:

logstash 中最为关键的地方在于 filter,为了调试 filter 的配置。

更多配置请参见:

有关 logstash 中 kafka-input 的配置介绍见官方文档:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html有关 logstash 中grok-filter 的配置介绍见官方文档:https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html有关 logstash 中 output-elasticsearch 的配置介绍见官方文档:https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html

七、查看 kafka 日志消费状态

操作命令如下:

# 进入容器docker exec -it kafka bash# kafka 默认安装在 /opt/kafkacd opt/kafka# 要想查询消费数据,必须要指定组bash-5.1# bin/kafka-consumer-groups.sh --bootstrap-server 172.16.62.179:9092 --listlogstash# 查看 topicbash-5.1# bin/kafka-topics.sh --list --zookeeper 172.16.62.179:2181__consumer_offsetslogs# 查看消费情况bash-5.1# bin/kafka-consumer-groupsdescribe --bootstrap-server 172.16.62.179:9092 --group logstashGROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-IDlogstash logs 01073351073350logstash-0-c6d82a1c-0f14-4372-b49f-8cd476f54d90 /172.19.0.2logstash-0#参数解释:#--describe 显示详细信息#--bootstrap-server 指定kafka连接地址#--group 指定组。

字段解释:

从上面的信息可以看出,topic 为 logs 总共消费了 107335 条信息, 未消费的条数为 0。也就是说,消费数据没有积压的情况.

八、查看 ES 内容

通过elasticsearch-head插件查看 ES 中是否收到了由 logstash 发送过来的日志

九、通过 Kibana 过滤日志数据

1、创建 index-pattern

打开 es,进入首页后,点击“connect to your Elasticsearch index”

填入 es 中的索引名,支持正则匹配,输入 Index pattern(本文使用 logs-*),单击 Next step。

选择“@timestamp”作为时间过滤字段,然后点击“create index pattern”:

创建完成后:

2、查看日志

在左侧导航栏,单击 Discover。

从页面左侧的下拉列表中,选择已创建的索引模式(logs-*)。

在页面右上角,选择一段时间,查看对应时间段内的 Filebeat 采集的日志数据。

十、小结

在企业实际项目中,elk 是比较成熟且广泛使用的技术方案。logstash 性能稍弱于 filebeat,一般不直接运行于采集点,推荐使用filebeat。在日志进入elk前,从经验性角度,前置 kafka,一方面作为队列和缓冲,另一方面提供了统一的入口渠道。

源码地址:

/zuozewei/blog-example/tree/master/Performance-testing/04-full-link/Filebeat-Kafka-Logstash-Elasticsearch-Kibana

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。