Using field as input to Logstash Grok filter pattern
Question
I'm wondering if it is possible to use a field in the Logstash message as the input to the Grok pattern. Say I have an entry that looks like:
{
  "message": "10.1.1.1",
  "grok_filter": "%{IP:client}"
}
I want to be able to do something like this:
filter {
  grok {
    match => ["message", ["%{grok_filter}"]]
  }
}
The problem is that this crashes Logstash, because it appears to treat "%{grok_filter}" as the Grok pattern itself instead of the value of the grok_filter field. After Logstash has crashed, the error reported is:
pattern %{grok_filter} not defined
Is there any way to get the value of a field from inside the Grok filter block and use that as the input to the Grok pattern?
Recommended answer
The answer is no: the grok filter compiles its pattern when the filter is initialized, not once per event. If you need to do something like that, you'll have to write your own filter that compiles the pattern every time (and pay the performance penalty).
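As a rough illustration of why the field reference fails and what "compiling every time" means, here is a minimal Ruby sketch that uses only the standard library. It does not use the real grok library: PATTERN_LIBRARY, expand, match_dynamic, and the simplified IP regex are all hypothetical names and stand-ins invented for this example.

```ruby
# Hypothetical, heavily simplified pattern library (not the real grok patterns).
PATTERN_LIBRARY = {
  "IP"   => '\d{1,3}(?:\.\d{1,3}){3}',
  "WORD" => '\w+'
}.freeze

# Expand %{NAME} or %{NAME:field} references into regex source.
# An unknown NAME raises, similar in spirit to grok's "not defined" error.
def expand(pattern)
  pattern.gsub(/%\{(\w+)(?::(\w+))?\}/) do
    name, field = $1, $2
    body = PATTERN_LIBRARY.fetch(name) do
      raise ArgumentError, "pattern %{#{name}} not defined"
    end
    field ? "(?<#{field}>#{body})" : "(?:#{body})"
  end
end

# Static case: the pattern is known up front, so it is compiled once.
STATIC_REGEX = Regexp.new(expand("%{IP:client}"))

# Dynamic case: the pattern arrives inside each event, so it has to be
# expanded and compiled again on every call.
def match_dynamic(event)
  regex = Regexp.new(expand(event["grok_filter"]))
  m = regex.match(event["message"])
  m ? m.named_captures : nil
end

event = { "message" => "10.1.1.1", "grok_filter" => "%{IP:client}" }
result = match_dynamic(event)
```

In this sketch, passing the literal string "%{grok_filter}" to expand raises an error because grok_filter is looked up as a pattern name, not as an event field, which mirrors the failure described in the question.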
Without knowing more about why you want to do this, it's hard to recommend the best course of action. If you have a limited number of patterns, you can just set a grok_filter_type field on each event and then have a bunch of conditionals of the form if [grok_filter_type] == 'ip' { grok { ... } }.
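The same idea, choosing among a fixed set of precompiled patterns by a type field, can be sketched in plain Ruby. This is only an analogy for the Logstash conditionals, not Logstash code: PATTERNS_BY_TYPE, parse, the "user" type, and the simplified regexes are assumptions made for the example.

```ruby
# Hypothetical type-to-pattern table, compiled once when the file loads.
# The regexes are simplified stand-ins for real grok patterns.
PATTERNS_BY_TYPE = {
  "ip"   => /\A(?<client>\d{1,3}(?:\.\d{1,3}){3})\z/,
  "user" => /\Auser=(?<user>\w+)\z/
}.freeze

# Pick the precompiled regex by the event's grok_filter_type and merge the
# named captures into a copy of the event. Unknown types and non-matching
# messages are tagged instead of raising.
def parse(event)
  regex = PATTERNS_BY_TYPE[event["grok_filter_type"]]
  match = regex && regex.match(event["message"].to_s)
  if match
    event.merge(match.named_captures)
  else
    event.merge("tags" => Array(event["tags"]) | ["_grokparsefailure"])
  end
end

parsed = parse({ "message" => "10.1.1.1", "grok_filter_type" => "ip" })
failed = parse({ "message" => "10.1.1.1", "grok_filter_type" => "unknown" })
```

Because every pattern is compiled ahead of time, no per-event compilation cost is paid; the trade-off is that the set of patterns has to be known in advance.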
Here's a custom filter that will allow you to do what you want. It's mostly a copy of the grok code, with some changes and simplifications. I've tested it and it seems to work for me.
# encoding: utf-8
require "logstash/filters/base"
require "logstash/namespace"
require "logstash/environment"
require "set"

# A version of grok that can parse from a log-defined pattern. Not really
# recommended for high usage patterns, but for the occasional pattern it
# should work.
#
#     filter {
#       grok_dynamic {
#         match_field => "message"
#         pattern_field => "message_pattern"
#       }
#     }
#
class LogStash::Filters::GrokDynamic < LogStash::Filters::Base
  config_name "grok_dynamic"
  milestone 1

  # The field that contains the data to match against
  config :match_field, :validate => :string, :required => true

  # The field that contains the pattern
  config :pattern_field, :validate => :string, :required => true

  # Where the patterns are
  config :patterns_dir, :validate => :array, :default => []

  # If true, only store named captures from grok.
  config :named_captures_only, :validate => :boolean, :default => true

  # If true, keep empty captures as event fields.
  config :keep_empty_captures, :validate => :boolean, :default => false

  # Append values to the 'tags' field when there has been no
  # successful match
  config :tag_on_failure, :validate => :array, :default => ["_grokparsefailure"]

  # The fields to overwrite.
  #
  # This allows you to overwrite a value in a field that already exists.
  config :overwrite, :validate => :array, :default => []

  # Detect if we are running from a jarfile, pick the right path.
  @@patterns_path ||= Set.new
  @@patterns_path += [LogStash::Environment.pattern_path("*")]

  public
  def initialize(params)
    super(params)
    @handlers = {}
  end

  public
  def register
    require "grok-pure" # rubygem 'jls-grok'

    @patternfiles = []

    # Have @@patterns_path show first. Last-in pattern definitions win; this
    # will let folks redefine built-in patterns at runtime.
    @patterns_dir = @@patterns_path.to_a + @patterns_dir
    @logger.info? and @logger.info("Grok patterns path", :patterns_dir => @patterns_dir)
    @patterns_dir.each do |path|
      if File.directory?(path)
        path = File.join(path, "*")
      end

      Dir.glob(path).each do |file|
        @logger.info? and @logger.info("Grok loading patterns from file", :path => file)
        @patternfiles << file
      end
    end

    @patterns = Hash.new { |h,k| h[k] = [] }

    @grok = Grok.new
    @patternfiles.each { |path| @grok.add_patterns_from_file(path) }
  end # def register

  public
  def filter(event)
    return unless filter?(event)
    return if event[@match_field].nil? || event[@pattern_field].nil?

    @logger.debug? and @logger.debug("Running grok_dynamic filter", :event => event)
    # Recompile the pattern taken from the event on every call; this is the
    # per-event cost of supporting dynamic patterns.
    @grok.compile(event[@pattern_field])
    if match(@grok, @match_field, event)
      filter_matched(event)
    else
      # Tag this event if we can't parse it. We can use this later to
      # reparse+reindex logs if we improve the patterns given.
      @tag_on_failure.each do |tag|
        event["tags"] ||= []
        event["tags"] << tag unless event["tags"].include?(tag)
      end
    end

    @logger.debug? and @logger.debug("Event now: ", :event => event)
  end # def filter

  private
  def match(grok, field, event)
    input = event[field]
    if input.is_a?(Array)
      success = true
      # Use a distinct block variable so the outer `input` is not shadowed.
      input.each do |item|
        match = grok.match(item)
        if match
          match.each_capture do |capture, value|
            handle(capture, value, event)
          end
        else
          success = false
        end
      end
      return success
    #elsif input.is_a?(String)
    else
      # Convert anything else to string (number, hash, etc)
      match = grok.match(input.to_s)
      return false if !match

      match.each_capture do |capture, value|
        handle(capture, value, event)
      end
      return true
    end
  rescue StandardError => e
    @logger.warn("Grok regexp threw exception", :exception => e.message)
    # Treat an exception as a failed match so the event gets tagged.
    return false
  end

  private
  def handle(capture, value, event)
    handler = @handlers[capture] ||= compile_capture_handler(capture)
    return handler.call(value, event)
  end

  private
  def compile_capture_handler(capture)
    # SYNTAX:SEMANTIC:TYPE
    syntax, semantic, coerce = capture.split(":")

    # each_capture do |fullname, value|
    #   capture_handlers[fullname].call(value, event)
    # end

    code = []
    code << "# for capture #{capture}"
    code << "lambda do |value, event|"
    #code << "  p :value => value, :event => event"
    if semantic.nil?
      if @named_captures_only
        # Abort early if we are only keeping named (semantic) captures
        # and this capture has no semantic name.
        code << "  return"
      else
        field = syntax
      end
    else
      field = semantic
    end
    code << "  return if value.nil? || value.empty?" unless @keep_empty_captures
    if coerce
      case coerce
        when "int"; code << "  value = value.to_i"
        when "float"; code << "  value = value.to_f"
      end
    end

    code << "  # field: #{field}"
    if @overwrite.include?(field)
      code << "  event[field] = value"
    else
      code << "  v = event[field]"
      code << "  if v.nil?"
      code << "    event[field] = value"
      code << "  elsif v.is_a?(Array)"
      code << "    event[field] << value"
      code << "  elsif v.is_a?(String)"
      # Promote to array since we aren't overwriting.
      code << "    event[field] = [v, value]"
      code << "  end"
    end
    code << "  return"
    code << "end"

    #puts code
    # The generated lambda refers to the local variable `field` through the
    # binding passed to eval.
    return eval(code.join("\n"), binding, "<grok capture #{capture}>")
  end # def compile_capture_handler
end # class LogStash::Filters::GrokDynamic
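The lambda that compile_capture_handler generates can be hard to follow because it is assembled as a string. A simplified, eval-free sketch of the same rules, run against a plain Hash instead of a Logstash event, might look like the following. store_capture and its overwrite keyword are hypothetical names for this example; it assumes named_captures_only is true and keep_empty_captures is false, and unlike the filter it promotes any existing non-Array value (not only Strings) to an array.

```ruby
# Store one captured value on an event (a plain Hash here), following the
# SYNTAX:SEMANTIC:TYPE capture naming used by the filter.
def store_capture(event, capture, value, overwrite: [])
  _syntax, semantic, coerce = capture.split(":")
  return event if semantic.nil?               # unnamed capture: skip it
  return event if value.nil? || value.empty?  # empty capture: skip it

  # Optional type coercion from the third part of the capture name.
  value = case coerce
          when "int"   then value.to_i
          when "float" then value.to_f
          else value
          end

  existing = event[semantic]
  if existing.nil? || overwrite.include?(semantic)
    event[semantic] = value              # first value, or explicit overwrite
  elsif existing.is_a?(Array)
    existing << value                    # already multi-valued: append
  else
    event[semantic] = [existing, value]  # promote to array, keep both values
  end
  event
end

event = {}
store_capture(event, "NUMBER:bytes:int", "512")
store_capture(event, "NUMBER:bytes:int", "1024")
store_capture(event, "IP", "10.1.1.1")                      # no semantic name: ignored
store_capture(event, "IP:client", "10.1.1.1")
store_capture(event, "IP:client", "10.2.2.2", overwrite: ["client"])
```

The filter itself builds and caches one lambda per capture name, so the split and the branching on coercion happen once per capture name; this sketch redoes that work on every call to keep the logic readable.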