logstash Custom Log Filter for Apache Logs


Question

I am new to the ELK stack. I have a Filebeat service sending logs to Logstash, and in Logstash a grok filter pushes the data to an Elasticsearch index.

I am using the grok filter with match => { "message" => "%{COMBINEDAPACHELOG}" } to parse the data.
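
For reference, a minimal sketch of what such a setup typically looks like (this filter block is assumed for illustration, not quoted from the actual configuration):

filter {
    # Parse each Apache access-log line with the stock combined-log pattern
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
}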

My issue is that I want the names of the fields and their values to be stored in the Elasticsearch index. The different versions of my logs are as below:

27.60.18.21 - - [27/Aug/2017:10:28:49 +0530] "GET /api/v1.2/places/search/json?username=pradeep.pgu&location=28.5359586,77.3677936&query=atm&explain=true&bridge=true HTTP/1.1" 200 3284
27.60.18.21 - - [27/Aug/2017:10:28:49 +0530] "GET /api/v1.2/places/search/json?username=pradeep.pgu&location=28.5359586,77.3677936&query=atms&explain=true&bridge=true HTTP/1.1" 200 1452
27.60.18.21 - - [27/Aug/2017:10:28:52 +0530] "GET /api/v1.2/places/nearby/json?&refLocation=28.5359586,77.3677936&keyword=FINATM HTTP/1.1" 200 3283
27.60.18.21 - - [27/Aug/2017:10:29:06 +0530] "GET /api/v1.2/places/search/json?username=pradeep.pgu&location=28.5359586,77.3677936&query=co&explain=true&bridge=true HTTP/1.1" 200 3415
27.60.18.21 - - [27/Aug/2017:10:29:06 +0530] "GET /api/v1.2/places/search/json?username=pradeep.pgu&location=28.5359586,77.3677936&query=cof&explain=true&bridge HTTP/1.1" 200 2476


The fields that I want in the Elasticsearch index are below:

  1. client_ip => type must be compatible with what Kibana uses for IP mapping.
  2. timestamp => datetime format => the time of the log.
  3. method => text => the method that was called, e.g. GET, POST.
  4. version => decimal number => e.g. 1.2 / 1.0 (in the sample logs as v1.2).
  5. username => text => the text after username= (in the sample logs as pradeep.pgu).
  6. location => geo_point type => the value has both latitude and longitude so that Kibana can plot them on the map.
  7. search_query => text => the thing that was searched (in the sample, from either of the two fields "keyword=" or "query="). Only one of the two fields will be present, and the value of whichever is present must be used.
  8. response_code => number => the code of the response (200 in the sample).
  9. data_transfered => number => the amount of data transferred (the last number in the sample).

Is such a thing even possible? Does the grok filter have a provision for this? The thing is, the parameters are not order-specific.

Answer

Starting from the HTTPD_COMMONLOG pattern, you could use this pattern (which you can test with a grok tester):

grok {
 match => { 
  "message" => "%{IPORHOST:client_ip} %{HTTPDUSER:ident} %{HTTPDUSER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:method} /api/v%{NUMBER:version}/places/search/json\?%{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" %{NUMBER:response_code} (?:%{NUMBER:data_transfered}|-)"
 } 
}

Once the grok filter has extracted the request, you can use the kv filter on it, which will extract the parameters (and sidestep the problem of the parameters not being order-specific). You'll have to set the field_split option to &:

kv { 
  source => "request"
  field_split => "&"
}

For search_query, we use the mutate filter with the add_field option to create the field from whichever of the two parameters (query or keyword) is present. The complete filter then looks like this:

filter {
    grok {
        match => { 
            "message" => "%{IPORHOST:client_ip} %{HTTPDUSER:ident} %{HTTPDUSER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:method} /api/v%{NUMBER:version}/.*/json\?%{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" %{NUMBER:response_code} (?:%{NUMBER:data_transfered}|-)"
        } 
    }
    kv { 
        source => "request"
        field_split => "&"
    }

    if [query] {
        mutate {
            add_field => { "search_query" => "%{query}" }
        }
    } else if [keyword] {
        mutate {
            add_field => { "search_query" => "%{keyword}" }
        }
    }

    if [refLocation] {
        mutate {
            rename => { "refLocation" => "location" }
        }
    }
}
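
To cover the remaining requirements (timestamp as a date and the numeric fields as numbers), a possible follow-up sketch using the standard date and mutate filters is below; the exact field handling is an assumption, not part of the original answer. For location to become a geo_point, the index mapping (or an index template) must declare it as geo_point; Elasticsearch then accepts the "lat,lon" string that the log already contains.

filter {
    # Parse the Apache timestamp into @timestamp as a proper date
    date {
        match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    }

    # Cast the fields extracted by grok, which are strings by default
    mutate {
        convert => {
            "version"         => "float"
            "response_code"   => "integer"
            "data_transfered" => "integer"
        }
    }
}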

