Issue with conditionals in Logstash with fields from Kafka ----> FileBeat prospectors
Problem description
I have the following scenario:
FileBeat ----> Kafka -----> Logstash -----> Elastic ----> Kibana
In Filebeat I have 2 prospectors defined in the YML file, and I add some fields to identify the log data. But the issue is that in Logstash I haven't been able to validate these fields.
The configuration files are:
1. filebeat.yml

filebeat.prospectors:
- input_type: log
  paths:
    - /opt/jboss/server.log*
  tags: ["log_server"]
  fields:
    environment: integracion
    log_type: log_server
    document_type: log_server
  fields_under_root: true
- input_type: log
  paths:
    - /var/todo1_apps/ebanTX.log*
  tags: ["log_eban"]
  fields:
    environment: integracion
    log_type: log_ebanking
    document_type: log_ebanking
  fields_under_root: true

output.kafka:
  enabled: true
  hosts: ["192.168.105.68:9092"]
  topic: "sve_logs"
  timeout: 30s
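For context on what Logstash actually receives: Filebeat's Kafka output serializes each event as JSON, and with fields_under_root: true the custom fields sit at the top level of the event rather than under a fields key. A rough sketch of a message published to the sve_logs topic (field names and values are illustrative; the exact envelope depends on the Filebeat version):

```json
{
  "@timestamp": "2017-05-10T14:23:01.000Z",
  "beat": { "hostname": "app01", "name": "app01" },
  "message": "2017-05-10 14:23:01,123 INFO [main] - transaction ok",
  "environment": "integracion",
  "log_type": "log_ebanking",
  "document_type": "log_ebanking",
  "tags": ["log_eban"]
}
```

With codec => "plain" on the Kafka input, this whole JSON document lands in the message field as one opaque string, which is why conditionals on [type] or [log_type] never match.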
2. logstash.conf

input {
  kafka {
    bootstrap_servers => "192.xxx.xxx.xxx:9092"
    group_id => "sve_banistmo"
    topics => ["sve_logs"]
    decorate_events => true
    codec => "plain"
  }
}

filter {
  if [type] == "log_ebanking" {
    grok {
      patterns_dir => ["patterns/patterns"]
      match => { "message" => "%{TIMESTAMP_ISO8601:logdate}%{SPACE}%{LOGLEVEL:level}%{SPACE}\[%{DATA:thread}]%{SPACE}-%{SPACE}%{GREEDYDATA:message_log}" }
    }
  }
}

output {
  if [type] == "log_ebanking" {
    elasticsearch {
      hosts => ["192.168.105.67:9200"]
      index => "sve-banistmo-ebanking-%{+YYYY.MM.dd}"
    }
    stdout { codec => json }
  }
}
The problem is in the conditionals in the filter and output sections. I've tried with
@[metadata][type]
@metadata][type]
@metadata.type
metadata.type
[type]
with both the type and log_type variables. Nothing works!! :S If I don't put in the conditionals, the data flows without problems, so it is not a connection issue.
Please help me. I've reviewed all the related information, but in my case the conditionals don't work.
Thanks in advance
Dario R
Recommended answer
Use codec => "json" in the kafka input of logstash.conf to extract all fields from the message.
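A minimal sketch of the adjusted input and conditional, assuming the filebeat.yml shown above. Note that with fields_under_root: true the custom log_type field ends up at the top level of the decoded event, so the conditional should test [log_type] (or [document_type]) rather than [type]:

```
input {
  kafka {
    bootstrap_servers => "192.xxx.xxx.xxx:9092"
    group_id => "sve_banistmo"
    topics => ["sve_logs"]
    decorate_events => true
    # parse the JSON document Filebeat publishes into event fields
    codec => "json"
  }
}

filter {
  # log_type comes from the Filebeat "fields" section,
  # promoted to top level by fields_under_root: true
  if [log_type] == "log_ebanking" {
    grok {
      patterns_dir => ["patterns/patterns"]
      match => { "message" => "%{TIMESTAMP_ISO8601:logdate}%{SPACE}%{LOGLEVEL:level}%{SPACE}\[%{DATA:thread}]%{SPACE}-%{SPACE}%{GREEDYDATA:message_log}" }
    }
  }
}
```

The same [log_type] conditional can then be reused in the output section in place of [type].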