Logstash: Parse Complicated Multiline JSON from log file into ElasticSearch
Question
Let me first say that I have gone through as many examples on here as I could, and they still do not work. I am not sure whether that is because of the complicated nature of the JSON in the log file.
I am looking to take the example log entry, have Logstash read it in, and send the JSON as JSON to ElasticSearch.
Here is what the (shortened) example looks like:
[0m[0m16:02:08,685 INFO [org.jboss.as.server] (ServerService Thread Pool -- 28) JBAS018559: {
"appName": "SomeApp",
"freeMemReqStartBytes": 544577648,
"freeMemReqEndBytes": 513355408,
"totalMem": 839385088,
"maxMem": 1864368128,
"anonymousUser": false,
"sessionId": "zz90g0dFQkACVao4ZZL34uAb",
"swAction": {
"clock": 0,
"clockStart": 1437766438950,
"name": "General",
"trackingMemory": false,
"trackingMemoryGcFirst": true,
"memLast": 0,
"memOrig": 0
},
"remoteHost": "127.0.0.1",
"remoteAddr": "127.0.0.1",
"requestMethod": "GET",
"mapLocalObjectCount": {
"FinanceEmployee": {
"x": 1,
"singleton": false
},
"QuoteProcessPolicyRef": {
"x": 10,
"singleton": false
},
"LocationRef": {
"x": 2,
"singleton": false
}
},
"theSqlStats": {
"lstStat": [
{
"sql": "select * FROM DUAL",
"truncated": false,
"truncatedSize": -1,
"recordCount": 1,
"foundInCache": false,
"putInCache": false,
"isUpdate": false,
"sqlFrom": "DUAL",
"usingPreparedStatement": true,
"isLoad": false,
"sw": {
"clock": 104,
"clockStart": 1437766438970,
"name": "General",
"trackingMemory": false,
"trackingMemoryGcFirst": true,
"memLast": 0,
"memOrig": 0
},
"count": 0
},
{
"sql": "select * FROM DUAL2",
"truncated": false,
"truncatedSize": -1,
"recordCount": 0,
"foundInCache": false,
"putInCache": false,
"isUpdate": false,
"sqlFrom": "DUAL2",
"usingPreparedStatement": true,
"isLoad": false,
"sw": {
"clock": 93,
"clockStart": 1437766439111,
"name": "General",
"trackingMemory": false,
"trackingMemoryGcFirst": true,
"memLast": 0,
"memOrig": 0
},
"count": 0
}
]
}
}
The Logstash configs I have tried have not worked. The closest one so far is:
input {
file {
codec => multiline {
pattern => '\{(.*)\}'
negate => true
what => previous
}
path => [ '/var/log/logstash.log' ]
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter {
json {
source => "message"
}
}
output {
stdout { codec => rubydebug }
elasticsearch {
cluster => "logstash"
index => "logstashjson"
}
}
I have also tried:
input {
file {
type => "json"
path => "/var/log/logstash.log"
codec => json #also tried json_lines
}
}
filter {
json {
source => "message"
}
}
output {
stdout { codec => rubydebug }
elasticsearch {
cluster => "logstash"
codec => "json" #also tried json_lines
index => "logstashjson"
}
}
I just want to take the JSON posted above and send it "as is" to ElasticSearch just as if I did a cURL PUT with that file. I appreciate any help, thank you!
UPDATE
After help from Leonid, here is the configuration I have right now:
input {
file {
codec => multiline {
pattern => "^\["
negate => true
what => previous
}
path => [ '/var/log/logstash.log' ]
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter {
grok {
match => { "message" => "^(?<rubbish>.*?)(?<logged_json>{.*)" }
}
json {
source => "logged_json"
target => "parsed_json"
}
}
output {
stdout {
codec => rubydebug
}
elasticsearch {
cluster => "logstash"
index => "logstashjson"
}
}
Sorry, I can't make comments yet, so I will post an answer. You are missing a document_type in the elasticsearch config; how would it otherwise be deduced?
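For Logstash/Elasticsearch versions that expect a mapping type, a minimal sketch of what that could look like (the type name `jsonlog` here is just an illustrative placeholder, not something from the question):

```
output {
  elasticsearch {
    cluster => "logstash"
    index => "logstashjson"
    # hypothetical type name -- pick one that fits your mapping
    document_type => "jsonlog"
  }
}
```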
All right, after looking into the logstash reference and working closely with @Ascalonian, we came up with the following config:
input {
file {
# in the input you need to properly configure the multiline codec.
# You need to match the line that has the timestamp at the start,
# and then say 'everything that is NOT this line should go to the previous line'.
# the pattern may be improved to handle the case where the json starts at the first
# char of the line, but it is sufficient for now
codec => multiline {
pattern => "^\["
negate => true
what => previous
max_lines => 2000
}
path => [ '/var/log/logstash.log' ]
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter {
# extract the json part of the message string into a separate field
grok {
match => { "message" => "^.*?(?<logged_json>{.*)" }
}
# replace newlines in the json string, since the json filter below
# cannot deal with them. This is also the time to delete unwanted fields
mutate {
gsub => [ 'logged_json', '\n', '' ]
remove_field => [ "message", "@timestamp", "host", "path", "@version", "tags"]
}
# parse the json and remove the string field upon success
json {
source => "logged_json"
remove_field => [ "logged_json" ]
}
}
output {
stdout {
codec => rubydebug
}
elasticsearch {
cluster => "logstash"
index => "logstashjson"
}
}
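The filter chain in that config can be mirrored in a few lines of plain Python to sanity-check the extraction logic against a sample event. This is only a sketch of the same three steps (grok extract, gsub newline removal, JSON parse), not something Logstash itself runs:

```python
import json
import re

# A shortened multiline event, roughly as the multiline codec would assemble it:
# the server-log preamble followed by a pretty-printed JSON object.
event = (
    "16:02:08,685 INFO [org.jboss.as.server] "
    "(ServerService Thread Pool -- 28) JBAS018559: {\n"
    '  "appName": "SomeApp",\n'
    '  "requestMethod": "GET"\n'
    "}"
)

# grok equivalent: lazily skip the preamble, capture from the first '{' on
match = re.search(r"(?s)^.*?(?P<logged_json>{.*)", event)
logged_json = match.group("logged_json")

# mutate/gsub equivalent: drop the embedded newlines
logged_json = logged_json.replace("\n", "")

# json filter equivalent: parse the cleaned string
parsed = json.loads(logged_json)
print(parsed["appName"])        # SomeApp
print(parsed["requestMethod"])  # GET
```

If the json filter fails on a real event, printing the extracted string first usually shows which part of the preamble or pretty-printing survived the cleanup.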