Logstash: Parse Complicated Multiline JSON from log file into ElasticSearch


Problem description


Let me first say that I have gone through as many examples on here as I could, and none of them work for me. I am not sure whether that is because of the complicated nature of the JSON in the log file.

I am looking to take the example log entry, have Logstash read it in, and send the JSON as JSON to ElasticSearch.

Here is what the (shortened) example looks like:

[0m[0m16:02:08,685 INFO  [org.jboss.as.server] (ServerService Thread Pool -- 28) JBAS018559: {
"appName": "SomeApp",
"freeMemReqStartBytes": 544577648,
"freeMemReqEndBytes": 513355408,
"totalMem": 839385088,
"maxMem": 1864368128,
"anonymousUser": false,
"sessionId": "zz90g0dFQkACVao4ZZL34uAb",
"swAction": {
    "clock": 0,
    "clockStart": 1437766438950,
    "name": "General",
    "trackingMemory": false,
    "trackingMemoryGcFirst": true,
    "memLast": 0,
    "memOrig": 0
},
"remoteHost": "127.0.0.1",
"remoteAddr": "127.0.0.1",
"requestMethod": "GET",
"mapLocalObjectCount": {
    "FinanceEmployee": {
      "x": 1,
      "singleton": false
    },
    "QuoteProcessPolicyRef": {
      "x": 10,
      "singleton": false
    },
    "LocationRef": {
      "x": 2,
      "singleton": false
    }
},
"theSqlStats": {
    "lstStat": [
      {
        "sql": "select * FROM DUAL",
        "truncated": false,
        "truncatedSize": -1,
        "recordCount": 1,
        "foundInCache": false,
        "putInCache": false,
        "isUpdate": false,
        "sqlFrom": "DUAL",
        "usingPreparedStatement": true,
        "isLoad": false,
        "sw": {
          "clock": 104,
          "clockStart": 1437766438970,
          "name": "General",
          "trackingMemory": false,
          "trackingMemoryGcFirst": true,
          "memLast": 0,
          "memOrig": 0
        },
        "count": 0
      },
      {
        "sql": "select * FROM DUAL2",
        "truncated": false,
        "truncatedSize": -1,
        "recordCount": 0,
        "foundInCache": false,
        "putInCache": false,
        "isUpdate": false,
        "sqlFrom": "DUAL2",
        "usingPreparedStatement": true,
        "isLoad": false,
        "sw": {
          "clock": 93,
          "clockStart": 1437766439111,
          "name": "General",
          "trackingMemory": false,
          "trackingMemoryGcFirst": true,
          "memLast": 0,
          "memOrig": 0
        },
        "count": 0
      }
    ]
    }
}

The Logstash configs I have tried have not worked. The one closest so far is:

input {
    file {
        codec => multiline {
            pattern => '\{(.*)\}'
            negate => true
            what => previous
        }
        path => [ '/var/log/logstash.log' ]
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}

filter {
    json {
        source => message
    }
}

output {
    stdout { codec => rubydebug }
    elasticsearch {
        cluster => "logstash"
        index => "logstashjson"
    }
}

I have also tried:

input {
    file {
        type => "json"
        path => "/var/log/logstash.log"
        codec => json #also tried json_lines
    }
}

filter {
    json {
        source => "message"
    }
}

output {
    stdout { codec => rubydebug }
    elasticsearch {
        cluster => "logstash"
        codec => "json" #also tried json_lines
        index => "logstashjson"
    }
}

I just want to take the JSON posted above and send it "as is" to ElasticSearch just as if I did a cURL PUT with that file. I appreciate any help, thank you!

UPDATE

After help from Leonid, here is the configuration I have right now:

input {
    file {
        codec => multiline {
            pattern => "^\["
            negate => true
            what => previous
        }
        path => [ '/var/log/logstash.log' ]
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}

filter {
    grok {
        match => { "message" => "^(?<rubbish>.*?)(?<logged_json>{.*)" }
    }
    json {
        source => "logged_json"
        target => "parsed_json"
    }
}

output {
    stdout {
        codec => rubydebug
    }
    elasticsearch {
        cluster => "logstash"
        index => "logstashjson"
    }
}

Solution

Sorry, I can't yet make comments, so I will post an answer. You are missing a document_type in the elasticsearch config; how would it otherwise be deduced?


All right, after looking into the Logstash reference and working closely with @Ascalonian, we came up with the following config:

input { 
    file { 

        # In the input you need to properly configure the multiline codec.
        # Match the line that has the timestamp at the start, and then say
        # 'everything that is NOT this line should go to the previous line'.
        # The pattern could be improved to handle the case where a JSON array starts
        # at the first char of the line, but it is sufficient for now.

        codec => multiline { 
            pattern => "^\[" 
            negate => true 
            what => previous 
            max_lines => 2000 
        } 

        path => [ '/var/log/logstash.log'] 
        start_position => "beginning" 
        sincedb_path => "/dev/null" 
    } 
} 

filter { 

    # extract the json part of the message string into a separate field
    grok { 
        match => { "message" => "^.*?(?<logged_json>{.*)" } 
    } 

    # Replace newlines in the json string, since the json filter below
    # cannot deal with those. This is also the place to delete unwanted fields.
    mutate { 
        gsub => [ 'logged_json', '\n', '' ] 
        remove_field => [ "message", "@timestamp", "host", "path", "@version", "tags"] 
    } 

    # parse the json and remove the string field upon success
    json { 
        source => "logged_json" 
        remove_field => [ "logged_json" ] 
    } 
} 

output { 
    stdout { 
        codec => rubydebug 
    } 
    elasticsearch { 
        cluster => "logstash" 
        index => "logstashjson" 
    } 
}
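
To make the three stages of that config easier to follow, here is a rough Python sketch of the same idea. It is not Logstash and does not reproduce its exact behavior; the function names `group_events` and `extract_json` and the second log entry in the sample are made up for illustration. It joins continuation lines onto the previous event (as the multiline codec does with `negate => true` and `what => previous`), takes everything from the first `{` (as the grok pattern does), strips newlines (as the mutate step does), and parses the result.

```python
import json
import re

# A line starting with "[" opens a new event, like pattern => "^\[".
EVENT_START = re.compile(r"^\[")


def group_events(lines):
    """Append every line that does NOT match the pattern to the previous event."""
    events = []
    for line in lines:
        if EVENT_START.match(line) or not events:
            events.append(line)
        else:
            events[-1] += "\n" + line
    return events


def extract_json(event):
    """Keep the text from the first '{' onward, drop newlines, and parse it."""
    start = event.find("{")
    if start == -1:
        return None  # no JSON payload in this event
    # Python's json module would accept the newlines anyway; the replace
    # only mirrors the mutate/gsub step of the config above.
    return json.loads(event[start:].replace("\n", ""))


# Shortened sample: the first entry follows the question's log,
# the second entry is hypothetical and only there to show the split.
sample_lines = [
    "[0m[0m16:02:08,685 INFO  [org.jboss.as.server] (ServerService Thread Pool -- 28) JBAS018559: {",
    '"appName": "SomeApp",',
    '"swAction": {',
    '    "clock": 0,',
    '    "name": "General"',
    "},",
    '"requestMethod": "GET"',
    "}",
    "[0m[0m16:02:09,120 INFO  [org.jboss.as.server] (ServerService Thread Pool -- 28) JBAS018559: {",
    '"appName": "SomeApp",',
    '"requestMethod": "POST"',
    "}",
]

events = group_events(sample_lines)
docs = [extract_json(event) for event in events]
```

Each element of `docs` is then a plain dictionary holding only the logged JSON, which is roughly what the config above hands to the elasticsearch output once the extra fields are removed.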
