Append an array of JSON objects to Elasticsearch with Logstash
Question
How can I append an array of JSON objects to a document in Elasticsearch, using Logstash to load the data from a CSV?
Example CSV
The file contains the rows:
id,key1,key2
1,toto1,toto2
1,titi1,titi2
2,tata1,tata2
The result should be 2 documents:
{
    "id": 1,
    "keys": [{
        "key1": "toto1",
        "key2": "toto2"
    }, {
        "key1": "titi1",
        "key2": "titi2"
    }]
},
{
    "id": 2,
    "keys": [{
        "key1": "tata1",
        "key2": "tata2"
    }]
}
Answer
First, create your ES mapping (if necessary), declaring the inner objects as nested objects:
{
    "mappings": {
        "key_container": {
            "properties": {
                "id": {
                    "type": "keyword",
                    "index": true
                },
                "keys": {
                    "type": "nested",
                    "properties": {
                        "key1": {
                            "type": "keyword",
                            "index": true
                        },
                        "key2": {
                            "type": "text",
                            "index": true
                        }
                    }
                }
            }
        }
    }
}
The keys property will contain the array of nested objects.
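Declaring keys as nested (rather than a plain object) matters if you later need to match key1 and key2 of the same array element together. As an illustration (a sketch only, using the field names from the mapping above), such a query could look like:

```json
{
  "query": {
    "nested": {
      "path": "keys",
      "query": {
        "bool": {
          "must": [
            { "term":  { "keys.key1": "toto1" } },
            { "match": { "keys.key2": "toto2" } }
          ]
        }
      }
    }
  }
}
```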
Then you can load the CSV in two passes with Logstash:
- Index (create) the base object containing only the id property
- Update the base object, adding the keys property that contains the array of nested objects
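To see what the two passes build up, here is an illustrative Python sketch (not part of the Logstash pipeline) that groups the sample CSV rows by id into the target document shape:

```python
import csv
import io

# Sample data from the question
csv_text = """id,key1,key2
1,toto1,toto2
1,titi1,titi2
2,tata1,tata2
"""

# Group rows by id: pass 1 creates the base document per id,
# pass 2 appends each row's key1/key2 pair to its 'keys' array.
docs = {}
for row in csv.DictReader(io.StringIO(csv_text)):
    doc = docs.setdefault(row["id"], {"id": row["id"], "keys": []})
    doc["keys"].append({"key1": row["key1"], "key2": row["key2"]})

print(list(docs.values()))
```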
The first Logstash configuration (only the relevant part):
filter {
    csv {
        columns => ["id", "key1", "key2"]
        separator => ","
        # Remove the keys because they will be loaded in the next pass with an update
        remove_field => ["key1", "key2"]
    }
    # Drop the header row containing the column names
    if [id] == "id" {
        drop { }
    }
}
output {
    elasticsearch {
        action => "index"
        document_id => "%{id}"
        hosts => ["localhost:9200"]
        index => "key_container"
    }
}
The second Logstash configuration (you have to enable scripting in Elasticsearch):
filter {
    csv {
        columns => ["id", "key1", "key2"]
        separator => ","
    }
    # Combine the attributes into an object called 'key' that is passed to the
    # update script below (via the 'event' variable)
    mutate {
        rename => {
            "key1" => "[key][key1]"
            "key2" => "[key][key2]"
        }
    }
}
output {
    elasticsearch {
        action => "update"
        document_id => "%{id}"
        doc_as_upsert => "true"
        hosts => ["localhost:9200"]
        index => "key_container"
        script_lang => "groovy"
        # key_container.keys is an array of key objects;
        # arrays can be built only with scripts, and the array is created when
        # the first element is put into it
        script => "if (ctx._source.containsKey('keys')) {ctx._source.keys += event.key} else {ctx._source.keys = [event.key]}"
    }
}
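The update script's logic is simple: append the incoming key object to the keys array, creating the array on first use. A Python sketch of the same logic (purely illustrative; the real work happens inside Elasticsearch):

```python
def apply_update(source, event):
    """Mimics the Groovy script: if (ctx._source.containsKey('keys'))
    {ctx._source.keys += event.key} else {ctx._source.keys = [event.key]}"""
    if "keys" in source:
        source["keys"] = source["keys"] + [event["key"]]
    else:
        source["keys"] = [event["key"]]
    return source

doc = {"id": "1"}  # base document created by the first pass
apply_update(doc, {"key": {"key1": "toto1", "key2": "toto2"}})
apply_update(doc, {"key": {"key1": "titi1", "key2": "titi2"}})
print(doc)
```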
In summary, you need this two-pass load because building the array requires scripting, which is available only with the update action.
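Note: `script_lang => "groovy"` reflects an older Elasticsearch version; Groovy scripting was removed in Elasticsearch 5.x in favor of Painless. On a newer cluster, a roughly equivalent Painless update script (assuming the event is exposed to the script as `params.event`, which is the plugin's default variable name) might look like:

```
if (ctx._source.keys == null) {
  ctx._source.keys = [params.event.key];
} else {
  ctx._source.keys.add(params.event.key);
}
```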