Input
```
input {
  beats {
    port => "5043"
  }
}
```
Configure the pipeline to receive logs from Filebeat over the Beats protocol, listening on port 5043.
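On the application host, Filebeat must be pointed at this Logstash port. A minimal sketch of the matching `filebeat.yml`, assuming a hypothetical log path (the host placeholder follows the document's `xxx.xxx.xxx.xxx` convention):

```yaml
filebeat.inputs:
- type: log
  paths:
    - /path/to/app/logs/*.log   # placeholder: your application's log files

output.logstash:
  hosts: ["xxx.xxx.xxx.xxx:5043"]  # the beats input port configured above
```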
Filter
```
filter {
  grok {
    # Note: the separator before exField is escaped (\|) like the others;
    # an unescaped | would be treated as regex alternation.
    match => { "message" => "\[%{DATA:time}\]-\[%{DATA:method}\] - \[%{DATA:catalina}\] -\[%{DATA:logLevel}\] - \[%{DATA:index_prefix}\|%{WORD:logType}\|%{WORD:sysNo}\|%{WORD:objType}\|%{DATA:funcode}\|%{WORD:monitorObjNo}\|%{WORD:bizId}\|%{WORD:respCode}\|%{DATA:respMsg}\|%{WORD:costTime}\|%{DATA:exField}\]" }
  }
  grok {
    match => { "time" => ["%{INT:y_index}-%{INT:M_index}-%{INT:d_index}"] }
  }
  mutate {
    add_field    => { "[@metadata][index_suffix]" => "%{y_index}%{M_index}%{d_index}" }
    remove_field => ["beat","host","thread","class","source","tags","type","y_index","M_index","d_index"]
    lowercase    => [ "index_prefix", "funcode", "objType", "monitorObjNo" ]
  }
}
```
- Use the grok filter to parse each log line and capture its pieces into named fields.
- Use the mutate plugin to transform fields. `add_field` creates `[@metadata][index_suffix]`; fields under `[@metadata]` are temporary and are not written to the Elasticsearch output.
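To see what the first grok pattern extracts, here is a rough Python translation of it (`%{DATA}` ≈ `.*?`, `%{WORD}` ≈ `\w+`) applied to a hypothetical log line in the expected shape; the line and its field values are invented for illustration:

```python
import re

# Hypothetical log line shaped like the one the grok pattern expects.
line = ("[2023-05-01 12:00:00]-[POST] - [catalina.out] -[INFO] - "
        "[applog|info|sys01|order|create|obj01|biz123|200|ok|35|stack]")

# Python equivalent of the grok pattern above.
pattern = re.compile(
    r"\[(?P<time>.*?)\]-\[(?P<method>.*?)\] - \[(?P<catalina>.*?)\] -\[(?P<logLevel>.*?)\] - "
    r"\[(?P<index_prefix>.*?)\|(?P<logType>\w+)\|(?P<sysNo>\w+)\|(?P<objType>\w+)\|"
    r"(?P<funcode>.*?)\|(?P<monitorObjNo>\w+)\|(?P<bizId>\w+)\|(?P<respCode>\w+)\|"
    r"(?P<respMsg>.*?)\|(?P<costTime>\w+)\|(?P<exField>.*?)\]"
)

fields = pattern.match(line).groupdict()
print(fields["logType"], fields["respCode"], fields["costTime"])
```

Each named group becomes an event field, which the later `mutate` and `output` stages reference by name.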
Output
```
output {
  if [logType] == "info" {
    elasticsearch {
      hosts    => [ "xxx.xxx.xxx.xxx:9200" ]
      index    => "%{index_prefix}_%{objType}_%{funcode}_%{[@metadata][index_suffix]}"
      user     => "elastic"
      password => "xxx"
    }
  }
  if [logType] == "error" {
    redis {
      data_type => "list"
      db        => 0
      key       => "%{index_prefix}_%{sysNo}_%{objType}_%{monitorObjNo}"
      host      => "xxx.xxx.xxx.xxx"
      port      => "6379"
      password  => "xxx"
    }
  }
}
```
Route the filtered events by log type: info events are indexed into Elasticsearch, while error events are pushed onto a Redis list.
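The Elasticsearch index name is built with Logstash's sprintf field references, so each event lands in a date-suffixed index. A minimal Python sketch of that expansion, using assumed field values:

```python
# Assumed event fields after the filter stage (illustrative values).
event = {
    "index_prefix": "applog",
    "objType": "order",
    "funcode": "create",
    "@metadata": {"index_suffix": "20230501"},
}

# Expand "%{index_prefix}_%{objType}_%{funcode}_%{[@metadata][index_suffix]}".
index = "{}_{}_{}_{}".format(
    event["index_prefix"],
    event["objType"],
    event["funcode"],
    event["@metadata"]["index_suffix"],
)
print(index)  # applog_order_create_20230501
```

Because `[@metadata][index_suffix]` is only referenced here and never shipped with the document, the date suffix shapes the index name without polluting the stored event.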
Startup commands
```
./bin/logstash -f first-pipelines.yml
nohup ./logstash -f ../first-pipelines.yml >/dev/null 2>&1 &
```

The second form runs Logstash in the background with nohup and discards its stdout/stderr.
Other configuration
```
stdout { codec => rubydebug }
```

Adding this to the output block prints each event to the console in a readable format, which is useful for debugging the pipeline.