上一篇:深夜看了张一鸣的微博,让我越想越后怕
整体流程大概如下:
服务器准备
在这先列出各服务器节点,方便同学们在下文中对照节点查看相应内容
SpringBoot项目准备
引入log4j2替换SpringBoot默认log,demo项目结构如下:
pom
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <!-- exclude spring-boot-starter-logging so log4j2 can take over -->
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- log4j2 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
    <!-- LMAX Disruptor, required by log4j2 async loggers -->
    <dependency>
        <groupId>com.lmax</groupId>
        <artifactId>disruptor</artifactId>
        <version>3.3.4</version>
    </dependency>
</dependencies>
log4j2.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- status: level of log4j2's own internal logging; monitorInterval: re-read this file every 600s -->
<Configuration status="INFO" schema="Log4J-V2.0.xsd" monitorInterval="600">
    <Properties>
        <Property name="LOG_HOME">logs</Property>
        <Property name="FILE_NAME">collector</Property>
        <!-- bracketed fields with NO separators between them; this layout must stay
             in sync with the logstash grok pattern that parses these lines -->
        <Property name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}][%level{length=5}][%thread-%tid][%logger][%X{hostName}][%X{ip}][%X{applicationName}][%F,%L,%C,%M][%m]##'%ex'%n</Property>
    </Properties>
    <Appenders>
        <Console name="CONSOLE" target="SYSTEM_OUT">
            <PatternLayout pattern="${patternLayout}" />
        </Console>
        <!-- all application logs: app-collector.log -->
        <RollingRandomAccessFile name="appAppender" fileName="${LOG_HOME}/app-${FILE_NAME}.log" filePattern="${LOG_HOME}/app-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log">
            <PatternLayout pattern="${patternLayout}" />
            <Policies>
                <TimeBasedTriggeringPolicy interval="1" />
                <SizeBasedTriggeringPolicy size="500MB" />
            </Policies>
            <DefaultRolloverStrategy max="20" />
        </RollingRandomAccessFile>
        <!-- warn and above only: error-collector.log -->
        <RollingRandomAccessFile name="errorAppender" fileName="${LOG_HOME}/error-${FILE_NAME}.log" filePattern="${LOG_HOME}/error-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log">
            <PatternLayout pattern="${patternLayout}" />
            <Filters>
                <ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY" />
            </Filters>
            <Policies>
                <TimeBasedTriggeringPolicy interval="1" />
                <SizeBasedTriggeringPolicy size="500MB" />
            </Policies>
            <DefaultRolloverStrategy max="20" />
        </RollingRandomAccessFile>
    </Appenders>
    <Loggers>
        <!-- async logger for the business packages. NOTE(review): the original declared
             TWO AsyncLoggers with the same name "com.bfxy.*", which log4j2 rejects as a
             duplicate logger definition - merged here into one logger with both appenders -->
        <AsyncLogger name="com.bfxy.*" level="info" includeLocation="true">
            <AppenderRef ref="appAppender" />
            <AppenderRef ref="errorAppender" />
        </AsyncLogger>
        <Root level="info">
            <AppenderRef ref="CONSOLE" />
            <AppenderRef ref="appAppender" />
            <AppenderRef ref="errorAppender" />
        </Root>
    </Loggers>
</Configuration>
IndexController
测试Controller,用以打印日志进行调试
@Slf4j@RestControllerpublicclassIndexController{@RequestMapping(value="/index")publicStringindex(){InputMDC.putMDC();log.info("我是一条info日志");log.warn("我是一条warn日志");log.error("我是一条error日志");return"idx";}@RequestMapping(value="/err")publicStringerr(){InputMDC.putMDC();try{inta=1/0;}catch(Exceptione){log.error("算术异常",e);}return"err";}}
InputMDC
用以获取log中的[%X{hostName}]
、[%X{ip}]
、[%X{applicationName}]
三个字段值
@ComponentpublicclassInputMDCimplementsEnvironmentAware{privatestaticEnvironmentenvironment;@OverridepublicvoidsetEnvironment(Environmentenvironment){InputMDC.environment=environment;}publicstaticvoidputMDC(){MDC.put("hostName",NetUtil.getLocalHostName());MDC.put("ip",NetUtil.getLocalIp());MDC.put("applicationName",environment.getProperty("spring.application.name"));}}
NetUtil
/**
 * Networking helpers: address normalization, local-IP discovery with a
 * preference order, channel address formatting, PID and hostname lookup.
 * Fix: the garbled {@code pile(...)} call is restored to {@code Pattern.compile(...)}.
 */
public class NetUtil {

    /**
     * Normalizes "host" or "host:port" to "host:port", defaulting the port to 80.
     *
     * @throws IllegalArgumentException if the address has more than one ':'
     */
    public static String normalizeAddress(String address) {
        String[] blocks = address.split("[:]");
        if (blocks.length > 2) {
            throw new IllegalArgumentException(address + " is invalid");
        }
        String host = blocks[0];
        int port = 80;
        if (blocks.length > 1) {
            port = Integer.valueOf(blocks[1]);
        } else {
            address += ":" + port; // use default 80
        }
        String serverAddr = String.format("%s:%d", host, port);
        return serverAddr;
    }

    /**
     * Resolves a "host:port" address; if host is the wildcard 0.0.0.0 it is
     * replaced by the machine's preferred local IP.
     *
     * @throws IllegalArgumentException if the address is not exactly "host:port"
     */
    public static String getLocalAddress(String address) {
        String[] blocks = address.split("[:]");
        if (blocks.length != 2) {
            throw new IllegalArgumentException(address + " is invalid address");
        }
        String host = blocks[0];
        int port = Integer.valueOf(blocks[1]);
        if ("0.0.0.0".equals(host)) {
            return String.format("%s:%d", NetUtil.getLocalIp(), port);
        }
        return address;
    }

    /**
     * Returns the index of the first preference prefix matching the ip, or -1.
     * "*" matches any address that is NOT in the common private/loopback ranges.
     */
    private static int matchedIndex(String ip, String[] prefix) {
        for (int i = 0; i < prefix.length; i++) {
            String p = prefix[i];
            if ("*".equals(p)) { // "*" assumed to be a public IP
                if (ip.startsWith("127.") || ip.startsWith("10.")
                        || ip.startsWith("172.") || ip.startsWith("192.")) {
                    continue;
                }
                return i;
            } else {
                if (ip.startsWith(p)) {
                    return i;
                }
            }
        }
        return -1;
    }

    /**
     * Scans all network interfaces and returns the IPv4 address best matching
     * the preference string (e.g. "*>10>172>192>127", earlier = preferred).
     * Falls back to 127.0.0.1 on no match or any error.
     */
    public static String getLocalIp(String ipPreference) {
        if (ipPreference == null) {
            ipPreference = "*>10>172>192>127";
        }
        String[] prefix = ipPreference.split("[>]+");
        try {
            // IPv4 dotted-quad filter (was garbled to "pile(...)" in the original)
            Pattern pattern = Pattern.compile("[0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+");
            Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
            String matchedIp = null;
            int matchedIdx = -1;
            while (interfaces.hasMoreElements()) {
                NetworkInterface ni = interfaces.nextElement();
                Enumeration<InetAddress> en = ni.getInetAddresses();
                while (en.hasMoreElements()) {
                    InetAddress addr = en.nextElement();
                    String ip = addr.getHostAddress();
                    Matcher matcher = pattern.matcher(ip);
                    if (matcher.matches()) {
                        int idx = matchedIndex(ip, prefix);
                        if (idx == -1) {
                            continue;
                        }
                        // keep the address with the smallest (most preferred) index
                        if (matchedIdx == -1 || matchedIdx > idx) {
                            matchedIdx = idx;
                            matchedIp = ip;
                        }
                    }
                }
            }
            if (matchedIp != null) {
                return matchedIp;
            }
            return "127.0.0.1";
        } catch (Exception e) {
            return "127.0.0.1"; // best-effort: never propagate discovery failures
        }
    }

    /** Local IP with the default preference order. */
    public static String getLocalIp() {
        return getLocalIp("*>10>172>192>127");
    }

    /** Remote endpoint of the channel, formatted as a string. */
    public static String remoteAddress(SocketChannel channel) {
        SocketAddress addr = channel.socket().getRemoteSocketAddress();
        String res = String.format("%s", addr);
        return res;
    }

    /** Local endpoint of the channel; strips the leading '/' when bound. */
    public static String localAddress(SocketChannel channel) {
        SocketAddress addr = channel.socket().getLocalSocketAddress();
        String res = String.format("%s", addr);
        return addr == null ? res : res.substring(1);
    }

    /** Current process id, parsed from RuntimeMXBean's "pid@host" name; null if unparseable. */
    public static String getPid() {
        RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
        String name = runtime.getName();
        int index = name.indexOf("@");
        if (index != -1) {
            return name.substring(0, index);
        }
        return null;
    }

    /** Local host name; on lookup failure, best-effort parse from the exception message. */
    public static String getLocalHostName() {
        try {
            return (InetAddress.getLocalHost()).getHostName();
        } catch (UnknownHostException uhe) {
            // message is typically "hostname: reason" - recover the hostname part
            String host = uhe.getMessage();
            if (host != null) {
                int colon = host.indexOf(':');
                if (colon > 0) {
                    return host.substring(0, colon);
                }
            }
            return "UnknownHost";
        }
    }
}
启动项目,访问/index
和/err
接口,可以看到项目中生成了app-collector.log
和error-collector.log
两个日志文件:
我们将Springboot服务部署在192.168.11.31这台机器上。
Kafka安装和启用
kafka下载地址:
http://kafka.apache.org/downloads.html
kafka安装步骤:首先kafka安装需要依赖于zookeeper,所以小伙伴们先准备好zookeeper环境(三个节点即可),然后我们来一起构建kafka broker。
另外,Kafka 系列面试题和答案全部整理好了,微信搜索互联网架构师,在后台发送:面试,可以在线阅读。
## Extract:
tar -zxvf kafka_2.12-2.1.0.tgz -C /usr/local/
## Rename:
mv kafka_2.12-2.1.0 kafka_2.12
## Edit server.properties in the extracted directory:
vim /usr/local/kafka_2.12/config/server.properties
## Settings to change:
broker.id=0
port=9092
host.name=192.168.11.51
advertised.host.name=192.168.11.51
log.dirs=/usr/local/kafka_2.12/kafka-logs
num.partitions=2
zookeeper.connect=192.168.11.111:2181,192.168.11.112:2181,192.168.11.113:2181
## Create the log directory:
mkdir /usr/local/kafka_2.12/kafka-logs
## Start kafka in the background:
/usr/local/kafka_2.12/bin/kafka-server-start.sh /usr/local/kafka_2.12/config/server.properties &
创建两个topic
## Create the two topics (one per log stream)
kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic app-log-collector --partitions 1 --replication-factor 1
kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic error-log-collector --partitions 1 --replication-factor 1
我们可以查看一下topic情况
## Describe the topic created above (original said "app-log-test",
## but the topic created is "app-log-collector")
kafka-topics.sh --zookeeper 192.168.11.111:2181 --topic app-log-collector --describe
可以看到已经成功启用了app-log-collector
和error-log-collector
两个topic
filebeat安装和启用
filebeat下载
cd /usr/local/software
tar -zxvf filebeat-6.6.0-linux-x86_64.tar.gz -C /usr/local/
cd /usr/local
mv filebeat-6.6.0-linux-x86_64 filebeat-6.6.0
配置filebeat,可以参考下方yml配置文件
vim /usr/local/filebeat-6.6.0/filebeat.yml

###################### Filebeat Configuration Example #########################
filebeat.prospectors:

- input_type: log
  paths:
    # app-<service-name>.log; path is hard-coded (no wildcard) so rotated
    # history files are not re-read
    - /usr/local/logs/app-collector.log
  # _type value used when the event is written to ES
  document_type: "app-log"
  multiline:
    #pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'   # alternative: match timestamp-prefixed lines
    pattern: '^\['        # a line starting with "[" begins a new event
    negate: true          # lines NOT matching the pattern belong to the previous event
    match: after          # append them to the end of the previous event
    max_lines: 2000       # max lines merged into one event
    timeout: 2s           # flush the pending event if no new line arrives in time
  fields:
    logbiz: collector
    logtopic: app-log-collector   # per-service kafka topic
    evn: dev                      # NOTE(review): key spelled "evn" in the original; kept as-is
- input_type: log
  paths:
    - /usr/local/logs/error-collector.log
  document_type: "error-log"
  multiline:
    #pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'
    pattern: '^\['
    negate: true
    match: after
    max_lines: 2000
    timeout: 2s
  fields:
    logbiz: collector
    logtopic: error-log-collector   # per-service kafka topic
    evn: dev

output.kafka:
  enabled: true
  hosts: ["192.168.11.51:9092"]
  topic: '%{[fields.logtopic]}'
  partition.hash:
    reachable_only: true
  compression: gzip
  max_message_bytes: 1000000
  required_acks: 1
logging.to_files: true
filebeat启动:
检查配置是否正确
cd /usr/local/filebeat-6.6.0
./filebeat -c filebeat.yml -configtest
## Config OK
启动filebeat
/usr/local/filebeat-6.6.0/filebeat &
检查是否启动成功
ps -ef | grep filebeat
可以看到filebeat已经启动成功
然后我们访问192.168.11.31:8001/index和192.168.11.31:8001/err,再查看kafka的logs文件,可以看到已经生成了app-log-collector-0和error-log-collector-0文件,说明filebeat已经帮我们把数据收集好放到了kafka上。
logstash安装
我们在logstash的安装目录下新建一个文件夹
## "scrpit" fixed to "script" to match the path used when starting logstash
mkdir script
然后cd进该文件夹,创建一个logstash-script.conf
文件
cd script
vim logstash-script.conf

## The multiline plugin can also be used for other stack-style messages,
## e.g. linux kernel logs.
input {
  kafka {
    ## topic pattern: app-log-<service-name>
    topics_pattern => "app-log-.*"
    bootstrap_servers => "192.168.11.51:9092"
    codec => json
    consumer_threads => 1       ## number of parallel consumer threads
    decorate_events => true
    #auto_offset_reset => "latest"
    group_id => "app-log-group"
  }
  kafka {
    ## topic pattern: error-log-<service-name>
    topics_pattern => "error-log-.*"
    bootstrap_servers => "192.168.11.51:9092"
    codec => json
    consumer_threads => 1
    decorate_events => true
    #auto_offset_reset => "latest"
    group_id => "error-log-group"
  }
}

filter {
  ## timezone conversion: compute the daily index suffix
  ruby {
    code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
  }
  if "app-log" in [fields][logtopic] {
    grok {
      ## matches the Spring Boot log4j2 patternLayout defined earlier
      match => ["message", "\[%{NOTSPACE:currentDateTime}\]\[%{NOTSPACE:level}\]\[%{NOTSPACE:thread-id}\]\[%{NOTSPACE:class}\]\[%{DATA:hostName}\]\[%{DATA:ip}\]\[%{DATA:applicationName}\]\[%{DATA:location}\]\[%{DATA:messageInfo}\]##(\'\'|%{QUOTEDSTRING:throwable})"]
    }
  }
  if "error-log" in [fields][logtopic] {
    grok {
      ## same layout as above
      match => ["message", "\[%{NOTSPACE:currentDateTime}\]\[%{NOTSPACE:level}\]\[%{NOTSPACE:thread-id}\]\[%{NOTSPACE:class}\]\[%{DATA:hostName}\]\[%{DATA:ip}\]\[%{DATA:applicationName}\]\[%{DATA:location}\]\[%{DATA:messageInfo}\]##(\'\'|%{QUOTEDSTRING:throwable})"]
    }
  }
}

## debug output to the console:
output {
  stdout { codec => rubydebug }
}

## elasticsearch:
output {
  if "app-log" in [fields][logtopic] {
    ## es plugin
    elasticsearch {
      # es endpoint
      hosts => ["192.168.11.35:9200"]
      # credentials
      user => "elastic"
      password => "123456"
      ## index name; a leading "+" would make the remainder a date format,
      ## e.g. javalog-app-service-.01.23
      index => "app-log-%{[fields][logbiz]}-%{index_time}"
      # sniff cluster nodes for load balancing:
      # http://192.168.11.35:9200/_nodes/http?pretty
      sniffing => true
      # override logstash's default mapping template
      template_overwrite => true
    }
  }
  if "error-log" in [fields][logtopic] {
    elasticsearch {
      hosts => ["192.168.11.35:9200"]
      user => "elastic"
      password => "123456"
      index => "error-log-%{[fields][logbiz]}-%{index_time}"
      sniffing => true
      template_overwrite => true
    }
  }
}
启动logstash
/usr/local/logstash-6.6.0/bin/logstash -f /usr/local/logstash-6.6.0/script/logstash-script.conf &
等待启动成功,我们再次访问192.168.11.31:8001/err
可以看到控制台开始打印日志
ElasticSearch与Kibana
ES和Kibana的搭建之前没写过博客,网上资料也比较多,大家可以自行搜索。
搭建完成后,访问Kibana的管理页面192.168.11.35:5601
,选择Management -> Kibana -> Index Patterns
然后Create index pattern
index pattern 输入app-log-*
Time Filter field name 选择 currentDateTime
这样我们就成功创建了索引。
我们再次访问192.168.11.31:8001/err
,这个时候就可以看到我们已经命中了一条log信息
里面展示了日志的全量信息
到这里,我们完整的日志收集及可视化就搭建完成了!
原文链接:https://blog.csdn.net/lt326030434/article/details/107361190
感谢您的阅读,也欢迎您发表关于这篇文章的任何建议,关注我,技术不迷茫!小编到你上高速。
· END ·
最后,关注公众号互联网架构师,在后台回复:2T,可以获取我整理的 Java 系列面试题和答案,非常齐全。
正文结束
推荐阅读 ↓↓↓
1.不认命,从流水线工人,到谷歌上班的程序媛,一位湖南妹子的励志故事
2.如何才能成为优秀的架构师?
3.从零开始搭建创业公司后台技术栈
4.程序员一般可以从什么平台接私活?
5.37岁程序员被裁,120天没找到工作,无奈去小公司,结果懵了...
6.IntelliJ IDEA .3 首个最新访问版本发布,新特性抢先看
7.这封“领导痛批95后下属”的邮件,句句扎心!
8.15张图看懂瞎忙和高效的区别!
一个人学习、工作很迷茫?
点击「阅读原文」加入我们的小圈子!