Logstash: Merge two logs into one output document

Question

I have set syslog to send logs to logstash, with the following filters:

output {
  elasticsearch {
    hosts       => ["localhost:9200"]
    document_id => "%{job_id}"
  }
}
filter {
  grok {
    overwrite => ["message"]
  }
  json {
    source => "message"
  }
}

A typical message from one of my applications will have an initial state and a job_id:

{"job_id": "xyz782", state: "processing", job_type: "something"}

A few minutes or so later, another log will have the same job_id, a different state, and a processing time:

{"job_id": "xyz782", state:"failed", processing_time: 12.345}

These fields get properly loaded, but two documents are created. What I would love is for only one document to be created for the initial log, and the second log to instead update the first one, meaning the updated document would have the following fields:

{"job_id": "xyz782", state: "failed", job_type: "something", processing_time: 12.345}

As you can see in my Logstash conf output, I use the job_id as the document id. However, the second message seems to replace the fields from the first message, and it also erases all the fields from the first message that aren't in the second one; for instance, the job_type field present in the first message doesn't appear in the final document. This may have to do with the fact that the JSON comes from the same "message" field both times. Is there another way to merge two log messages into one document in Logstash?

Answer

You can use the aggregate filter to do this. The aggregate filter supports merging several log lines into a single event based on a common field value. In your case, the common field would be the job_id field.

Then we need another field to detect the first event vs the second event that should be aggregated. In your case, this would be the state field.

So you simply need to add another filter to your existing Logstash configuration, like this:

filter {
    ...your other filters

    if [state] == "processing" {
        aggregate {
            task_id => "%{job_id}"
            # remember the fields from the first event in the task map
            code => "map['job_type'] = event.get('job_type')"
        }
    } else if [state] == "failed" {
        aggregate {
            task_id => "%{job_id}"
            # copy the remembered fields into this final event
            code => "event.set('job_type', map['job_type'])"
            end_of_task => true
            timeout => 120
        }
    }
}
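
One practical caveat, taken from the aggregate filter's own documentation rather than from the original answer: the filter keeps its in-progress maps in local memory, so it only works reliably when Logstash runs with a single filter worker, e.g. (logstash.conf standing in for your own config file):

bin/logstash -f logstash.conf -w 1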

You are free to adjust the timeout (in seconds) depending on how long your jobs are running.
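
As a side note, since you already set document_id => "%{job_id}" in your elasticsearch output, another option is to let Elasticsearch itself merge the two documents with a partial update instead of a full re-index. This is only a sketch of an alternative, independent of the aggregate approach above: with action => "update" and doc_as_upsert => true, the second event only overwrites the fields it actually contains, so job_type from the first event would survive:

output {
  elasticsearch {
    hosts         => ["localhost:9200"]
    document_id   => "%{job_id}"
    action        => "update"       # partial update of the existing document
    doc_as_upsert => true           # create the document if it doesn't exist yet
  }
}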
