Logstash:将两个日志合并到一个输出文档中 [英] Logstash: Merge two logs into one output document

查看:1827
本文介绍了Logstash:将两个日志合并到一个输出文档中的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我已经设置syslog将日志发送到logstash,并使用以下过滤器:

  output {
elasticsearch
{hosts => [localhost:9200]
document_id => %{job_id}
}

}
过滤器{
grok {
overwrite => [message]
}
json {
source => 消息
}
}

我的一个应用程序的一个典型消息将具有初始状态和job_id:

  {job_id:xyz782,状态:processing,job_type: something} 

稍后几分钟后,另一个日志将具有相同的log_id,一个不同的状态和处理时间:

  {job_id:xyz782,状态:failed,processing_time:12.345} 

这些字段正确加载,但创建了两个文档。我想要的只是为初始日志创建一个文档,而第二个日志是更新第一个日志,这意味着更新的文档将包含以下字段:



$ job $$$$$$ pre>

正如您在logstash conf输出中可以看到的那样,我使用job_id作为文档ID,但是,第二条消息似乎替换了第一条消息中的字段,而且还擦除第一个消息中不在第二个消息中的所有字段,例如,第一个消息中存在的job_type字段不会显示在最终文档中。这可能与json从同一个字段消息两次的事实有关。有没有另外一种方式可以在logstash中将两个日志消息合并成一个文档?

解决方案

您可以使用 聚合 过滤器为了做到这一点。聚合过滤器支持根据公共字段值将多个日志行聚合到一个单个事件中。在你的情况下,公共字段将是 job_id 字段。



然后我们需要另一个字段来检测第一个事件与应该聚合的第二个事件。在你的情况下,这将是状态字段。



所以你只需要添加一个过滤器到你现有的Logstash配置如下:

 过滤器{
...您的其他过滤器

if [state] ==processing{
aggregate {
task_id => %{job_id}
}
} else if [state] ==failed{
aggregate {
task_id => %{job_id}
end_of_task => true
timeout => 120
}
}
}

您可以随意调整超时(以秒为单位),具体取决于您的工作运行时间。


I have set syslog to send logs to logstash, with the following filters:

output {
  elasticsearch 
  { hosts => ["localhost:9200"]
  document_id => "%{job_id}"   
}

}
filter {
    grok {
        overwrite => ["message"]
    }
    json {
     source => "message"
    }
}

A typical message of one of my application will have an initial state and a job_id:

{"job_id": "xyz782", state: "processing", job_type: "something"}

A few minutes or so later, another log will have the same log_id, a different state, and a processing time:

{"job_id": "xyz782", state:"failed", processing_time: 12.345}

These fields get properly loaded, but two documents are created. What I would love is for only one document to be created for the initial log, and the second log to instead update the first one, meaning the updated document would have the following fields:

{"job_id": "xyz782", state: "failed", job_type: "something", processing_time: 12.345}

As you can see in my logstash conf output, I use the job_id as the document id, however, the second message seems to replace the fields from the first message, but also erase all the fields in the first message that aren't in the second one, for instance, the job_type field present in the first message doesn't appear in the final document. This may have to do with the fact the json comes from the same field "message" both times. Is there another way to get the merging of two logs messages into one document in logstash?

解决方案

You can use the aggregate filter in order to do this. The aggregate filter provides support for aggregating several log lines into one single event based on a common field value. In your case, the common field would be the job_id field.

Then we need another field to detect the first event vs the second event that should be aggregated. In your case, this would be the state field.

So you simply need to add another filter to your existing Logstash configuration, like this:

filter {
    ...your other filters

    if [state] == "processing" {
        aggregate {
            task_id => "%{job_id}"
        }
    } else if [state] == "failed" {
        aggregate {
            task_id => "%{job_id}"
            end_of_task => true
            timeout => 120
        }
    }
}

You are free to adjust the timeout (in seconds) depending on how long your jobs are running.

这篇关于Logstash:将两个日志合并到一个输出文档中的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆