Correlate messages in ELK by field


Problem description

Related to: Combine logs and query in ELK

We are setting up ELK and want to create a visualization in Kibana 4. The issue is that we need to relate two different types of message.

To simplify:

  • Message type 1 fields: message_type, common_id_number, byte_count, ...
  • Message type 2 fields: message_type, common_id_number, hostname, ...

Both message types are stored in the same Elasticsearch index.

As you can see we were trying to graph without taking that common_id_number into account, but it seems that we must use it. We don't know how yet, though.

Any help?

EDIT

These are the relevant field definitions in the ES template:

      "URIHost" : {
        "type" : "string",
        "norms" : {
          "enabled" : false
        },
        "fields" : {
          "raw" : {
            "type" : "string",
            "index" : "not_analyzed",
            "ignore_above" : 256
          }
        }
      },
      "Type" : {
        "type" : "string",
        "norms" : {
          "enabled" : false
        },
        "fields" : {
          "raw" : {
            "type" : "string",
            "index" : "not_analyzed",
            "ignore_above" : 256
          }
        }
      },
      "SessionID" : {
        "type" : "long"
      },
      "Bytes" : {
        "type" : "long"
      },
      "BytesReceived" : {
        "type" : "long"
      },
      "BytesSent" : {
        "type" : "long"
      },

This is a TRAFFIC type, edited document:

{
  "_index": "logstash-2015.11.05",
  "_type": "paloalto",
  "_id": "AVDZqdBjpQiRid-uxPjE",
  "_score": null,
  "_source": {
    "@version": "1",
    "@timestamp": "2015-11-05T21:59:55.543Z",
    "syslog_severity_code": 5,
    "syslog_facility_code": 1,
    "syslog_timestamp": "Nov  5 22:59:58",
    "Type": "TRAFFIC",
    "SessionID": 21713,
    "Bytes": 939,
    "BytesSent": 480,
    "BytesReceived": 459
  },
  "fields": {
    "@timestamp": [
      1446760795543
    ]
  },
  "sort": [
    1446760795543
  ]
}

And this is a THREAT type document:

{
  "_index": "logstash-2015.11.05",
  "_type": "paloalto",
  "_id": "AVDZqVNIpQiRid-uxPjC",
  "_score": null,
  "_source": {
    "@version": "1",
    "@timestamp": "2015-11-05T21:59:23.440Z",
    "syslog_severity_code": 5,
    "syslog_facility_code": 1,
    "syslog_timestamp": "Nov  5 22:59:26",
    "Type": "THREAT",
    "SessionID": 21713,
    "URIHost": "whatever.nevermind.com",
    "URIPath": "/connectiontest.html"
  },
  "fields": {
    "@timestamp": [
      1446760763440
    ]
  },
  "sort": [
    1446760763440
  ]
}

This is the logstash "filter" configuration:

filter {
    if [type] == "paloalto" {
        syslog_pri {
            remove_field => [ "syslog_facility", "syslog_severity" ]
        }

        grok {
            match => {
                "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{HOSTNAME:hostname} %{INT},%{YEAR}/%{MONTHNUM}/%{MONTHDAY} %{TIME},%{INT},%{WORD:Type},%{GREEDYDATA:log}"
            }
            remove_field => [ "message" ]
        }

        if [Type] == "THREAT" {
            csv {
                source => "log"
                columns => [ "Threat_OR_ContentType", "ConfigVersion", "GenerateTime", "SourceAddress", "DestinationAddress", "NATSourceIP", "NATDestinationIP", "Rule", "SourceUser", "DestinationUser", "Application", "VirtualSystem", "SourceZone", "DestinationZone", "InboundInterface", "OutboundInterface", "LogAction", "TimeLogged", "SessionID", "RepeatCount", "SourcePort", "DestinationPort", "NATSourcePort", "NATDestinationPort", "Flags", "IPProtocol", "Action", "URL", "Threat_OR_ContentName", "reportid", "Category", "Severity", "Direction", "seqno", "actionflags", "SourceCountry", "DestinationCountry", "cpadding", "contenttype", "pcap_id", "filedigest", "cloud", "url_idx", "user_agent", "filetype", "xff", "referer", "sender", "subject", "recipient" ]
                remove_field => [ "log" ]
            }
            mutate {
                convert => {
                    "SessionID" => "integer"
                    "SourcePort" => "integer"
                    "DestinationPort" => "integer"
                    "NATSourcePort" => "integer"
                    "NATDestinationPort" => "integer"
                }
                remove_field => [ "ConfigVersion", "GenerateTime", "VirtualSystem", "InboundInterface", "OutboundInterface", "LogAction", "TimeLogged", "RepeatCount", "Flags", "Action", "reportid", "Severity", "seqno", "actionflags", "cpadding", "pcap_id", "filedigest", "recipient" ]
            }
            grok {
                match => {
                    "URL" => "%{URIHOST:URIHost}%{URIPATH:URIPath}(%{URIPARAM:URIParam})?"
                }
                remove_field => [ "URL" ]
            }
        }

        else if [Type] == "TRAFFIC" {
            csv {
                source => "log"
                columns => [ "Threat_OR_ContentType", "ConfigVersion", "GenerateTime", "SourceAddress", "DestinationAddress", "NATSourceIP", "NATDestinationIP", "Rule", "SourceUser", "DestinationUser", "Application", "VirtualSystem", "SourceZone", "DestinationZone", "InboundInterface", "OutboundInterface", "LogAction", "TimeLogged", "SessionID", "RepeatCount", "SourcePort", "DestinationPort", "NATSourcePort", "NATDestinationPort", "Flags", "IPProtocol", "Action", "Bytes", "BytesSent", "BytesReceived", "Packets", "StartTime", "ElapsedTimeInSecs", "Category", "Padding", "seqno", "actionflags", "SourceCountry", "DestinationCountry", "cpadding", "pkts_sent", "pkts_received", "session_end_reason" ]
                remove_field => [ "log" ]
            }
            mutate {
                convert => {
                    "SessionID" => "integer"
                    "SourcePort" => "integer"
                    "DestinationPort" => "integer"
                    "NATSourcePort" => "integer"
                    "NATDestinationPort" => "integer"
                    "Bytes" => "integer"
                    "BytesSent" => "integer"
                    "BytesReceived" => "integer"
                    "ElapsedTimeInSecs" => "integer"
                }
                remove_field => [ "ConfigVersion", "GenerateTime", "VirtualSystem", "InboundInterface", "OutboundInterface", "LogAction", "TimeLogged", "RepeatCount", "Flags", "Action", "Packets", "StartTime", "seqno", "actionflags", "cpadding", "pcap_id", "filedigest", "recipient" ]
            }
        }

        date {
            match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
            timezone => "CET"
            remove_field => [ "syslog_timestamp" ]
        }
    }
}

What we are trying to do is to visualize URIHost terms as X axis and Bytes, BytesSent and BytesReceived sums as Y axis.

Solution

I think you can use the aggregate filter to carry out your task. The aggregate filter provides support for aggregating several log lines into one single event based on a common field value. In your case, the common field we're going to use will be the SessionID field.

Then we need another field to detect the first event vs the second/last event that should be aggregated. In your case, this would be the Type field.

You need to change your current configuration like this:

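filter {

    # ... all other filters

    # Note: the aggregate filter needs a single filter worker (-w 1),
    # otherwise events of one session may be processed out of order.

    if [Type] == "THREAT" {
        # ... all other filters

        # First event of a session: remember the URL parts in the map for this SessionID.
        # event['field'] is the Logstash 2.x event API; Logstash 5.0+ uses event.get / event.set.
        aggregate {
            task_id => "%{SessionID}"
            code => "map['URIHost'] = event['URIHost']; map['URIPath'] = event['URIPath']"
        }
    }

    else if [Type] == "TRAFFIC" {
        # ... all other filters

        # Last event of a session: copy the stored URL parts onto it and close the task.
        aggregate {
            task_id => "%{SessionID}"
            code => "event['URIHost'] = map['URIHost']; event['URIPath'] = map['URIPath']"
            end_of_task => true
            timeout => 120
        }
    }
}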

The general idea is that when Logstash encounters THREAT logs it will temporarily store the URIHost and URIPath in the in-memory event map, and then when a TRAFFIC log comes in, the URIHost and URIPath fields will be added to the event. You can copy other fields, too, if needed. You can also adapt the timeout (in seconds) depending on how long you expect a TRAFFIC event to come in after the last THREAT event.
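To make the mechanism concrete, here is a minimal plain-Python simulation of the same correlation logic. It is only a sketch, not how the aggregate filter is implemented: the function name `correlate` and the `pending` dictionary are made up for illustration, the two sample events are trimmed copies of the documents from the question, and the timeout is not modelled at all.

```python
from typing import Any, Dict, Iterable, List

Event = Dict[str, Any]

# Fields carried over from THREAT events to the matching TRAFFIC event.
COPIED_FIELDS = ("URIHost", "URIPath")


def correlate(events: Iterable[Event]) -> List[Event]:
    """Copy URIHost/URIPath from THREAT events onto the TRAFFIC event
    that shares the same SessionID (hypothetical helper, no timeout handling)."""
    pending: Dict[Any, Event] = {}  # stands in for the per-task map, keyed by SessionID
    out: List[Event] = []
    for original in events:
        event = dict(original)  # work on a copy so the caller's data is untouched
        session_id = event.get("SessionID")
        if event.get("Type") == "THREAT":
            task = pending.setdefault(session_id, {})
            for key in COPIED_FIELDS:
                if key in event:
                    task[key] = event[key]
        elif event.get("Type") == "TRAFFIC":
            # Equivalent of end_of_task => true: use the stored values, then forget them.
            event.update(pending.pop(session_id, {}))
        out.append(event)
    return out


threat = {
    "Type": "THREAT",
    "SessionID": 21713,
    "URIHost": "whatever.nevermind.com",
    "URIPath": "/connectiontest.html",
}
traffic = {
    "Type": "TRAFFIC",
    "SessionID": 21713,
    "Bytes": 939,
    "BytesSent": 480,
    "BytesReceived": 459,
}

merged = correlate([threat, traffic])
print(merged[1])
```

In this sketch a TRAFFIC event that arrives without a preceding THREAT event simply passes through without the URI fields, which is also the case to watch for when choosing the timeout in the real configuration.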

In the end, you'll get documents with data merged from both THREAT and TRAFFIC log lines and you can easily create the visualization showing bytes count per URIHost as shown on your screenshot.
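Once the TRAFFIC documents carry URIHost, the chart from the question boils down to a terms aggregation with sum sub-aggregations. The sketch below builds such a request body in Python. It assumes the `.raw` sub-fields from the template shown in the question; the function name and the aggregation names (`by_host`, `total_bytes`, and so on) are arbitrary labels chosen here, and the request Kibana actually sends may differ in detail.

```python
import json
from typing import Any, Dict


def bytes_per_host_query(top_n: int = 10) -> Dict[str, Any]:
    """Build an Elasticsearch search body: byte sums per URIHost (illustrative helper)."""
    return {
        "size": 0,  # only the aggregation buckets are needed, not the hits
        "query": {"term": {"Type.raw": "TRAFFIC"}},
        "aggs": {
            "by_host": {
                # Use the not_analyzed sub-field so host names are not tokenized.
                "terms": {
                    "field": "URIHost.raw",
                    "size": top_n,
                    "order": {"total_bytes": "desc"},
                },
                "aggs": {
                    "total_bytes": {"sum": {"field": "Bytes"}},
                    "bytes_sent": {"sum": {"field": "BytesSent"}},
                    "bytes_received": {"sum": {"field": "BytesReceived"}},
                },
            }
        },
    }


query = bytes_per_host_query()
print(json.dumps(query, indent=2))
```

In Kibana this corresponds to an X-axis terms bucket on URIHost.raw with three Sum metrics on the Y axis.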
