Logstash: Enrich event from log file with data from static csv file


Question

Summary: Use a filter in Logstash that reads a value from an event field, looks that value up in an external file (e.g. a csv), and retrieves the matching value from that file. Add the value from the external file as an extra field on the event.

More info: I have a log file with events. The events look like:

{"@timestamp":"2014-06-18T11:52:45.370636+02:00","location":{"MainId":3,"SubId":"5"},"EndRequest":{"Duration":{"Main":0,"Page":6720}}}

I have a static csv file like:

1,left
2,right
3,top

When an event is processed in Logstash, I want to be able to use a filter which checks the value of MainId (3 in the example event) and looks this value up in the csv file. If found, the event must get a tag: "top".
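The lookup being asked for boils down to building a key => value table from the csv and indexing it with the event's MainId. A minimal plain-Ruby sketch (the inlined csv content mirrors the sample above):

```ruby
require "csv"

# Build a key => value lookup table from the csv content shown above.
lookup = {}
CSV.parse("1,left\n2,right\n3,top\n").each do |row|
  lookup[row[0]] = row[1]
end

# The example event has "MainId":3; keys are stored as strings,
# so the id is converted before the lookup.
main_id = 3
tag = lookup[main_id.to_s]
puts tag  # => top
```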

It works in a similar way to the "GeoIP" filter: the event has a field value, that value is matched against a "database", and values are returned which can be added to the event.

I was not able to find an existing filter that can do the above. Do I need to write a custom filter myself? If so, can someone give a hint on how to approach this?

Answer

I've never seen a plugin written for this, so I went ahead and wrote a very basic one:

# encoding: utf-8
require "logstash/filters/base"
require "logstash/namespace"
require "csv"

# The csvlookup filter allows you to add fields to an event
# based on a lookup in a csv file

class LogStash::Filters::CSVLookup < LogStash::Filters::Base
  config_name "csvlookup"
  milestone 1

  # Example:
  #
  #     filter {
  #       csvlookup {
  #         file => 'key_value.csv'
  #         key_col => 1
  #         value_col => 2
  #         default => 'some_value'
  #         map_field => { "from_field" => "to_field" }
  #       }
  #     }
  #
  # The default is used if the key_col's value is not present in the CSV file.

  config :file, :validate => :string, :required => true
  config :key_col, :validate => :number, :default => 1, :required => false
  config :value_col, :validate => :number, :default => 2, :required => false
  config :default, :validate => :string, :required => false
  config :map_field, :validate => :hash, :required => true

  public
  def register
    @lookup = Hash.new

    CSV.foreach(@file) do |row|
      @lookup[row[@key_col - 1]] = row[@value_col - 1]
    end
    #puts @lookup.inspect
  end # def register

  public
  def filter(event)
    return unless filter?(event)

    @map_field.each do |src_field, dest_field|
      looked_up_val = @lookup[event[src_field].to_s]
      if looked_up_val.nil?
        event[dest_field] = @default unless @default.nil?
      else
        if event[dest_field].nil?
          event[dest_field] = looked_up_val
        elsif !event[dest_field].is_a?(Array)
          event[dest_field] = [ event[dest_field], looked_up_val ]
        else
          event[dest_field].push(looked_up_val)
        end
      end
    end
  end # def filter
end # class LogStash::Filters::CSVLookup
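For the event in the question, the filter could be configured along these lines. The file path and destination field name are assumptions, and whether the nested `[location][MainId]` field-reference syntax resolves inside `event[src_field]` depends on the Logstash version:

```
filter {
  csvlookup {
    file      => "/etc/logstash/lookup.csv"
    key_col   => 1
    value_col => 2
    map_field => { "[location][MainId]" => "position" }
  }
}
```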

There is further work that could be done on it -- for example, if src_field were an array it could iterate over it -- but it should work as-is for your case.
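A hedged sketch of that array extension in plain Ruby, with a local `lookup` hash standing in for the `@lookup` built in `register` (not tested inside a real Logstash install):

```ruby
# Stand-in for the @lookup hash that register builds from the csv file.
lookup = { "1" => "left", "2" => "right", "3" => "top" }

# The source field may hold a single id or an array of ids.
src_value = [3, 1]

# Normalize to an array, look each key up, and drop misses.
keys = src_value.is_a?(Array) ? src_value : [src_value]
results = keys.map { |k| lookup[k.to_s] }.compact

puts results.inspect  # => ["top", "left"]
```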

