弹性搜索通过儿童聚合流水线 [英] Elasticsearch Pipelining through a Child Aggregation

查看:117
本文介绍了弹性搜索通过儿童聚合流水线的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述



使用流水线我正在尝试将父级汇总数据归纳在父级上的总结:

  {
查询:{
match_all:{}
$ baggs:{
unit:{
terms:{
size:500,
field: unit_id
},
aggs:{
total_active_ministers_by_unit:{
sum_bucket:{
buckets_path:ministers> active_minister_by_ministry.value
}
},
部长:{
children:{
type:member_ministry
},
aggs:{
active_minister_by_ministry:{
sum_bucket:{
buckets_path:部门> active_minister._count
}
},
m in{
terms:{
field:member_version,
size:1,
order:{
_term :desc
}
},
aggs:{
active_minister:{
filter:{
range :{
ministry_date_start:{
lte:now / d,
gte:now-96M / M
}
}
}
},
ministry_type:{
条款:{
字段:部门
}
}
}
}
}
}
}
}
}
}

点击查询后,我得到以下答案e Rest API:

  {
error:{
root_cause:[

],
type:reduce_search_phase_exception,
reason:[reduce],
phase:fetch,
分组:true,
failed_shards:[

],
causes_by:{
type:illegal_argument_exception,
原因:在[active_minister]中找不到名为[value]的汇总
}
},
状态:503
}

Elasticsearch服务器控制台给了我以下内容:

  [2015-12-03 12:19:57,974] [INFO] [rest.suppressed] / unit_member / unit / _search参数:{index = unit_member,type = unit} 
无法执行阶段[ fetch],[reduce]
在org.elasticsearch.action.search.type.TransportSearchQueryThenFetchAction $ AsyncAction $ 2.onFailure(TransportSearchQueryThenFetchAction.java:162)
在org.elasticsearch.common.util.concurrent.AbstractRun nable.run(AbstractRunnable.java:42)
在java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
在java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor。 java:615)
在java.lang.Thread.run(Thread.java:745)
导致:java.lang.IllegalArgumentException:在[active_minister] $ b中找不到名为[value]的聚合$ b在org.elasticsearch.search.aggregations.bucket.InternalSingleBucketAggregation.getProperty(InternalSingleBucketAggregation.java:111)
在org.elasticsearch.search.aggregations.InternalMultiBucketAggregation $ InternalBucket.getProperty(InternalMultiBucketAggregation.java:98)
在org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue(BucketHelpers.java:160)
在org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsPipelineAggregator.doReduce(BucketMetricsPipelineAggregator.java:74)
在org.elasticsearch.search.aggregations.pipe line.SiblingPipelineAggregator.reduce(SiblingPipelineAggregator.java:68)
在org.elasticsearch.search.aggregations.InternalAggregation.reduce(InternalAggregation.java:155)
在org.elasticsearch.search.aggregations.InternalAggregations。 reduce(InternalAggregations.java:170)
在org.elasticsearch.search.aggregations.bucket.terms.InternalTerms $ Bucket.reduce(InternalTerms.java:110)
在org.elasticsearch.search.aggregations。 bucket.terms.InternalTerms.doReduce(InternalTerms.java:220)
在org.elasticsearch.search.aggregations.InternalAggregation.reduce(InternalAggregation.java:153)
在org.elasticsearch.search.aggregations。 InternalAggregations.reduce(InternalAggregations.java:170)
在org.elasticsearch.search.controller.SearchPhaseController.merge(SearchPhaseController.java:409)
在org.elasticsearch.action.search.type.TransportSearchQueryThenFetchAction $ AsyncAction $ 2.doRun(TransportSearchQueryThenFetchAction.java:149)
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
... 3更多

这是一个应该报告的错误?或者我在处理流水线错误的方式?



这是我的映射:

 member_ministry:{
dynamic:false,
_routing:{
required:true
},
属性:{
date_ended:{
ignore_malformed:true,
format:yyyy-MM-dd,
type
},
birthday:{
ignore_malformed:true,
format:yyyy-MM-dd,
type日期
},
parent_path:{
index:not_analyzed,
type:string
},
forename:{
type:string
},
部门:{
index:not_analyzed,
store :
type:string
},
lastname:{
type:string
},
unit_id:{
index:not_analyzed,
sto re:true,
type:string
},
ministry_date_start:{
ignore_malformed:true,
format yyyy-MM-dd,
type:date
},
date_created:{
ignore_malformed:true,
format :yyyy-MM-dd,
type:date
},
ministry_date_end:{
ignore_malformed:true,
格式:yyyy-MM-dd,
type:date
},
sealed_date:{
ignore_malformed:true,
格式:yyyy-MM-dd,
type:date
},
ministry_unit_ids:{
index:not_analyzed ,
type:string
},
member_id:{
index:not_analyzed,
store:true,
type:string
},
version_state:{
index:not_analyzed,
type:str
},
member_ministry_id:{
index:not_analyzed,
type:string
},
gender:{
index:not_analyzed,
store:true,
type:string
},
member_version :{
store:true,
type:integer
},
client_id:{
index:not_analyzed ,
store:true,
type:string
},
primary_ministry_id:{
index:not_analyzed,
type:string
}
},
_parent:{
type:unit
}
}
unit:{
dynamic:false,
properties:{

apostle_district_id:{
index:not_analyzed,
store:true,
type:string
},
date_ended:{
ignore_malformed:true,
format:yyyy-MM-dd,
type:date
}
unit_type_id:{
index:not_analyzed,
store:true,
type:string
},
parent_path:{
index:not_analyzed,
store:true,
type:string
},
sub_district_id:{
index:not_analyzed,
store:true,
type:string
},
:{
index:not_analyzed,
store:true,
type:string
},
unit_id {
index:not_analyzed,
store:true,
type:string
},
date_created:{
ignore_malformed:true,
format:yyyy-MM-dd,
type:date
},
bishop_district_id $ bindex:not_analyzed,
store:true,
type:string
},
administration_id:{
index:not_analyzed,
store:true,
type:string
},
date_opened:{
ignore_malformed :true,
format:yyyy-MM-dd,
type:date
},
district_church_id:{
index:not_analyzed,
store:true,
type:string
},
district_apostle_helper_area_id:{
index :not_analyzed,
store:true,
type:string
},
international_church_id {
index:not_analyzed,
store:true,
type:string
},
unit_type b $ bindex:not_analyzed,
store:true,
type:string
},
district_apostle_area_id:{
index:not_analyzed,
store:true,
type:string
},
unit_version:{
存储:true,
type:integer
},
client_id:{
index:not_analyzed,
store :
type:string
},
regional_administration_center_id:{
index:not_analyzed,
store:true ,
type:string
}
}
},
unit_id:{
index:not_analyzed
store:true,
type:string
}
}

这是索引中最小的数据集



单位



http://pastie.org/private/wippg2zfdrsiul5iaqiga



MemberMinistry:



http:// pastie。 org / private / 4xrnyeygnac5abcnwskg

解决方案

我猜这是一个错误。
$ b

当我把这部分出来时:

 total_active_ministers_by_unit:{
sum_bucket:{
buckets_path:Ministers> active_minister_by_ministry.value
}
},

我得到这些结果:

  {
taken:5,
timed_out:false,
_shar ds:{
total:1,
successful:1,
failed:0
},
hits:{
total:8,
max_score:0,
hits:[]
},
聚合:{
unit :$ {
$ doc $$$$$ $ bdoc_count:3,
部长:{
doc_count:2,
部门:{
doc_count_error_upper_bound:0,
sum_other_doc_count:1,
buckets:[
{
key:2,
doc_count:1,
active_minister:{
doc_count:1
},
ministry_type:{
doc_count_error_upper_bound:0,
sum_other_doc_count:0,
buckets:[
{
key ,
doc_count:1
}
]
}
}
]
},
active_minister_by_ministry {
value:1
}
}
},
{
key:3a7d74e834bbc0d6,
doc_count :2,
Ministers:{
doc_count:1,
部门:{
doc_count_error_upper_bound: 0,
sum_other_doc_count:0,
buckets:[
{
key:2,
doc_count:1,
active_minister:{
doc_count:1
},
ministry_type:{
doc_count_error_upper_bound:0,
sum_other_doc_count:0,
buckets:[
{
key:Apostle,
doc_count:1
}
]
}
}
]
},
active_minister_by_ministry:{
value:1
}
}
},
{
key:3a7b3abb09bfec65,
doc_count:1,
ministers:{
doc_count:0,
部门:{
doc_count_error_upper_bound:0,
sum_other_doc_count:0,
buckets:[]
},
active_minister_by_ministry:{
value:0
}
}
},
{
key:3a7d707e45c8dc24,
doc_count:1,
Ministers:{
doc_count:0,
{
doc_count_error_upper_bound:0,
sum_other_doc_count:0,
buck ets:[]
},
active_minister_by_ministry:{
value:0
}
}
},
{
key:3a7d70975c84428c,
doc_count:1,
Ministers:{
doc_count:0,

doc_count_error_upper_bound:0,
sum_other_doc_count:0,
buckets:[]
},
active_minister_by_ministry:{
价值:0
}
}
}
]
}
}
}

但是尝试跨越父/子关系进行管理会导致错误。



这可能应该被报告为一个错误,除非我错过了一些。



这是我用来测试的代码:



http://sense.qbox.io/gist/075732dfe51916538ff13ea76a2cec1f7532c691


I am trying to Sum up Data through a Child Aggregation in Elasticsearch 2.1.

With Pipelining i am trying to get the Child Aggregation Data summed up on the Parent Level of the Aggregation:

{
  "query": {
    "match_all": {}
  },
  "aggs": {
    "unit": {
      "terms": {
        "size": 500,
        "field": "unit_id"
      },
      "aggs": {
        "total_active_ministers_by_unit": {
          "sum_bucket": {
            "buckets_path": "ministers>active_minister_by_ministry.value"
          }
        },
        "ministers": {
          "children": {
            "type": "member_ministry"
          },
          "aggs": {
            "active_minister_by_ministry": {
              "sum_bucket": {
                "buckets_path": "ministry>active_minister._count"
              }
            },
            "ministry": {
              "terms": {
                "field": "member_version",
                "size": 1,
                "order": {
                  "_term": "desc"
                }
              },
              "aggs": {
                "active_minister": {
                  "filter": {
                    "range": {
                      "ministry_date_start": {
                        "lte": "now/d",
                        "gte": "now-96M/M"
                      }
                    }
                  }
                },
                "ministry_type": {
                  "terms": {
                    "field": "ministry"
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}

After firing the Query i get the Following Answer from the Rest API:

{
  "error": {
    "root_cause": [

    ],
    "type": "reduce_search_phase_exception",
    "reason": "[reduce] ",
    "phase": "fetch",
    "grouped": true,
    "failed_shards": [

    ],
    "caused_by": {
      "type": "illegal_argument_exception",
      "reason": "Cannot find an aggregation named [value] in [active_minister]"
    }
  },
  "status": 503
}

The Elasticsearch Server Console gives me the Following:

[2015-12-03 12:19:57,974][INFO ][rest.suppressed          ] /unit_member/unit/_search Params: {index=unit_member, type=unit}
Failed to execute phase [fetch], [reduce] 
    at org.elasticsearch.action.search.type.TransportSearchQueryThenFetchAction$AsyncAction$2.onFailure(TransportSearchQueryThenFetchAction.java:162)
    at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: Cannot find an aggregation named [value] in [active_minister]
    at org.elasticsearch.search.aggregations.bucket.InternalSingleBucketAggregation.getProperty(InternalSingleBucketAggregation.java:111)
    at org.elasticsearch.search.aggregations.InternalMultiBucketAggregation$InternalBucket.getProperty(InternalMultiBucketAggregation.java:98)
    at org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue(BucketHelpers.java:160)
    at org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsPipelineAggregator.doReduce(BucketMetricsPipelineAggregator.java:74)
    at org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator.reduce(SiblingPipelineAggregator.java:68)
    at org.elasticsearch.search.aggregations.InternalAggregation.reduce(InternalAggregation.java:155)
    at org.elasticsearch.search.aggregations.InternalAggregations.reduce(InternalAggregations.java:170)
    at org.elasticsearch.search.aggregations.bucket.terms.InternalTerms$Bucket.reduce(InternalTerms.java:110)
    at org.elasticsearch.search.aggregations.bucket.terms.InternalTerms.doReduce(InternalTerms.java:220)
    at org.elasticsearch.search.aggregations.InternalAggregation.reduce(InternalAggregation.java:153)
    at org.elasticsearch.search.aggregations.InternalAggregations.reduce(InternalAggregations.java:170)
    at org.elasticsearch.search.controller.SearchPhaseController.merge(SearchPhaseController.java:409)
    at org.elasticsearch.action.search.type.TransportSearchQueryThenFetchAction$AsyncAction$2.doRun(TransportSearchQueryThenFetchAction.java:149)
    at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
    ... 3 more

Is this a bug which should be reported? Or am i handling the Pipelining the wrong way?

Here is my mapping:

"member_ministry": {
    "dynamic": "false",
    "_routing": {
      "required": true
    },
    "properties": {
      "date_ended": {
        "ignore_malformed": true,
        "format": "yyyy-MM-dd",
        "type": "date"
      },
      "birthday": {
        "ignore_malformed": true,
        "format": "yyyy-MM-dd",
        "type": "date"
      },
      "parent_path": {
        "index": "not_analyzed",
        "type": "string"
      },
      "forename": {
        "type": "string"
      },
      "ministry": {
        "index": "not_analyzed",
        "store": true,
        "type": "string"
      },
      "lastname": {
        "type": "string"
      },
      "unit_id": {
        "index": "not_analyzed",
        "store": true,
        "type": "string"
      },
      "ministry_date_start": {
        "ignore_malformed": true,
        "format": "yyyy-MM-dd",
        "type": "date"
      },
      "date_created": {
        "ignore_malformed": true,
        "format": "yyyy-MM-dd",
        "type": "date"
      },
      "ministry_date_end": {
        "ignore_malformed": true,
        "format": "yyyy-MM-dd",
        "type": "date"
      },
      "sealing_date": {
        "ignore_malformed": true,
        "format": "yyyy-MM-dd",
        "type": "date"
      },
      "ministry_unit_ids": {
        "index": "not_analyzed",
        "type": "string"
      },
      "member_id": {
        "index": "not_analyzed",
        "store": true,
        "type": "string"
      },
      "version_state": {
        "index": "not_analyzed",
        "type": "string"
      },
      "member_ministry_id": {
        "index": "not_analyzed",
        "type": "string"
      },
      "gender": {
        "index": "not_analyzed",
        "store": true,
        "type": "string"
      },
      "member_version": {
        "store": true,
        "type": "integer"
      },
      "client_id": {
        "index": "not_analyzed",
        "store": true,
        "type": "string"
      },
      "primary_ministry_id": {
        "index": "not_analyzed",
        "type": "string"
      }
    },
    "_parent": {
      "type": "unit"
    }
  },
  "unit": {
    "dynamic": "false",
    "properties": {
      "versions": {
        "properties": {
          "apostle_district_id": {
            "index": "not_analyzed",
            "store": true,
            "type": "string"
          },
          "date_ended": {
            "ignore_malformed": true,
            "format": "yyyy-MM-dd",
            "type": "date"
          },
          "unit_type_id": {
            "index": "not_analyzed",
            "store": true,
            "type": "string"
          },
          "parent_path": {
            "index": "not_analyzed",
            "store": true,
            "type": "string"
          },
          "sub_district_id": {
            "index": "not_analyzed",
            "store": true,
            "type": "string"
          },
          "district_id": {
            "index": "not_analyzed",
            "store": true,
            "type": "string"
          },
          "unit_id": {
            "index": "not_analyzed",
            "store": true,
            "type": "string"
          },
          "date_created": {
            "ignore_malformed": true,
            "format": "yyyy-MM-dd",
            "type": "date"
          },
          "bishop_district_id": {
            "index": "not_analyzed",
            "store": true,
            "type": "string"
          },
          "administration_id": {
            "index": "not_analyzed",
            "store": true,
            "type": "string"
          },
          "date_opened": {
            "ignore_malformed": true,
            "format": "yyyy-MM-dd",
            "type": "date"
          },
          "district_church_id": {
            "index": "not_analyzed",
            "store": true,
            "type": "string"
          },
          "district_apostle_helper_area_id": {
            "index": "not_analyzed",
            "store": true,
            "type": "string"
          },
          "international_church_id": {
            "index": "not_analyzed",
            "store": true,
            "type": "string"
          },
          "unit_type": {
            "index": "not_analyzed",
            "store": true,
            "type": "string"
          },
          "district_apostle_area_id": {
            "index": "not_analyzed",
            "store": true,
            "type": "string"
          },
          "unit_version": {
            "store": true,
            "type": "integer"
          },
          "client_id": {
            "index": "not_analyzed",
            "store": true,
            "type": "string"
          },
          "regional_administration_center_id": {
            "index": "not_analyzed",
            "store": true,
            "type": "string"
          }
        }
      },
      "unit_id": {
        "index": "not_analyzed",
        "store": true,
        "type": "string"
      }
    }

Here is a minimal Set of Data from the Index

Unit

http://pastie.org/private/wippg2zfdrsiul5iaqiga

MemberMinistry:

http://pastie.org/private/4xrnyeygnac5abcnwskg

解决方案

I'm guessing that this is a bug.

When I took this part out:

        "total_active_ministers_by_unit": {
           "sum_bucket": {
              "buckets_path": "ministers>active_minister_by_ministry.value"
           }
        },

I got these results:

{
   "took": 5,
   "timed_out": false,
   "_shards": {
      "total": 1,
      "successful": 1,
      "failed": 0
   },
   "hits": {
      "total": 8,
      "max_score": 0,
      "hits": []
   },
   "aggregations": {
      "unit": {
         "doc_count_error_upper_bound": 0,
         "sum_other_doc_count": 0,
         "buckets": [
            {
               "key": "3a7d74d8bfc6b841",
               "doc_count": 3,
               "ministers": {
                  "doc_count": 2,
                  "ministry": {
                     "doc_count_error_upper_bound": 0,
                     "sum_other_doc_count": 1,
                     "buckets": [
                        {
                           "key": 2,
                           "doc_count": 1,
                           "active_minister": {
                              "doc_count": 1
                           },
                           "ministry_type": {
                              "doc_count_error_upper_bound": 0,
                              "sum_other_doc_count": 0,
                              "buckets": [
                                 {
                                    "key": "District Evangelist",
                                    "doc_count": 1
                                 }
                              ]
                           }
                        }
                     ]
                  },
                  "active_minister_by_ministry": {
                     "value": 1
                  }
               }
            },
            {
               "key": "3a7d74e834bbc0d6",
               "doc_count": 2,
               "ministers": {
                  "doc_count": 1,
                  "ministry": {
                     "doc_count_error_upper_bound": 0,
                     "sum_other_doc_count": 0,
                     "buckets": [
                        {
                           "key": 2,
                           "doc_count": 1,
                           "active_minister": {
                              "doc_count": 1
                           },
                           "ministry_type": {
                              "doc_count_error_upper_bound": 0,
                              "sum_other_doc_count": 0,
                              "buckets": [
                                 {
                                    "key": "Apostle",
                                    "doc_count": 1
                                 }
                              ]
                           }
                        }
                     ]
                  },
                  "active_minister_by_ministry": {
                     "value": 1
                  }
               }
            },
            {
               "key": "3a7b3abb09bfec65",
               "doc_count": 1,
               "ministers": {
                  "doc_count": 0,
                  "ministry": {
                     "doc_count_error_upper_bound": 0,
                     "sum_other_doc_count": 0,
                     "buckets": []
                  },
                  "active_minister_by_ministry": {
                     "value": 0
                  }
               }
            },
            {
               "key": "3a7d707e45c8dc24",
               "doc_count": 1,
               "ministers": {
                  "doc_count": 0,
                  "ministry": {
                     "doc_count_error_upper_bound": 0,
                     "sum_other_doc_count": 0,
                     "buckets": []
                  },
                  "active_minister_by_ministry": {
                     "value": 0
                  }
               }
            },
            {
               "key": "3a7d70975c84428c",
               "doc_count": 1,
               "ministers": {
                  "doc_count": 0,
                  "ministry": {
                     "doc_count_error_upper_bound": 0,
                     "sum_other_doc_count": 0,
                     "buckets": []
                  },
                  "active_minister_by_ministry": {
                     "value": 0
                  }
               }
            }
         ]
      }
   }
}

But trying to pipeline across the parent/child relationship causes an error.

This probably ought to be reported as a bug, unless I'm missing something.

Here is the code I used to test it out:

http://sense.qbox.io/gist/075732dfe51916538ff13ea76a2cec1f7532c691

这篇关于弹性搜索通过儿童聚合流水线的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆