How to write grok pattern in logstash

I am trying to get started with logstash, and my application has the following type of logs. Here the 5 indicates that 5 more lines will follow, which are stats collected for different related things.

These are basically application stats, with each line describing one of the resources.

Is there a way to properly parse them using logstash so that they can be used for Elasticsearch?

[20170502 01:57:26.209 EDT (thread-name) package-name.classname#MethodName INFO] Some info line (5 stats):
[fieldA: strvalue1| field2: 0 | field3: 0 | field4: 0 | field5: 0 | field6: 0 | field7: 0]
[fieldA: strvalue2| field2: 0 | field3: 0 | field4: 0 | field5: 0 | field6: 0 | field7: 0]
[fieldA: strvalue3| field2: 0 | field3: 0 | field4: 0 | field5: 0 | field6: 0 | field7: 0]
[fieldA: strvalue4| field2: 0 | field3: 0 | field4: 0 | field5: 0 | field6: 0 | field7: 0]
[fieldA: strvalue5| field2: 0 | field3: 0 | field4: 0 | field5: 0 | field6: 0 | field7: 0]

EDIT:

This is the configuration I am using. With it, the first set of stats is parsed properly, but after that the pipeline gets stuck. Please note there are 150 such logs, but if I keep only 2-3 of them then it works fine. Can you please help me identify the issue here?

# [20170513 06:08:29.734 EDT (StatsCollector-1) deshaw.tools.jms.ActiveMQLoggingPlugin$ActiveMQDestinationStatsCollector#logPerDestinationStats INFO] ActiveMQ Destination Stats (97 destinations):
# [destName: topic://darts.metaDataChangeTopic | enqueueCount: 1 | dequeueCount: 1 | dispatchCount: 1 | expiredCount: 0 | inflightCount: 0 | msgsHeld: 0 | msgsCached: 0 | memoryPercentUsage: 0 | memoryUsage: 0 | memoryLimit: 536870912 | avgEnqueueTimeMs: 0.0 | maxEnqueueTimeMs: 0 | minEnqueueTimeMs: 0 | currentConsumers: 1 | currentProducers: 0 | blockedSendsCount: 0 | blockedSendsTimeMs: 0 | minMsgSize: 2392 | maxMsgSize: 2392 | avgMsgSize: 2392.0 | totalMsgSize: 2392]

input {
  file {
    path => "/u/bansalp/activemq_primary_plugin.stats.log.1"
### For testing and continual processing of the same file; remove these before production
    start_position => "beginning"
    sincedb_path => "/dev/null"
### Lets read the logfile and recombine multi line details
    codec => multiline {
      # Grok pattern names are valid! :)
      pattern => "^\[destName:"
      negate => false
      what => "previous"
    }
  }
}

filter {
    if ([message] =~ /^\s*$/ ){
        drop{}
    }
    if ([message] =~ /^[^\[]/) {
            drop{}
    }

    if ([message] =~ /logMemoryInfo|logProcessInfo|logSystemInfo|logThreadBreakdown|logBrokerStats/) {
            drop{}
    }
    if [message] =~ "logPerDestinationStats" {
        grok {
                match => { "message" => "^\[%{YEAR:yr}%{MONTHNUM:mnt}%{MONTHDAY:daynum}\s*%{TIME:time}\s*%{TZ:timezone}\s*(%{DATA:thread_name})\s*%{JAVACLASS:javaclass}#%{WORD:method}\s*%{LOGLEVEL}\]\s*"
                }
        }
        split { 
            field => "message"
        }
        grok {
                match => { "message" => "^\[%{DATA}:\s*%{DATA:destName}\s*\|\s*%{DATA}:\s*%{NUMBER:enqueueCount}\s*\|\s*%{DATA}:\s*%{NUMBER:dequeueCount}\s*\|\s*%{DATA}:\s*%{NUMBER:dispatchCount}\s*\|\s*%{DATA}:\s*%{NUMBER:expiredCount}\s*\|\s*%{DATA}:\s*%{NUMBER:inflightCount}\s*\|\s*%{DATA}:\s*%{NUMBER:msgsHeld}\s*\|\s*%{DATA}:\s*%{NUMBER:msgsCached}\s*\|\s*%{DATA}:\s*%{NUMBER:memoryPercentUsage}\s*\|\s*%{DATA}:\s*%{NUMBER:memoryUsage}\s*\|\s*%{DATA}:\s*%{NUMBER:memoryLimit}\s*\|\s*%{DATA}:\s*%{NUMBER:avgEnqueueTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:maxEnqueueTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:minEnqueueTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:currentConsumers}\s*\|\s*%{DATA}:\s*%{NUMBER:currentProducers}\s*\|\s*%{DATA}:\s*%{NUMBER:blockedSendsCount}\s*\|\s*%{DATA}:\s*%{NUMBER:blockedSendsTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:minMsgSize}\s*\|\s*%{DATA}:\s*%{NUMBER:maxMsgSize}\s*\|\s*%{DATA}:\s*%{NUMBER:avgMsgSize}\s*\|\s*%{DATA}:\s*%{NUMBER:totalMsgSize}\]$" }
        }
        mutate {
            convert => { "message" => "string" }
            add_field => {
                "session_timestamp" => "%{yr}-%{mnt}-%{daynum} %{time} %{timezone}"
                "load_timestamp" => "%{@timestamp}"
            }
            remove_field => ["yr","mnt", "daynum", "time", "timezone"]
        }
    }
}
output {
  stdout {codec => rubydebug}
}

Solution

There certainly is.

What you will need to do is utilise the multiline codec on your input filter.

As per the example:

input {
  file {
    path => "/var/log/someapp.log"
    codec => multiline {
      # Grok pattern names are valid! :)
      pattern => "^\[%{YEAR}%{MONTHNUM}%{MONTHDAY}\s*%{TIME}"
      negate => true
      what => "previous"
    }
  }
}

This basically states that any line that doesn't start with the YYYYMMDD HH:mi:ss.000 timestamp will be merged with the previous line.
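
For example, with the sample log above, the codec would emit a single event whose message field holds the header line followed by the five stats lines, still separated by the original line breaks, roughly:

[20170502 01:57:26.209 EDT (thread-name) package-name.classname#MethodName INFO] Some info line (5 stats):
[fieldA: strvalue1| field2: 0 | field3: 0 | field4: 0 | field5: 0 | field6: 0 | field7: 0]
...
[fieldA: strvalue5| field2: 0 | field3: 0 | field4: 0 | field5: 0 | field6: 0 | field7: 0]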

From there you can now apply Grok patterns to the first line (to get high level data).

Once you're happy that you have all the data you require from the first line, you can then split on \r or \n and extract the individual stats data using a single grok pattern (based on the examples you gave above).
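
A quick way to iterate on that per-line grok pattern is a throwaway pipeline that reads pasted stats lines from stdin and prints the parsed event. This is a minimal sketch, not part of the original answer, and the pattern only captures the first two fields from the example above:

input { stdin { } }
filter {
  grok {
    # Capture the first label/value pairs of a "[fieldA: str | field2: 0 | ...]" line
    match => { "message" => "^\[%{DATA}:\s*%{DATA:fieldA}\s*\|\s*%{DATA}:\s*%{NUMBER:field2}" }
  }
}
output { stdout { codec => rubydebug } }

Run it with bin/logstash -f test.conf (test.conf being whatever you name the file) and paste one of the stats lines to check which fields come out; the online Grok Debugger works just as well for this.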

Hope this helps

D

Update 2017-05-08 11:54:

A full logstash conf could possibly look like this; you will need to consider changing the grok patterns to better suit your requirements (only you know your data).

Note: this hasn't been tested; I leave that up to you.

input {
  file {
    path => "/var/log/someapp.log"
### For testing and continual processing of the same file; remove these before production
    start_position => "beginning"
    sincedb_path => "/dev/null"
### Lets read the logfile and recombine multi line details
    codec => multiline {
      # Grok pattern names are valid! :)
      pattern => "^\[%{YEAR}%{MONTHNUM}%{MONTHDAY}\s*%{TIME}"
      negate => true
      what => "previous"
    }
  }
}
filter {
### Let's get some high level data before we split the line (note: anything you grab before the split gets copied to every resulting event)
    grok {
        match => { "message" => "^\[%{YEAR:yr}%{MONTHNUM:mnt}%{MONTHDAY:daynum}\s*%{TIME:time}\s*%{TZ:timezone}\s*(%{DATA:thread_name})\s*%{JAVACLASS:javaclass}#%{WORD:method}\s*%{LOGLEVEL}\]"
        }
    }
### Now split the message back out into single lines (the terminator may be a \r or \n; test which one)
    split { 
        "field" => "message"
        "terminator" => "\r" 
    }
### Ok, the lines should now be independent, lets add another grok here to get the patterns as dictated by your example [fieldA: str | field2: 0...] etc.
### Note: you should look to change the grok pattern to better suit your requirements, I used DATA here to quickly capture your content
    grok {
        break_on_match => false
        match => { "message" => "^\[%{DATA}:\s*%{DATA:fieldA}\|%{DATA}:\s*%{DATA:field2}\|%{DATA}:\s*%{DATA:field3}\|%{DATA}:\s*%{DATA:field4}\|%{DATA}:\s*%{DATA:field5}\|%{DATA}:\s*%{DATA:field6}\|%{DATA}:\s*%{DATA:field7}\]$" }
    }
    mutate {
        convert => { "message" => "string" }
        add_field => {
            "session_timestamp" => "%{yr}-%{mnt}-%{daynum} %{time} %{timezone}"
            "load_timestamp" => "%{@timestamp}"
        }
        remove_field => ["yr","mnt", "daynum", "time", "timezone"]
    }
}
output {
  stdout { codec => rubydebug }
}
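
Once the events look right in rubydebug, the stdout output can be swapped for an elasticsearch output so the parsed stats land in Elasticsearch. This is a minimal sketch; the host and index name here are assumptions to adjust for your cluster:

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "activemq-stats-%{+YYYY.MM.dd}"   # daily index; the name is illustrative
  }
}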

EDIT 2017-05-15

Logstash is a complex parser; it expects to stay up as a process and continuously monitor the log files (hence why you have to kill it to stop it).
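
As an aside (not part of the original answer): if the multiline codec is holding back the last buffered event while you test, its auto_flush_interval option tells it to emit a pending event after a period of inactivity, e.g.:

codec => multiline {
  pattern => "^\[destName:"
  negate => false
  what => "previous"
  # flush a buffered event after 2 seconds with no new lines
  auto_flush_interval => 2
}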

break_on_match means you can have multiple match patterns for the same line; if grok doesn't find a match with one pattern it tries the next in the list (always order them from complex to simple).
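
For instance, multiple patterns can be supplied to a single grok as an array. This sketch is not from the original answer, and unparsed_stats is a hypothetical fallback field:

grok {
  # break_on_match defaults to true: grok stops at the first pattern that matches,
  # so list the most specific pattern first and a catch-all last.
  match => { "message" => [
    "^\[%{DATA}:\s*%{DATA:destName}\s*\|\s*%{DATA}:\s*%{NUMBER:enqueueCount}",
    "^\[%{GREEDYDATA:unparsed_stats}\]$"
  ] }
}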

In your input filter, change the path to end with .log*. Also, as per your original example, doesn't the pattern have to match the required date format (in order to bring all associated lines together into a single event)?

Your split filter should specify what the terminator character is, I believe (note: the split filter's default terminator is \n, not a comma).

input {
  file {
    path => "/u/bansalp/activemq_primary_plugin.stats.log*"
### For testing and continual processing of the same file; remove these before production
    start_position => "beginning"
    sincedb_path => "/dev/null"
### Lets read the logfile and recombine multi line details
    codec => multiline {
      # Grok pattern names are valid! :)
      pattern => "^\[destName:"
      negate => false
      what => "previous"
    }
  }
}

filter {
    if "logPerDestinationStats" in [message] {
        grok {
                match => { "message" => "^\[%{YEAR:yr}%{MONTHNUM:mnt}%{MONTHDAY:daynum}\s*%{TIME:time}\s*%{TZ:timezone}\s*(%{DATA:thread_name})\s*%{JAVACLASS:javaclass}#%{WORD:method}\s*%{LOGLEVEL}\]\s*"
                }
        }
        split { 
            field => "message"
            terminator => "\r"
        }
        grok {
                match => { "message" => "^\[%{DATA}:\s*%{DATA:destName}\s*\|\s*%{DATA}:\s*%{NUMBER:enqueueCount}\s*\|\s*%{DATA}:\s*%{NUMBER:dequeueCount}\s*\|\s*%{DATA}:\s*%{NUMBER:dispatchCount}\s*\|\s*%{DATA}:\s*%{NUMBER:expiredCount}\s*\|\s*%{DATA}:\s*%{NUMBER:inflightCount}\s*\|\s*%{DATA}:\s*%{NUMBER:msgsHeld}\s*\|\s*%{DATA}:\s*%{NUMBER:msgsCached}\s*\|\s*%{DATA}:\s*%{NUMBER:memoryPercentUsage}\s*\|\s*%{DATA}:\s*%{NUMBER:memoryUsage}\s*\|\s*%{DATA}:\s*%{NUMBER:memoryLimit}\s*\|\s*%{DATA}:\s*%{NUMBER:avgEnqueueTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:maxEnqueueTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:minEnqueueTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:currentConsumers}\s*\|\s*%{DATA}:\s*%{NUMBER:currentProducers}\s*\|\s*%{DATA}:\s*%{NUMBER:blockedSendsCount}\s*\|\s*%{DATA}:\s*%{NUMBER:blockedSendsTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:minMsgSize}\s*\|\s*%{DATA}:\s*%{NUMBER:maxMsgSize}\s*\|\s*%{DATA}:\s*%{NUMBER:avgMsgSize}\s*\|\s*%{DATA}:\s*%{NUMBER:totalMsgSize}\]$" }
        }
        mutate {
            convert => { "message" => "string" }
            add_field => {
                "session_timestamp" => "%{yr}-%{mnt}-%{daynum} %{time} %{timezone}"
                "load_timestamp" => "%{@timestamp}"
            }
            remove_field => ["yr","mnt", "daynum", "time", "timezone"]
        }
    }
    else {
      drop{}
    }
}

Please excuse the formatting; I'm currently updating this from a mobile, and I am happy for someone to update the formatting in my stead.
