Logstash Configuration

Published: 2018-06-20 · Category: elk

Input

input {
  beats {
    port => "5043"
  }
}

Configure Filebeat as the log input and set the port it ships to.
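For reference, a minimal sketch of the Filebeat side that would ship to this port. The log path is an assumption, and the `filebeat.inputs` key applies to Filebeat 6.3+ (older releases use `filebeat.prospectors`); adjust to your environment:

```yaml
filebeat.inputs:
- type: log
  paths:
    - /var/log/app/*.log      # hypothetical log path, adjust as needed

output.logstash:
  hosts: ["localhost:5043"]   # must match the beats input port above
```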

Filter

filter {

  # Parse the pipe-delimited application log line into named fields
  grok {
    match => {
      "message" => "\[%{DATA:time}\]-\[%{DATA:method}\] - \[%{DATA:catalina}\] -\[%{DATA:logLevel}\] - \[%{DATA:index_prefix}\|%{WORD:logType}\|%{WORD:sysNo}\|%{WORD:objType}\|%{DATA:funcode}\|%{WORD:monitorObjNo}\|%{WORD:bizId}\|%{WORD:respCode}\|%{DATA:respMsg}\|%{WORD:costTime}\|%{DATA:exField}\]"
    }
  }

  # Extract year/month/day from the time field for the index suffix
  grok {
    match => { "time" => ["%{INT:y_index}-%{INT:M_index}-%{INT:d_index}"] }
  }

  mutate {
    # Temporary field under @metadata: usable in the output stage, not written to ES
    add_field => { "[@metadata][index_suffix]" => "%{y_index}%{M_index}%{d_index}" }
    remove_field => ["beat","host","thread","class","source","tags","type","y_index","M_index","d_index"]
    # Elasticsearch index names must be lowercase
    lowercase => [ "index_prefix", "funcode", "objType", "monitorObjNo" ]
  }
}
  1. Use the grok filter to parse the log line and assign the captured values to fields.
  2. Use the mutate plugin to transform fields. `add_field` adds a field; `[@metadata][index_suffix]` is a temporary field under `@metadata` that can be referenced during processing but is not written to ES.
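As a sanity check, the two grok steps above can be approximated with plain regular expressions (grok's DATA ≈ `.*?`, WORD ≈ `\w+`, INT ≈ `\d+`). The sample log line below is hypothetical, shaped to fit the pattern:

```python
import re

# Rough Python equivalent of the first grok pattern (DATA -> .*?, WORD -> \w+)
LINE_RE = re.compile(
    r"\[(?P<time>.*?)\]-\[(?P<method>.*?)\] - \[(?P<catalina>.*?)\]"
    r" -\[(?P<logLevel>.*?)\] - \[(?P<index_prefix>.*?)\|(?P<logType>\w+)"
    r"\|(?P<sysNo>\w+)\|(?P<objType>\w+)\|(?P<funcode>.*?)\|(?P<monitorObjNo>\w+)"
    r"\|(?P<bizId>\w+)\|(?P<respCode>\w+)\|(?P<respMsg>.*?)\|(?P<costTime>\w+)"
    r"\|(?P<exField>.*?)\]"
)

# Rough equivalent of the second grok: pull year/month/day out of the time field
DATE_RE = re.compile(r"(?P<y>\d+)-(?P<M>\d+)-(?P<d>\d+)")

def parse_line(line):
    """Return the named fields as a dict, or None if the line does not match."""
    m = LINE_RE.search(line)
    return m.groupdict() if m else None

def index_suffix(time_field):
    """Build the yyyyMMdd suffix that mutate stores in [@metadata][index_suffix]."""
    m = DATE_RE.search(time_field)
    return m.group("y") + m.group("M") + m.group("d")

# Hypothetical log line matching the pattern
sample = ("[2018-06-20 10:30:00,123]-[main] - [Catalina] -[INFO] - "
          "[ORDER_API|info|1001|order|createOrder|obj01|biz123|0000|success|35|ext]")

fields = parse_line(sample)
print(fields["logType"], index_suffix(fields["time"]))  # info 20180620
```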

Output

output {

  if [logType] == "info" {
    elasticsearch {
      hosts => [ "xxx.xxx.xxx.xxx:9200" ]
      index => "%{index_prefix}_%{objType}_%{funcode}_%{[@metadata][index_suffix]}"
      user => "elastic"
      password => "xxx"
    }
  }

  if [logType] == "error" {
    redis {
      data_type => "list"
      db => 0
      #key => "%{index_prefix}_%{sysNo}_%{objType}_%{funcode}_%{[@metadata][index_suffix]}"
      key => "%{index_prefix}_%{sysNo}_%{objType}_%{monitorObjNo}"
      host => "xxx.xxx.xxx.xxx"
      port => "6379"
      password => "xxx"
    }
  }
}

Route the filtered events by log type to Elasticsearch or to a Redis list.
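To make the `%{...}` sprintf references concrete, here is a small sketch of how the ES index name resolves for a hypothetical event (field values already lowercased by the mutate step):

```python
# Hypothetical event fields after the filter stage
event = {
    "index_prefix": "order_api",
    "objType": "order",
    "funcode": "createorder",
}
metadata = {"index_suffix": "20180620"}  # temporary @metadata field, not written to ES

# Mirrors: index => "%{index_prefix}_%{objType}_%{funcode}_%{[@metadata][index_suffix]}"
index_name = "{index_prefix}_{objType}_{funcode}_{suffix}".format(
    suffix=metadata["index_suffix"], **event
)
print(index_name)  # order_api_order_createorder_20180620
```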

Startup Commands

# run in the foreground
./bin/logstash -f first-pipelines.yml
# run in the background, discarding output
nohup ./logstash -f ../first-pipelines.yml >/dev/null 2>&1 &

Other Configuration

# Print each event to the console (goes inside the output block)
stdout { codec => rubydebug }

Author: liuzhihang
Original link: https://liuzhihang.com/2018/06/20/logstash-configuration.html
Copyright: Unless otherwise stated, all articles on this blog are licensed under CC BY-NC-SA 4.0. Please credit the source when republishing!
