Logstash筛选器:聚合-超时自动保存 [英] Logstash Filter: aggregate - auto save on timeout
问题描述
我在AWS中有一个Lambda函数,该函数将日志报告给ELK实例. lambda函数的每次调用都会生成一个唯一的invocation_id
,该invocation_id
与每个日志事件一起发送,因此可以在ELK中标识来自单个调用的事件.在操作结束时,我发送了一个完成"事件.
I have a Lambda function in AWS which reports logs to an ELK instance. Each invocation of the lambda function generates a unique invocation_id
that is sent with every log event, so the events from a single invocation can be identified in ELK. At the end of the operation, I send a "Done" event.
Lambda函数可能会失败或超时,然后不会发送完成"事件.
A Lambda function can fail, or timeout, and then the "Done" event is not sent.
我想使用 logstash聚合过滤器识别失败的调用.含义-每个invocation_id
将是聚合图中的task_id
,而"Done"事件将是end_of_task
.
I want to use the logstash aggregate filter to identify the failed invocations. Meaning - each invocation_id
will be a task_id
in the aggregation map, and the "Done" event will be the end_of_task
.
我需要告诉它在超时(X时间之后没有收到完成的事件)中,保存状态为失败"的汇总事件".
And I need to tell it "on timeout (there was no done event received after X time) save the aggregated event with status=failed".
使用此过滤器可以吗?如果是这样,语法是什么?从文档中还不清楚..
Is that possible with this filter? If so, what is the syntax? It's not clear from the docs..
推荐答案
Logstash聚合筛选器自2.3.0版本开始支持超时事件的生成.这是使用此功能实现所需功能的方法:
Logstash aggregate filter supports timeout event generation since version 2.3.0. Here is how to achieve what you want using this feature:
if [action] == "BEGIN" {
aggregate {
task_id => "%{id}"
code => "map['bytes'] = 0"
map_action => "create"
}
} elseif [action] == "DONE" {
aggregate {
task_id => "%{id}"
code => "event['bytes'] += map['bytes']"
timeout_code => "event.tag('failed')"
map_action => "update"
end_of_task => true
timeout => 10
push_map_as_event_on_timeout => true
} else {
aggregate {
task_id => "%{id}"
code => "map['bytes'] += event['bytes']"
map_action => "update"
add_tag => [ "drop" ]
}
这篇关于Logstash筛选器:聚合-超时自动保存的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!