Using field as input to Logstash Grok filter pattern


Problem description


I'm wondering if it is possible to use a field in the Logstash message as the input to the Grok pattern. Say I have an entry that looks like:

{
    "message":"10.1.1.1",
    "grok_filter":"%{IP:client}"
}


I want to be able to do something like this:

filter {
  grok {
    match => ["message", ["%{grok_filter}"]]
  }
}


The problem is that this crashes Logstash: it appears to treat "%{grok_filter}" as the name of a Grok pattern itself rather than as a reference to the value of the grok_filter field. I get the following after Logstash has crashed:

The error reported is: 
  pattern %{grok_filter} not defined


Is there any way to get the value of a field from inside the Grok filter block and use that as the input to the Grok pattern?

Answer


The answer is no -- the grok filter compiles its pattern when the filter is initialized. If you need to do something like that you'll have to write your own filter that compiles the pattern every time (and pay the performance penalty).
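To illustrate the cost being traded away, here is a minimal pure-Ruby sketch. A plain `Regexp` stands in for a grok expression (grok patterns compile down to regular expressions, so the trade-off is the same); the pattern text, event values, and counts are illustrative, not from the original post:

```ruby
require "benchmark"

# Hypothetical stand-in for a grok pattern like %{IP:client}.
pattern_text = '\A\d+\.\d+\.\d+\.\d+\z'
events = Array.new(5_000) { |i| "10.1.1.#{i % 256}" }

# Stock grok: compile once at register time, reuse for every event.
compiled = Regexp.new(pattern_text)
static = Benchmark.realtime { events.each { |e| compiled.match?(e) } }

# Dynamic pattern: recompile on every event, as a per-event filter must.
dynamic = Benchmark.realtime { events.each { |e| Regexp.new(pattern_text).match?(e) } }

matched = events.count { |e| compiled.match?(e) }
puts "matched #{matched} events; compile-once #{static.round(4)}s vs recompile #{dynamic.round(4)}s"
```

The recompiling loop does strictly more work per event, which is the performance penalty the answer refers to.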


Without knowing more about why you want to do this, it's hard to recommend the best course of action. If you have a limited number of patterns, you can just set a grok_filter_type parameter and then have a bunch of if [grok_filter_type] == 'ip' { grok { ... } } type of things.
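As a sketch of that conditional approach (the grok_filter_type field name and the two patterns here are assumptions for illustration; the match syntax follows the milestone-era array form used elsewhere in this question):

```
filter {
  if [grok_filter_type] == "ip" {
    grok {
      match => ["message", "%{IP:client}"]
    }
  } else if [grok_filter_type] == "number" {
    grok {
      match => ["message", "%{NUMBER:value}"]
    }
  }
}
```

Each branch uses a fixed pattern, so every grok instance can still compile its pattern once at startup.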


Here's a custom filter that will allow you to do what you want -- it's mostly a copy of the grok code, but there are some changes/simplifications. I've tested it and it seems to work for me.

# encoding: utf-8
require "logstash/filters/base"
require "logstash/namespace"
require "logstash/environment"
require "set"

# A version of grok that can parse with a log-defined pattern.  Not really
# recommended for high-usage patterns, but for the occasional pattern it
# should work.
#     filter {
#       grok_dynamic {
#         match_field   => "message"
#         pattern_field => "message_pattern"
#       }
#     }
#
#
class LogStash::Filters::GrokDynamic < LogStash::Filters::Base
  config_name "grok_dynamic"
  milestone 1

  # The field that contains the data to match against
  config :match_field, :validate => :string, :required => true
  # the field that contains the pattern
  config :pattern_field, :validate => :string, :required => true
  # where the patterns are
  config :patterns_dir, :validate => :array, :default => []

  # If true, only store named captures from grok.
  config :named_captures_only, :validate => :boolean, :default => true

  # If true, keep empty captures as event fields.
  config :keep_empty_captures, :validate => :boolean, :default => false

  # Append values to the 'tags' field when there has been no
  # successful match
  config :tag_on_failure, :validate => :array, :default => ["_grokparsefailure"]

  # The fields to overwrite.
  #
  # This allows you to overwrite a value in a field that already exists.
  config :overwrite, :validate => :array, :default => []

  # Detect if we are running from a jarfile, pick the right path.
  @@patterns_path ||= Set.new
  @@patterns_path += [LogStash::Environment.pattern_path("*")]

  public
  def initialize(params)
    super(params)
    @handlers = {}
  end

  public
  def register
    require "grok-pure" # rubygem 'jls-grok'

    @patternfiles = []

    # Have @@patterns_path show first. Last-in pattern definitions win; this
    # will let folks redefine built-in patterns at runtime.
    @patterns_dir = @@patterns_path.to_a + @patterns_dir
    @logger.info? and @logger.info("Grok patterns path", :patterns_dir => @patterns_dir)
    @patterns_dir.each do |path|
      if File.directory?(path)
        path = File.join(path, "*")
      end

      Dir.glob(path).each do |file|
        @logger.info? and @logger.info("Grok loading patterns from file", :path => file)
        @patternfiles << file
      end
    end

    @patterns = Hash.new { |h,k| h[k] = [] }

    @grok = Grok.new
    @patternfiles.each { |path| @grok.add_patterns_from_file(path) }

  end # def register

  public
  def filter(event)
    return unless filter?(event)
    return if event[@match_field].nil? || event[@pattern_field].nil?

    @logger.debug? and @logger.debug("Running grok_dynamic filter", :event => event)
    @grok.compile(event[@pattern_field])
    if match(@grok,@match_field, event)
      filter_matched(event)
    else
      # Tag this event if we can't parse it. We can use this later to
      # reparse+reindex logs if we improve the patterns given.
      @tag_on_failure.each do |tag|
        event["tags"] ||= []
        event["tags"] << tag unless event["tags"].include?(tag)
      end
    end

    @logger.debug? and @logger.debug("Event now: ", :event => event)
  end # def filter

  private
  def match(grok, field, event)
    input = event[field]
    if input.is_a?(Array)
      success = true
      input.each do |input|
        match = grok.match(input)
        if match
          match.each_capture do |capture, value|
            handle(capture, value, event)
          end
        else
          success = false
        end
      end
      return success
    #elsif input.is_a?(String)
    else
      # Convert anything else to string (number, hash, etc)
      match = grok.match(input.to_s)
      return false if !match

      match.each_capture do |capture, value|
        handle(capture, value, event)
      end
      return true
    end
  rescue StandardError => e
    @logger.warn("Grok regexp threw exception", :exception => e.message)
  end

  private
  def handle(capture, value, event)
    handler = @handlers[capture] ||= compile_capture_handler(capture)
    return handler.call(value, event)
  end

  private
  def compile_capture_handler(capture)
    # SYNTAX:SEMANTIC:TYPE
    syntax, semantic, coerce = capture.split(":")

    # each_capture do |fullname, value|
    #   capture_handlers[fullname].call(value, event)
    # end

    code = []
    code << "# for capture #{capture}"
    code << "lambda do |value, event|"
    #code << "  p :value => value, :event => event"
    if semantic.nil?
      if @named_captures_only
        # Abort early if we are only keeping named (semantic) captures
        # and this capture has no semantic name.
        code << "  return"
      else
        field = syntax
      end
    else
      field = semantic
    end
    code << "  return if value.nil? || value.empty?" unless @keep_empty_captures
    if coerce
      case coerce
        when "int"; code << "  value = value.to_i"
        when "float"; code << "  value = value.to_f"
      end
    end

    code << "  # field: #{field}"
    if @overwrite.include?(field)
      code << "  event[field] = value"
    else
      code << "  v = event[field]"
      code << "  if v.nil?"
      code << "    event[field] = value"
      code << "  elsif v.is_a?(Array)"
      code << "    event[field] << value"
      code << "  elsif v.is_a?(String)"
      # Promote to array since we aren't overwriting.
      code << "    event[field] = [v, value]"
      code << "  end"
    end
    code << "  return"
    code << "end"

    #puts code
    return eval(code.join("\n"), binding, "<grok capture #{capture}>")
  end # def compile_capture_handler

end # class LogStash::Filters::GrokDynamic
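Assuming the file is dropped on Logstash's plugin path as logstash/filters/grok_dynamic.rb, a configuration matching the event from the question might look like this (a sketch, not tested against a particular Logstash release):

```
filter {
  grok_dynamic {
    match_field   => "message"
    pattern_field => "grok_filter"
  }
}
```

With the example event above, the filter would compile "%{IP:client}" from the grok_filter field at event time and add a client field containing "10.1.1.1".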
