问题描述
graph LR;
filebeat --> logstash;
log4j --> logstash;
logstash --> es;
filebeat 和 log4j appender 同时到 kafka, logstash在启动时报错, 错误如下:
1
| javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=logstash-0
|
问题原因及解决
input 消费kafka时, 分别指定不同的 client_id.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| kafka { bootstrap_servers => ["192.168.103.43:9092"] # 注意这里配置的kafka的broker地址不是zk的地址 client_id => "kafka_client_1" group_id => "logstash" topics => ["ipaynow_log"] # kafka topic 名称 consumer_threads => 5 decorate_events => true type => "string" codec => "json" }
kafka { bootstrap_servers => ["192.168.103.43:9092"] # 注意这里配置的kafka的broker地址不是zk的地址 client_id => "kafka_client_2" group_id => "logstash" topics => ["ipaynow-hunter"] # kafka topic 名称 consumer_threads => 5 decorate_events => true type => "string" codec => plain { charset=>"UTF-8" } }
|