输入

1
2
3
4
5
input {
beats {
port => "5043"
}
}

配置日志输入方式为 filebeat, 并配置端口

过滤

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
filter {

grok {
match => {
"message" => "\[%{DATA:time}\]-\[%{DATA:method}\] - \[%{DATA:catalina}\] -\[%{DATA:logLevel}\] - \[%{DATA:index_prefix}\|%{WORD:logType}\|%{WORD:sysNo}\|%{WORD:objType}\|%{DATA:funcode}\|%{WORD:monitorObjNo}\|%{WORD:bizId}\|%{WORD:respCode}\|%{DATA:respMsg}\|%{WORD:costTime}|%{DATA:exField}\]"
}
}

grok{
match => { "time" => ["%{INT:y_index}-%{INT:M_index}-%{INT:d_index}"]}
}

mutate {
add_field => { "[@metadata][index_suffix]" => "%{y_index}%{M_index}%{d_index}" }
remove_field => ["beat","host","thread","class","source","tags","type","y_index","M_index","d_index"]
lowercase => [ "index_prefix" ]
lowercase => [ "funcode" ]
lowercase => [ "objType" ]
lowercase => [ "monitorObjNo" ]
}


}
  1. 使用gork过滤器对日志进行筛选, 并对部分字段赋值.
  2. 使用mutate插件对字段进行转换, add_field 为添加字段 [@metadata][index_suffix] 意思是添加临时字段, 该字段不会输出到es中

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
output {

if [logType] == "info" {
elasticsearch {
hosts => [ "xxx.xxx.xxx.xxx:9200" ]
index => "%{index_prefix}_%{objType}_%{funcode}_%{[@metadata][index_suffix]}"
user => elastic
password => xxx
}
}



if [logType] == "error" {
redis {
data_type => "list"
db => 0
#key => "%{index_prefix}_%{sysNo}_%{objType}_%{funcode}_%{[@metadata][index_suffix]}"
key => "%{index_prefix}_%{sysNo}_%{objType}_%{monitorObjNo}"
host => "xxx.xxx.xxx.xxx"
port => "6379"
password => "xxx"
}
}

}

将过滤后的字段按照类型输出到Es或者redis队列中

启动命令

    ./bin/logstash -f first-pipelines.yml
    nohup ./logstash -f ../first-pipelines.yml >/dev/null 2>&1 &

其他配置

1
2
# 输出到控制台
stdout { codec => rubydebug }