通过字段在ELK中关联消息 [英] Correlate messages in ELK by field
问题描述
相关:
正如你所看到的,我们试图在不考虑common_id_number的情况下进行图形化,但似乎我们必须使用它。
任何帮助?
编辑
这些是ES模板中的相关字段定义:
URIHost:{
type:string,
norms:{
enabled:false
},
fields:{
raw :{
type:string,
index:not_analyzed,
ignore_above:256
}
}
},
Type:{
type:string,
norms:{
enabled:false
},
fields:{
raw:{
type:string,
index:not_analyzed,
ignore_above:256
}
},
SessionID:{
type:long
},
Bytes:{
type:long
},
BytesReceived:{
type:long
},
BytesSent:{
type:long
$,
这是一个TRAFFIC类型的编辑文档:
{
_index:logstash-2015.11.05,
_type:paloalto,
_id:
_sore:$
_source:$$$$$$ 2015-11-05T21:59:55.543Z,
syslog_severity_code:5,
syslog_facility_code:1,
syslog_timestamp:Nov 5 22:59:58 ,
Type:TRAFFIC,
SessionID:21713,
Bytes:939,
BytesSent:480,
BytesReceived :459,
},
fields:{
@timestamp:[
1446760795543
]
},
:[
1446760795543
]
}
这是一个THREAT类型的文件:
{
_index:logstash-2015.11.05,
_type:paloalto,
_id:AVDZqVNIpQiRid-uxPjC,
_score:null,
_source:{
@version:1,
@timestamp:2015-11-05T21:59:23.440Z,
syslog_severity_code 5,
syslog_facility_code:1,
syslog_timestamp:11月5日22:59:26,
Type:THREAT,
SessionID 21713,
URIHost:whatever.nevermind.com,
URIPath:/connectiontest.html
},
fields:{
@timestamp:[
1446760763440
]
},
sort:[
1446760763440
]
}
这是logstash过滤器配置:
filter {
if [type] ==paloalto{
syslog_pri {
remove_field => [syslog_facility,syslog_severity]
}
grok {
match => {
message=> %{SYSLOGTIMESTAMP:syslog_timestamp}%{HOSTNAME:hostname}%{INT},%{YEAR} /%{MONTHNUM} /%{MONTHDAY}%{TIME},%{INT},%{WORD:Type},% {GREEDYDATA:log}
}
remove_field => [message]
}
如果[Type] ==THREAT{
csv {
source => log
columns => [Threat_OR_ContentType,ConfigVersion,GenerateTime,SourceAddress,DestinationAddress,NATSourceIP,NATDestinationIP,Rule,SourceUser,DestinationUser SourceZone,DestinationZone,InboundInterface,OutboundInterface,LogAction,TimeLogged,SessionID,RepeatCount,SourcePort,DestinationPort,NATSourcePort,NATDestinationPort ,IPProtocol,Action,URL,Threat_OR_ContentName,reportid,Category,Severity,Direction,seqnoactionflags,SourceCountry cappding,contenttype,pcap_id,filedigest,cloud,url_idx,user_agent,filetype,xff ]
remove_field => [log]
}
mutate {
convert => {
SessionID=> integer
SourcePort=> integer
DestinationPort=> integer
NATSourcePort=> integer
NATDestinationPort=> integer
}
remove_field => [ConfigVersion,GenerateTime,VirtualSystem,InboundInterface,OutboundInterface,LogAction,TimeLogged,RepeatCount,Flags,Action seqno,actionflags,cpadding,pcap_id,filedigest,recipient]
}
grok {
match => {
URL=> %{URIHOST:URIHost}%{URIPATH:URIPath}(%{URIPARAM:URIParam})?
}
remove_field => [URL]
}
}
else if [Type] ==TRAFFIC{
csv {
source => log
columns => [Threat_OR_ContentType,ConfigVersion,GenerateTime,SourceAddress,DestinationAddress,NATSourceIP,NATDestinationIP,Rule,SourceUser,DestinationUser SourceZone,DestinationZone,InboundInterface,OutboundInterface,LogAction,TimeLogged,SessionID,RepeatCount,SourcePort,DestinationPort,NATSourcePort,NATDestinationPort ,IP协议,动作,字节,字节,字节接收,分组,起始时间,经过时间间隔,分类 SourceCountry,DestinationCountry,cpadding,pkts_sent,pkts_received,session_end_reason]
remove_field => [log]
}
mutate {
convert => {
SessionID=> integer
SourcePort=> integer
DestinationPort=> integer
NATSourcePort=> integer
NATDestinationPort=> integer
Bytes=> integer
BytesSent=> integer
BytesReceived=> integer
ElapsedTimeInSecs=> integer
}
remove_field => [ConfigVersion,GenerateTime,VirtualSystem,InboundInterface,OutboundInterface,LogAction,TimeLogged,RepeatCount,Flags,Action,Packets,StartTime seqno,actionflags,cpadding,pcap_id,filedigest,recipient]
}
}
date {
match => ; [syslog_timastamp,MMM d HH:mm:ss,MMM dd HH:mm:ss]
timezone => CET
remove_field => [syslog_timestamp]
}
}
}
什么我们正在努力将URIHost术语可视化为X轴和Bytes,BytesSent和BytesReceived总和作为Y轴。
认为您可以使用 aggregate
过滤器来执行你的任务。 聚合
过滤器支持根据公共字段值将多个日志行聚合到一个单个事件中。在你的情况下,我们要使用的通用字段将是 SessionID
字段。
然后,我们需要另一个字段来检测第一个事件与应该聚合的第二个/最后一个事件。在你的情况下,这将是 Type
字段。
您需要更改当前的配置,如下所示:
{
...所有其他过滤器
如果[类型] ==THREAT{
...所有其他过滤器
聚合{
task_id => %{SessionID}
code => map ['URIHost'] = event ['URIHost']; map ['URIPath'] = event ['URIPath']
}
}
else if [类型] ==TRAFFIC{
...所有其他过滤器
聚合{
task_id => %{SessionID}
code => event ['URIHost'] = map ['URIHost']; event ['URIPath'] = map ['URIPath']
end_of_task => true
timeout => 120
}
}
}
一般的想法是当Logstash遇到 THREAT
日志时,它将临时存储 URIHost
和 URIPath
在内存中的事件映射中,然后当 TRAFFIC
日志进入时, URIHost
和 URIPath
字段将被添加到事件中。如果需要,您也可以复制其他字段。您还可以根据您期望在 TRAFFIC
事件在最后一个 THREAT $ c $之后进入的时间,调整超时时间(以秒为单位) c> event。
最后,您将获得从 THREAT
和 TRAFFIC
日志行,您可以轻松创建可视化,显示每个 URIHost
的字节数,如屏幕截图所示。
Related to: Combine logs and query in ELK
We are setting up ELK and would want to create a visualization in Kibana 4. The issue here is that we want to relate between two different types of message.
To simplify:
- Message type 1 fields: message_type, common_id_number, byte_count, ...
- Message type 2 fields: message_type, common_id_number, hostname, ...
Both messages share the same index in elasticsearch.
As you can see we were trying to graph without taking that common_id_number into account, but it seems that we must use it. We don't know how yet, though.
Any help?
EDIT
These are the relevant field definitions in the ES template:
"URIHost" : {
"type" : "string",
"norms" : {
"enabled" : false
},
"fields" : {
"raw" : {
"type" : "string",
"index" : "not_analyzed",
"ignore_above" : 256
}
}
},
"Type" : {
"type" : "string",
"norms" : {
"enabled" : false
},
"fields" : {
"raw" : {
"type" : "string",
"index" : "not_analyzed",
"ignore_above" : 256
}
}
},
"SessionID" : {
"type" : "long"
},
"Bytes" : {
"type" : "long"
},
"BytesReceived" : {
"type" : "long"
},
"BytesSent" : {
"type" : "long"
},
This is a TRAFFIC type, edited document:
{
"_index": "logstash-2015.11.05",
"_type": "paloalto",
"_id": "AVDZqdBjpQiRid-uxPjE",
"_score": null,
"_source": {
"@version": "1",
"@timestamp": "2015-11-05T21:59:55.543Z",
"syslog_severity_code": 5,
"syslog_facility_code": 1,
"syslog_timestamp": "Nov 5 22:59:58",
"Type": "TRAFFIC",
"SessionID": 21713,
"Bytes": 939,
"BytesSent": 480,
"BytesReceived": 459,
},
"fields": {
"@timestamp": [
1446760795543
]
},
"sort": [
1446760795543
]
}
And this is a THREAT type document:
{
"_index": "logstash-2015.11.05",
"_type": "paloalto",
"_id": "AVDZqVNIpQiRid-uxPjC",
"_score": null,
"_source": {
"@version": "1",
"@timestamp": "2015-11-05T21:59:23.440Z",
"syslog_severity_code": 5,
"syslog_facility_code": 1,
"syslog_timestamp": "Nov 5 22:59:26",
"Type": "THREAT",
"SessionID": 21713,
"URIHost": "whatever.nevermind.com",
"URIPath": "/connectiontest.html"
},
"fields": {
"@timestamp": [
1446760763440
]
},
"sort": [
1446760763440
]
}
This is the logstash "filter" configuration:
filter {
if [type] == "paloalto" {
syslog_pri {
remove_field => [ "syslog_facility", "syslog_severity" ]
}
grok {
match => {
"message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{HOSTNAME:hostname} %{INT},%{YEAR}/%{MONTHNUM}/%{MONTHDAY} %{TIME},%{INT},%{WORD:Type},%{GREEDYDATA:log}"
}
remove_field => [ "message" ]
}
if [Type] == "THREAT" {
csv {
source => "log"
columns => [ "Threat_OR_ContentType", "ConfigVersion", "GenerateTime", "SourceAddress", "DestinationAddress", "NATSourceIP", "NATDestinationIP", "Rule", "SourceUser", "DestinationUser", "Application", "VirtualSystem", "SourceZone", "DestinationZone", "InboundInterface", "OutboundInterface", "LogAction", "TimeLogged", "SessionID", "RepeatCount", "SourcePort", "DestinationPort", "NATSourcePort", "NATDestinationPort", "Flags", "IPProtocol", "Action", "URL", "Threat_OR_ContentName", "reportid", "Category", "Severity", "Direction", "seqno", "actionflags", "SourceCountry", "DestinationCountry", "cpadding", "contenttype", "pcap_id", "filedigest", "cloud", "url_idx", "user_agent", "filetype", "xff", "referer", "sender", "subject", "recipient" ]
remove_field => [ "log" ]
}
mutate {
convert => {
"SessionID" => "integer"
"SourcePort" => "integer"
"DestinationPort" => "integer"
"NATSourcePort" => "integer"
"NATDestinationPort" => "integer"
}
remove_field => [ "ConfigVersion", "GenerateTime", "VirtualSystem", "InboundInterface", "OutboundInterface", "LogAction", "TimeLogged", "RepeatCount", "Flags", "Action", "reportid", "Severity", "seqno", "actionflags", "cpadding", "pcap_id", "filedigest", "recipient" ]
}
grok {
match => {
"URL" => "%{URIHOST:URIHost}%{URIPATH:URIPath}(%{URIPARAM:URIParam})?"
}
remove_field => [ "URL" ]
}
}
else if [Type] == "TRAFFIC" {
csv {
source => "log"
columns => [ "Threat_OR_ContentType", "ConfigVersion", "GenerateTime", "SourceAddress", "DestinationAddress", "NATSourceIP", "NATDestinationIP", "Rule", "SourceUser", "DestinationUser", "Application", "VirtualSystem", "SourceZone", "DestinationZone", "InboundInterface", "OutboundInterface", "LogAction", "TimeLogged", "SessionID", "RepeatCount", "SourcePort", "DestinationPort", "NATSourcePort", "NATDestinationPort", "Flags", "IPProtocol", "Action", "Bytes", "BytesSent", "BytesReceived", "Packets", "StartTime", "ElapsedTimeInSecs", "Category", "Padding", "seqno", "actionflags", "SourceCountry", "DestinationCountry", "cpadding", "pkts_sent", "pkts_received", "session_end_reason" ]
remove_field => [ "log" ]
}
mutate {
convert => {
"SessionID" => "integer"
"SourcePort" => "integer"
"DestinationPort" => "integer"
"NATSourcePort" => "integer"
"NATDestinationPort" => "integer"
"Bytes" => "integer"
"BytesSent" => "integer"
"BytesReceived" => "integer"
"ElapsedTimeInSecs" => "integer"
}
remove_field => [ "ConfigVersion", "GenerateTime", "VirtualSystem", "InboundInterface", "OutboundInterface", "LogAction", "TimeLogged", "RepeatCount", "Flags", "Action", "Packets", "StartTime", "seqno", "actionflags", "cpadding", "pcap_id", "filedigest", "recipient" ]
}
}
date {
match => [ "syslog_timastamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
timezone => "CET"
remove_field => [ "syslog_timestamp" ]
}
}
}
What we are trying to do is to visualize URIHost terms as X axis and Bytes, BytesSent and BytesReceived sums as Y axis.
I think you can use the aggregate
filter to carry out your task. The aggregate
filter provides support for aggregating several log lines into one single event based on a common field value. In your case, the common field we're going to use will be the SessionID
field.
Then we need another field to detect the first event vs the second/last event that should be aggregated. In your case, this would be the Type
field.
You need to change your current configuration like this:
filter {
... all other filters
if [Type] == "THREAT" {
... all other filters
aggregate {
task_id => "%{SessionID}"
code => "map['URIHost'] = event['URIHost']; map['URIPath'] = event['URIPath']"
}
}
else if [Type] == "TRAFFIC" {
... all other filters
aggregate {
task_id => "%{SessionID}"
code => "event['URIHost'] = map['URIHost']; event['URIPath'] = map['URIPath']"
end_of_task => true
timeout => 120
}
}
}
The general idea is that when Logstash encounters THREAT
logs it will temporarily store the URIHost
and URIPath
in the in-memory event map, and then when a TRAFFIC
log comes in, the URIHost
and URIPath
fields will be added to the event. You can copy other fields, too, if needed. You can also adapt the timeout (in seconds) depending on how long you expect a TRAFFIC
event to come in after the last THREAT
event.
In the end, you'll get documents with data merged from both THREAT
and TRAFFIC
log lines and you can easily create the visualization showing bytes count per URIHost
as shown on your screenshot.
这篇关于通过字段在ELK中关联消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!