Filtering log files in Flume using interceptors
Problem description
I have an HTTP server writing log files, which I then load into HDFS using Flume. First, I want to filter events according to data in the header or body. I read that I can do this using an interceptor with a regex. Can someone explain exactly what I need to do? Do I need to write Java code that overrides the Flume code?
Also, I would like to send events to different sinks according to a header (i.e. source=1 goes to sink1 and source=2 goes to sink2). How is this done?
thank you,
Shimon
You don't need to write Java code to filter events. Use the Regex Filtering Interceptor to filter events whose body text matches a regular expression:
agent.sources.logs_source.interceptors = regex_filter_interceptor
agent.sources.logs_source.interceptors.regex_filter_interceptor.type = regex_filter
agent.sources.logs_source.interceptors.regex_filter_interceptor.regex = <your regex>
agent.sources.logs_source.interceptors.regex_filter_interceptor.excludeEvents = true
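As a concrete illustration, here is a minimal sketch of both directions of the filter. It reuses the `agent` and `logs_source` names from the snippet above; the interceptor names and the `DEBUG` / `ERROR` patterns are placeholders chosen for the example. With `excludeEvents = true`, events whose body matches the regex are dropped; with the default `false`, only matching events are kept.

```properties
# Option A: drop every event whose body contains DEBUG (placeholder pattern).
agent.sources.logs_source.interceptors = drop_debug
agent.sources.logs_source.interceptors.drop_debug.type = regex_filter
agent.sources.logs_source.interceptors.drop_debug.regex = .*DEBUG.*
agent.sources.logs_source.interceptors.drop_debug.excludeEvents = true

# Option B (use instead of A): keep only events whose body contains ERROR.
# excludeEvents defaults to false, so it is set here only for clarity.
# agent.sources.logs_source.interceptors = keep_errors
# agent.sources.logs_source.interceptors.keep_errors.type = regex_filter
# agent.sources.logs_source.interceptors.keep_errors.regex = .*ERROR.*
# agent.sources.logs_source.interceptors.keep_errors.excludeEvents = false
```

Note that this interceptor looks only at the event body, so it does not cover filtering on header values.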
To route events based on headers, use the Multiplexing Channel Selector:
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4
Here, events with the header "state"="CZ" go to channel "c1", events with "state"="US" go to both "c2" and "c3", and all others go to "c4".
This way you can also filter events by header: route a specific header value to a channel that is drained by a Null Sink.
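Applied to the case in the question (source=1 goes to sink1, source=2 goes to sink2), the wiring could look roughly like the sketch below. A channel selector picks channels, not sinks, so each sink is attached to its own channel. The sketch assumes events already carry a header named `source`; the agent name `a1`, the component names, the memory channels, and the HDFS paths are all placeholders, and the source's own type and settings are left out because the question does not specify them.

```properties
# Components (names are placeholders).
a1.sources = r1
a1.channels = c1 c2 c3
a1.sinks = sink1 sink2 discard

# Source: its type and type-specific settings are omitted here.
a1.sources.r1.channels = c1 c2 c3

# Route on the "source" header (assumed to be set on each event).
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = source
a1.sources.r1.selector.mapping.1 = c1
a1.sources.r1.selector.mapping.2 = c2
a1.sources.r1.selector.default = c3

# One channel per destination.
a1.channels.c1.type = memory
a1.channels.c2.type = memory
a1.channels.c3.type = memory

# source=1 -> sink1, source=2 -> sink2 (HDFS paths are hypothetical).
a1.sinks.sink1.type = hdfs
a1.sinks.sink1.channel = c1
a1.sinks.sink1.hdfs.path = hdfs://namenode/flume/source1

a1.sinks.sink2.type = hdfs
a1.sinks.sink2.channel = c2
a1.sinks.sink2.hdfs.path = hdfs://namenode/flume/source2

# Everything else is discarded by a Null Sink.
a1.sinks.discard.type = null
a1.sinks.discard.channel = c3
```

Sending the default channel to a Null Sink is what turns routing into filtering: any event whose `source` header is neither 1 nor 2 is consumed and thrown away.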