2000字范文,分享全网优秀范文,学习好帮手!
2000字范文 > filebeat+redis+logstash+elasticsearch filebeat+kafka+zookeeper+logstash+elasticsearch

filebeat+redis+logstash+elasticsearch filebeat+kafka+zookeeper+logstash+elasticsearch

时间:2021-09-27 19:39:43

相关推荐

filebeat+redis+logstash+elasticsearch filebeat+kafka+zookeeper+logstash+elasticsearch

收集日志的工具

日志易(收费)splunk(国外,按流量收费)

介绍

发展史:使用java语言,在luncen的基础上做二次封装,提供restful接口

搜索的原理:倒排索引

特点:水平扩展方便、提供高可用、分布式存储、使用简单

配置文件

/etc/elasticsearch/elasticsearch.yml#es的主要配置文件/etc/elasticsearch/jvm.options#配置jvm虚拟机的内存信息/etc/sysconfig/elasticsearch #配置相关系统变量/usr/lib/sysctl.d/elasticsearch.conf#配置相关系统变量/usr/lib/systemd/system/elasticsearch.service#es的服务程序

[root@es01 elasticsearch]# grep -Ev ‘^$|#’ /etc/elasticsearch/elasticsearch.yml

node.name: oldboy01

path.data: /var/lib/elasticsearch

path.logs: /var/log/elasticsearch

bootstrap.memory_lock: true #内存锁定

network.host: 10.0.0.240

http.port: 9200

systemctl edit elasticsearch

[Service]

LimitMEMLOCK=infinity

概念

1、索引:相当于在mysql当中创建一个数据库(database)

2、类型:相当于数据库当中的一张表(table)

3、docs:表中字段信息

命令行交互

创建一个索引

curl -XPUT http://10.0.0.240:9200/oldboy

写入一条数据

curl -XPUT ‘10.0.0.240:9200/oldboy/student/1?pretty’ -H ‘Content-Type: application/json’ -d’

{

“first_name” : “zhang”,

“last_name”: “san”,

“age” : 28,

“about” : “I love to go rock climbing”,

“interests”: [ “sports” ]

}’

随机id写入数据

curl -XPOST ‘10.0.0.240:9200/oldboy/student/?pretty’ -H ‘Content-Type: application/json’ -d’ {

“first_name”: “li”,

“last_name” : “mingming”,

“age” : 45,

“about”: “I like to swime”,

“interests”: [ “reading” ]

}’

查询指定id的信息

curl -XGET ‘10.0.0.240:9200/oldboy/student/1?pretty’

查询索引内所有信息

curl -XGET ‘10.0.0.240:9200/oldboy/_search/?pretty’

将多个索引写入新索引中

curl -u -H “Content-Type: application/json” -XPOST ‘http:///_reindex?pretty’ -d ’

{

“source”: {“index”:“xgks__forumcmt,xgks__forumcmt,xgks_2026_forumcmt,xgks__forumcmt”},

“dest”: {“index”:“test__22_26_21_forumcmt”}

}’

删除指定id的信息

curl -XDELETE ‘10.0.0.240:9200/oldboy/student/1?pretty’

删除索引

curl -XDELETE ‘10.0.0.240:9200/oldboy/?pretty’

kibana交互

配置文件

[root@es01 es-software]# grep -Ev ‘^$|#’ /etc/kibana/kibana.yml

server.port: 5601

server.host: “10.0.0.240”

elasticsearch.hosts: [“http://10.0.0.240:9200”]

kibana.index: “.kibana”

修改系统默认分片和副本信息

PUT _template/template_http_request_record

{

“index_patterns”: ["*"],

“settings”: {

“number_of_shards” : 5,

“number_of_replicas” : 1

}

}

es集群

cluster.name: oldboy-cluster

node.name: oldboy01

path.data: /var/lib/elasticsearch

path.logs: /var/log/elasticsearch

bootstrap.memory_lock: true

network.host: 10.0.0.240

http.port: 9200

discovery.zen.ping.unicast.hosts: [“10.0.0.240”, “10.0.0.241”] #相互通信即可

discovery.zen.minimum_master_nodes: 2 #两个节点选择主节点

/var/log/elasticsearch/my-application.log #日志文件

分片作用

主分片:粗框代表主分片,主要响应修改索引相关请求信息,提供对外查询的服务

副本分片:细框代表副本分片,也就是主分片的备份,提供对外查询服务

冗余

集群当中有3台主机,最多可以宕机几台

1、前提是一个副本,一台一台的宕机最多可以宕机2台(需要手动的修改一下配置文件)

2、前提是一个副本,一台一台的宕机,每宕机一台修复一台,集群的健康值始终为绿色

3、前提是二个副本,如一次性宕机2台,然后后动修改es的配置文件,可以让集群正常运行

注意事项

索引一旦创建完成,分片的数量是不能修改,副本的数量是可以修改

分片数量是集群数量倍数,3*3。根据自己的需求来定分片的数量

监控es集群运行的状态

curl ‘10.0.0.240:9200/_cluster/health?pretty’

{

“cluster_name” : “oldboy-cluster”,

“status” : “green”, #代表集群当前的健康值

“timed_out” : false,

“number_of_nodes” : 3,#当前集群中节点的个数

“number_of_data_nodes” : 3,

“active_primary_shards” : 14,

“active_shards” : 33,

“relocating_shards” : 0,

“initializing_shards” : 0,

“unassigned_shards” : 0,

“delayed_unassigned_shards” : 0,

“number_of_pending_tasks” : 0,

“number_of_in_flight_fetch” : 0,

“task_max_waiting_in_queue_millis” : 0,

“active_shards_percent_as_number” : 100.0

}

备份索引

vim /etc/profile

export PATH=/opt/node/bin:$PATH更换源npm install -g cnpm --registry=https://registry.cnpm install elasticdump -g备份索引mkdir /data elasticdump \ --input=http://10.0.0.240:9200/oldboy03 \ --output=/data/oldboy03.json \ --type=data恢复数据elasticdump \ --input=/data/oldboy.json \ --output=http://10.0.0.240:9200/oldboy压缩式备份elasticdump \ --input=http://10.0.0.240:9200/oldboy03 \ --output=$|gzip > /data/oldboy03.json.gz

中文分词器(所有节点安装)

安装

cd /usr/share/elasticsearch

./bin/elasticsearch-plugin install file:///opt/es-software/elasticsearch-analysis-ik-6.6.0.zip

systemctl restart elasticsearch

test索引应用

curl -XPOST http://10.0.0.240:9200/news/text/_mapping -H ‘Content-Type:application/json’ -d’

{

“properties”: {

“content”: {

“type”: “text”,

“analyzer”: “ik_max_word”,

“search_analyzer”: “ik_smart”

}

}

}’

写入数据

POST /news/text/3

{“content”:“贵宾成犬粮”}

验证

{

“query” : { “match” : { “content” : “贵宾成犬” }},

“highlight” : {

“pre_tags” : ["<tag1>", “<tag2>”],

“post_tags” : ["</tag1>", “</tag2>”],

“fields” : {

“content” : {}

}

}

}

动态添加词典

server {

listen 80;

server_name ;

location / {

root /usr/share/nginx/html/download;

charset utf-8,gbk;

autoindex on;

autoindex_localtime on;

autoindex_exact_size off;

}

}

cd /etc/elasticsearch/analysis-ik/

vim IKAnalyzer.cfg.xml

<entry key=“remote_ext_dict”>http://10.0.0.240/download/dic.txt</entry>

systemctl restart elasticsearch

filebeat

vim /etc/filebeat/filebeat.ymlfilebeat.inputs:- type: logenabled: truepaths:- /var/log/nginx/access.logjson.keys_under_root: true #将日志格式更改为jsonoverwrite_keys: true- type: logenabled: truepaths:- /var/log/nginx/error.log- type: logenabled: truepaths:- /opt/tomcat/logs/localhost_access_log.*.txtjson.keys_under_root: trueoverwrite_keys: truetags: ["tomcat"]- type: logenabled: truepaths:- /var/log/elasticsearch/elasticsearch.logmultiline.pattern: '^\[' #收集java日志,开头以[为分割multiline.negate: truemultiline.match: afteroutput.elasticsearch:hosts: ["10.0.0.240:9200","10.0.0.241:9200"]#冗余方案indices: #不同日志生成不同的索引- index: "nginx-access-%{[beat.version]}-%{+yyyy.MM}"when.contains:source: "/var/log/nginx/access.log" #根据文件名称区分- index: "nginx-error-%{[beat.version]}-%{+yyyy.MM}"when.contains:source: "/var/log/nginx/error.log"- index: "tomcat-access-%{[beat.version]}-%{+yyyy.MM}"when.contains:tags: "tomcat" #根据标签区分- index: "mariadb-slow-%{[beat.version]}-%{+yyyy.MM}"when.contains:source: "/var/log/mariadb/slow.log"- index: "mariadb-error-%{[beat.version]}-%{+yyyy.MM}"when.contains:source: "/var/log/mariadb/mariadb.log"setup.template.name: "nginx" #索引名称setup.template.pattern: "nginx-*" #索引样式filebeat.config.modules: #开启模块功能path: ${path.config}/modules.d/*.ymlreload.enabled: truereload.period: 10s#filebeat modules enable mysql启动mysql模块#filebeat modules list查看模块列表

log_format json '{"time_local": "$time_local", ''"remote_addr": "$remote_addr", ''"referer": "$http_referer", ''"request": "$request", ''"status": $status, ''"bytes": $body_bytes_sent, ''"agent": "$http_user_agent", ''"x_forwarded": "$http_x_forwarded_for", ''"up_addr": "$upstream_addr",''"up_host": "$upstream_http_host",''"upstream_time":"$upstream_response_time",''"request_time": "$request_time"'' }';access_log /var/log/nginx/access.log json;

pattern="{&quot;clientip&quot;:&quot;%h&quot;,&quot;ClientUser&quot;:&quot;%l&quot;,&quot;authenticated&quot;:&quot;%u&quot;,&quot;AccessTime&quot;:&quot;%t&quot;,&quot;method&quot;:&quot;%r&quot;,&quot;status&quot;:&quot;%s&quot;,&quot;SendBytes&quot;:&quot;%b&quot;,&quot;Query?string&quot;:&quot;%q&quot;,&quot;partner&quot;:&quot;%{Referer}i&quot;,&quot;AgentVersion&quot;:&quot;%{User-Agent}i&quot;}"/>

cat /etc/filebeat/modules.d/mysql.yml - module: mysqlerror:enabled: truevar.paths: ["/var/log/mariadb/mariadb.log"]slowlog:enabled: truevar.paths: ["/var/log/mariadb/slow.log"]

nginx模块(自带json功能)无法使用

./bin/elasticsearch-plugin install ingest-user-agent

./bin/elasticsearch-plugin install ingest-geoip

filebeat+redis+logstash

filebeat

output.redis:hosts: ["10.0.0.240"]keys:- key: "nginx_access" #索引名称when.contains:tags: "access"- key: "nginx_error"when.contains:tags: "error"

redis检查日志

info

keys*

llen nginx_access #查看长度

lrange nginx_access 0 1 #查看第一个和第二个数据

logstash

cd /etc/logstash/conf.dvim redis_nginx.confinput {redis {host => "10.0.0.240"port => "6379"db => "0" #数据库名称key => "nginx_access"data_type => "list"}redis {host => "10.0.0.240"port => "6379"db => "0"key => "nginx_error"data_type => "list"}}filter {mutate {convert => ["upstream_time", "float"]convert => ["request_time", "float"]}}output {stdout {}if "access" in [tags] {elasticsearch {hosts => "http://10.0.0.240:9200"manage_template => falseindex => "nginx_access-%{+yyyy.MM}"}}if "error" in [tags] {elasticsearch {hosts => "http://10.0.0.240:9200"manage_template => falseindex => "nginx_error-%{+yyyy.MM}"}}}

filebeat+kafka+zookeeper+logstash+elasticsearch

安装zookeeper1、上传安装包cd /opt/es-software2、解压安装包tar xf zookeeper-3.4.11.tar.gz -C /optln -s /opt/zookeeper-3.4.11/ /opt/zookeeper3、创建数据目录mkdir -p /data/zookeeper4、复制一个配置文件cp /opt/zookeeper/conf/zoo_sample.cfg /opt/zookeeper/conf/zoo.cfg5、修改zoo.cfg配置文件dataDir=/data/zookeeper#在最后面加入server.1=10.0.0.240:2888:3888server.2=10.0.0.241:2888:3888server.3=10.0.0.242:2888:38886、给每台服务添加一个myid,每台服务器的id是不同echo "1" > /data/zookeeper/myidecho "2" > /data/zookeeper/myidecho "3" > /data/zookeeper/myid7、启动zookeeper服务/opt/zookeeper/bin/zkServer.sh start8、检查zookeeper的运行状态/opt/zookeeper/bin/zkServer.sh status安装kafka1、上传安装包cd /opt/es-software2、解压安装包tar zxf kafka_2.11-1.0.0.tgz -C /opt/ln -s /opt/kafka_2.11-1.0.0/ /opt/kafka3、创建一个日志目录mkdir /opt/kafka/logs4、修改配置文件vim /opt/kafka/config/server.properties# broker的id,值为整数,且必须唯一,在一个集群中不能重复broker.id=1listeners=PLAINTEXT://10.0.0.240:9092# 处理网络请求的线程数量,默认为3个work.threads=3# 执行磁盘IO操作的线程数量,默认为8个num.io.threads=8# socket服务发送数据的缓冲区大小,默认100KBsocket.send.buffer.bytes=102400# socket服务接受数据的缓冲区大小,默认100KBsocket.receive.buffer.bytes=102400# socket服务所能接受的一个请求的最大大小,默认为100Msocket.request.max.bytes=104857600# kafka存储消息数据的目录log.dirs=/opt/kafka/logs# 每个topic默认的partition数量num.partitions=1# 在启动时恢复数据和关闭时刷新数据时每个数据目录的线程数量num.recovery.threads.per.data.dir=1#topic的offset的备份份数。建议设置更高的数字保证更高的可用性offsets.topic.replication.factor=1transaction.state.log.replication.factor=1transaction.state.log.min.isr=1#每个日志文件删除之前保存的时间log.retention.hours=24#这个属性就是每个文件的最大尺寸;当尺寸达到这个数值时,就会创建新文件log.segment.bytes=1073741824#检查日志分段文件的间隔时间,以确定是否文件属性是否到达删除要求。log.retention.check.interval.ms=300000# Zookeeper连接信息,如果是zookeeper集群,则以逗号隔开zookeeper.connect=10.0.0.240:2181,10.0.0.241:2181,10.0.0.242:2181# 连接zookeeper的超时时间,6szookeeper.connection.timeout.ms=6000group.initial.rebalance.delay.ms=05、前台启动测试能否正常启动/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties6、测试创建topic/opt/kafka/bin/kafka-topics.sh --create --zookeeper 10.0.0.241:2181,10.0.0.242:2181,10.0.0.240:2181 --partitions 3 --replication-factor 3 --topic kafkatest7、测试获取toppid/opt/kafka/bin/kafka-topics.sh --describe --zookeeper 10.0.0.241:2181,10.0.0.242:2181,10.0.0.240:2181 --topic kafkatest8、测试成功之后,可以放在后台启动/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties修改filebeat配置文件output.kafka:hosts: ["10.0.0.240:9092", "10.0.0.241:9092", "10.0.0.242:9092"]topic: 'filebeat'修改logstash配置文件vim /etc/logstash/conf.d/kafka.confinput {kafka{bootstrap_servers=>"10.0.0.240:9092"topics=>["filebeat"]group_id=>"logstash"codec => "json"}}

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。