ElasticSearch: mapping the result of collapse / doing operations on grouped documents


Problem description


There is a list of conversations, and every conversation has a list of messages. Every message has several fields, among them an action field. We need to consider that in the first messages of the conversation action A is used, after a few messages action A.1 is used, a while later A.1.1, and so on (there is a list of chatbot intents).


Grouping the message actions of a conversation will produce something like: A > A > A > A.1 > A > A.1 > A.1.1 ...

The problem:


I need to create a report using ElasticSearch that will return the actions group of every conversation; next, I need to group the similar actions groups, adding a count; in the end this will result in a Map&lt;actionsGroup, count&gt; such as 'A > A.1 > A > A.1 > A.1.1', 3.


While constructing the actions group I need to eliminate consecutive duplicates: instead of A > A > A > A.1 > A > A.1 > A.1.1 I need to have A > A.1 > A > A.1 > A.1.1.
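The consecutive-duplicate elimination described above is simple to do client-side; a minimal TypeScript sketch (`collapseConsecutive` is a hypothetical helper name, not part of any API):

```typescript
// Collapse runs of consecutive duplicate actions:
// ["A","A","A","A.1","A","A.1","A.1.1"] becomes ["A","A.1","A","A.1","A.1.1"]
function collapseConsecutive(actions: string[]): string[] {
  return actions.filter((action, i) => i === 0 || action !== actions[i - 1]);
}
```

Joining the result with `" > "` then yields the path string used in the report.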

The steps I have started with:

{
   "collapse":{
      "field":"context.conversationId",
      "inner_hits":{
         "name":"logs",
         "size": 10000,
         "sort":[
            {
               "@timestamp":"asc"
            }
         ]
      }
   },
   "aggs":{
   },
}

What I need to do next:



  1. I need to map the result of the collapse into a single result like A > A.1 > A > A.1 > A.1.1. I've seen that in the case of aggs it is possible to use scripts over the result, and it is possible to create a list of actions like the one I need, but aggs operates over all messages, not only over the grouped messages that I have in the collapse. Is it possible to use aggs inside collapse, or is there a similar solution?
  2. I need to group the resulting values (A > A.1 > A > A.1 > A.1.1) from all collapses, adding a count and resulting in the Map<actionsGroup, count>.

Or:



  1. Group the conversation messages by the conversationId field using aggs (I don't know how to do this).
  2. Use a script to iterate over all values and create the actions group for every conversation (not sure if this is possible).
  3. Use another aggs over all values and group the duplicates, returning Map<actionsGroup, count>.
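For step 1 of this alternative, a `terms` aggregation on `context.conversationId` with a `top_hits` sub-aggregation can group the messages per conversation. A sketch (the aggregation names are arbitrary; note that `top_hits` still returns raw documents, so building the deduplicated path would have to happen client-side or in a `scripted_metric`):

```json
{
   "size": 0,
   "aggs": {
      "byConversation": {
         "terms": { "field": "context.conversationId", "size": 10000 },
         "aggs": {
            "messages": {
               "top_hits": {
                  "size": 100,
                  "sort": [ { "@timestamp": "asc" } ],
                  "_source": [ "context.action" ]
               }
            }
         }
      }
   }
}
```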


Update 2: I managed to get a partial result, but one issue still remains. Please check here what I still need to fix.


Update 1: adding some extra details

The mapping:

"mappings":{
  "properties":{
     "@timestamp":{
        "type":"date",
        "format": "epoch_millis"
     },
     "context":{
        "properties":{
           "action":{
              "type":"keyword"
           },
           "conversationId":{
              "type":"keyword"
           }
        }
     }
  }
}

A sample of the conversation documents:

Conversation 1.
{
    "@timestamp": 1579632745000,
    "context": {
        "action": "A",
        "conversationId": "conv_id1",
    }
},
{
    "@timestamp": 1579632745001,
    "context": {
        "action": "A.1",
        "conversationId": "conv_id1",
    }
},
{
    "@timestamp": 1579632745002,
    "context": {
        "action": "A.1.1",
        "conversationId": "conv_id1",
    }
}

Conversation 2.
{
    "@timestamp": 1579632745000,
    "context": {
        "action": "A",
        "conversationId": "conv_id2",
    }
},
{
    "@timestamp": 1579632745001,
    "context": {
        "action": "A.1",
        "conversationId": "conv_id2",
    }
},
{
    "@timestamp": 1579632745002,
    "context": {
        "action": "A.1.1",
        "conversationId": "conv_id2",
    }
}

Conversation 3.
{
    "@timestamp": 1579632745000,
    "context": {
        "action": "B",
        "conversationId": "conv_id3",
    }
},
{
    "@timestamp": 1579632745001,
    "context": {
        "action": "B.1",
        "conversationId": "conv_id3",
    }
}

Expected result:

{
    "A -> A.1 -> A.1.1": 2,
    "B -> B.1": 1
}
Something similar to this; any other format would also work.

Since I am new to Elasticsearch, every hint is welcome.

Accepted answer


I solved it using Elasticsearch's scripted_metric aggregation. The index was also changed from its initial state.

The script:

{
   "size": 0,
   "aggs": {
        "intentPathsCountAgg": {
            "scripted_metric": {
                "init_script": "state.messagesList = new ArrayList();",
                "map_script": "long currentMessageTime = doc['messageReceivedEvent.context.timestamp'].value.millis; Map currentMessage = ['conversationId': doc['messageReceivedEvent.context.conversationId.keyword'], 'time': currentMessageTime, 'intentsPath': doc['brainQueryRequestEvent.brainQueryRequest.user_data.intentsHistoryPath.keyword'].value]; state.messagesList.add(currentMessage);",  
                "combine_script": "return state",
                "reduce_script": "List messages = new ArrayList(); Map conversationsMap = new HashMap(); Map intentsMap = new HashMap(); String[] ifElseWorkaround = new String[1]; for (state in states) { messages.addAll(state.messagesList);} messages.stream().forEach((message) -> { Map existingMessage = conversationsMap.get(message.conversationId); if(existingMessage == null || message.time > existingMessage.time) { conversationsMap.put(message.conversationId, ['time': message.time, 'intentsPath': message.intentsPath]); } else { ifElseWorkaround[0] = ''; } }); conversationsMap.entrySet().forEach(conversation -> { if (intentsMap.containsKey(conversation.getValue().intentsPath)) { long intentsCount = intentsMap.get(conversation.getValue().intentsPath) + 1; intentsMap.put(conversation.getValue().intentsPath, intentsCount); } else {intentsMap.put(conversation.getValue().intentsPath, 1L);} }); return intentsMap.entrySet().stream().map(intentPath -> [intentPath.getKey().toString(): intentPath.getValue()]).collect(Collectors.toSet()) "
            }
        }
    }
}


Formatted script (for better readability - using .ts):

scripted_metric: {
  init_script: 'state.messagesList = new ArrayList();',
  map_script: `
    long currentMessageTime = doc['messageReceivedEvent.context.timestamp'].value.millis;
    Map currentMessage = [
      'conversationId': doc['messageReceivedEvent.context.conversationId.keyword'],
      'time': currentMessageTime,
      'intentsPath': doc['brainQueryRequestEvent.brainQueryRequest.user_data.intentsHistoryPath.keyword'].value
    ];
    state.messagesList.add(currentMessage);`,
  combine_script: 'return state',
  reduce_script: `
    List messages = new ArrayList();
    Map conversationsMap = new HashMap();
    Map intentsMap = new HashMap();
    boolean[] ifElseWorkaround = new boolean[1];

    for (state in states) {
      messages.addAll(state.messagesList);
    }

    messages.stream().forEach(message -> {
      Map existingMessage = conversationsMap.get(message.conversationId);
      if(existingMessage == null || message.time > existingMessage.time) {
        conversationsMap.put(message.conversationId, ['time': message.time, 'intentsPath': message.intentsPath]);
      } else {
        ifElseWorkaround[0] = true;
      }
    });

    conversationsMap.entrySet().forEach(conversation -> {
      if (intentsMap.containsKey(conversation.getValue().intentsPath)) {
        long intentsCount = intentsMap.get(conversation.getValue().intentsPath) + 1;
        intentsMap.put(conversation.getValue().intentsPath, intentsCount);
      } else {
        intentsMap.put(conversation.getValue().intentsPath, 1L);
      }
    });

    return intentsMap.entrySet().stream().map(intentPath -> [
      'path': intentPath.getKey().toString(),
      'count': intentPath.getValue()
    ]).collect(Collectors.toSet())`
}
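Outside of Painless, the reduce step above can be sketched as follows (the type and function names are hypothetical, chosen only to mirror the script's logic): keep the latest message of every conversation, since its intentsPath already contains the full path, then count how many conversations share the same path.

```typescript
interface Msg { conversationId: string; time: number; intentsPath: string; }

// Keep the latest message per conversation, then count identical intent paths.
function countIntentPaths(messages: Msg[]): Map<string, number> {
  const latest = new Map<string, Msg>();
  for (const m of messages) {
    const prev = latest.get(m.conversationId);
    if (prev === undefined || m.time > prev.time) {
      latest.set(m.conversationId, m);
    }
  }
  const counts = new Map<string, number>();
  for (const m of latest.values()) {
    counts.set(m.intentsPath, (counts.get(m.intentsPath) ?? 0) + 1);
  }
  return counts;
}
```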

The response:

{
    "took": 2,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 11,
            "relation": "eq"
        },
        "max_score": null,
        "hits": []
    },
    "aggregations": {
        "intentPathsCountAgg": {
            "value": [
                {
                    "smallTalk.greet -> smallTalk.greet2 -> smallTalk.greet3": 2
                },
                {
                    "smallTalk.greet -> smallTalk.greet2 -> smallTalk.greet3  -> smallTalk.greet4": 1
                },
                {
                    "smallTalk.greet -> smallTalk.greet2": 1
                }
            ]
        }
    }
}

