ElasticSearch: mapping the result of collapse / doing operations on grouped documents

Problem description

There is a list of conversations, and every conversation has a list of messages. Every message has several fields, including an action field. Consider that the first messages of a conversation use action A, a few messages later action A.1 is used, after a while A.1.1, and so on (the actions come from a list of chatbot intents).

Listing the actions of a conversation's messages in order gives something like: A > A > A > A.1 > A > A.1 > A.1.1 ...

Problem:

I need to create a report using ElasticSearch that returns the actions group of every conversation; next, I need to group identical actions groups and add a count. The end result should be a Map<actionsGroup, count>, e.g. ('A > A.1 > A > A.1 > A.1.1', 3).

When constructing the actions group, I need to collapse every run of consecutive duplicates: instead of A > A > A > A.1 > A > A.1 > A.1.1, I need A > A.1 > A > A.1 > A.1.1.
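Only consecutive repeats are collapsed, so an action such as A can legitimately reappear later in the path. A minimal TypeScript sketch of that rule, applied client-side to an already time-ordered list of actions; `collapseConsecutive` and `toActionsGroup` are hypothetical helper names, not part of any Elasticsearch API:

```typescript
// Drop runs of consecutive identical actions: A, A, A, A.1 -> A, A.1
function collapseConsecutive(actions: string[]): string[] {
  const result: string[] = [];
  for (const action of actions) {
    // Keep an action only if it differs from the last one kept
    if (result.length === 0 || result[result.length - 1] !== action) {
      result.push(action);
    }
  }
  return result;
}

// Build the "actions group" label for one conversation
function toActionsGroup(actions: string[]): string {
  return collapseConsecutive(actions).join(" > ");
}

console.log(toActionsGroup(["A", "A", "A", "A.1", "A", "A.1", "A.1.1"]));
// prints "A > A.1 > A > A.1 > A.1.1"
```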

What I started with:

{
   "collapse":{
      "field":"context.conversationId",
      "inner_hits":{
         "name":"logs",
         "size": 10000,
         "sort":[
            {
               "@timestamp":"asc"
            }
         ]
      }
   },
   "aggs":{
   }
}

What I need next:

  1. I need to map the result of the collapse into a single value like A > A.1 > A > A.1 > A.1.1. I've seen that aggregations can run scripts over their results, which would let me build the list of actions I need, but an aggregation operates over all messages, not only over the messages grouped by the collapse. Is it possible to use an aggregation inside collapse, or is there a similar solution?
  2. I need to group the resulting values (A > A.1 > A > A.1 > A.1.1) from all collapses, adding a count, to produce a Map<actionsGroup, count>.
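As far as I know, aggregations cannot be nested inside `collapse`; they always run over every document matched by the query. One workaround is to build the groups on the client from the collapse response itself. A minimal TypeScript sketch, assuming the inner_hits are named `logs` as in the query above and that each inner hit's `_source` carries `context.action`; the interfaces are hypothetical and cover only the fields used. Note that the top-level `size` limits how many conversations come back per page, and the inner hits `size` is capped by the index setting `index.max_inner_result_window` (100 by default).

```typescript
// Hypothetical, minimal shapes of the collapse response fields used here
interface LogSource {
  context: { action: string; conversationId: string };
}
interface CollapsedHit {
  // "logs" is the inner_hits name used in the collapse query
  inner_hits: { logs: { hits: { hits: { _source: LogSource }[] } } };
}
interface CollapseResponse {
  hits: { hits: CollapsedHit[] };
}

// Drop consecutive repeats: A, A, A.1 -> A, A.1
function collapseConsecutive(actions: string[]): string[] {
  return actions.filter((action, i) => i === 0 || actions[i - 1] !== action);
}

// One actions group per collapsed conversation, counted across conversations
function countActionsGroups(response: CollapseResponse): Map<string, number> {
  const counts = new Map<string, number>();
  for (const conversation of response.hits.hits) {
    // Inner hits are already sorted by @timestamp asc in the query
    const actions = conversation.inner_hits.logs.hits.hits.map(
      (hit) => hit._source.context.action,
    );
    const group = collapseConsecutive(actions).join(" > ");
    counts.set(group, (counts.get(group) ?? 0) + 1);
  }
  return counts;
}
```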

Or:

  1. Group the conversation messages by the conversationId field using an aggregation (I don't know how to do this).
  2. Use a script to iterate over all values and create the actions group for every conversation (not sure if this is possible).
  3. Use another aggregation over all values to group the duplicates, returning Map<actionsGroup, count>.
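For step 1, grouping by `conversationId` is commonly done with a `terms` aggregation plus a `top_hits` sub-aggregation sorted by `@timestamp`. As far as I know, other aggregations cannot consume the documents returned by `top_hits`, so steps 2 and 3 would still happen in the client (or in a `scripted_metric`, as in the answer below). A sketch in TypeScript; the aggregation names `conversations` and `messages` and both `size` values are assumptions, and a very large number of conversations would call for a `composite` aggregation with paging instead of one `terms` request.

```typescript
// Request body: one terms bucket per conversation, messages in time order
const searchBody = {
  size: 0, // only the aggregation result is needed
  aggs: {
    conversations: {
      terms: { field: "context.conversationId", size: 1000 },
      aggs: {
        messages: {
          top_hits: {
            size: 100, // capped by index.max_inner_result_window (default 100)
            sort: [{ "@timestamp": { order: "asc" } }],
            _source: { includes: ["context.action"] },
          },
        },
      },
    },
  },
};

// Hypothetical, minimal shape of the matching response
interface Bucket {
  key: string;
  messages: { hits: { hits: { _source: { context: { action: string } } }[] } };
}
interface AggResponse {
  aggregations: { conversations: { buckets: Bucket[] } };
}

// Steps 2 and 3 on the client: build each conversation's group, then count
function countFromBuckets(response: AggResponse): Map<string, number> {
  const counts = new Map<string, number>();
  for (const bucket of response.aggregations.conversations.buckets) {
    const actions = bucket.messages.hits.hits.map((h) => h._source.context.action);
    // Keep an action only when it differs from the previous message's action
    const group = actions.filter((a, i) => i === 0 || actions[i - 1] !== a).join(" > ");
    counts.set(group, (counts.get(group) ?? 0) + 1);
  }
  return counts;
}
```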

Update 2: I managed to get a partial result, but one issue remains. Please check here what I still need to fix.

Update 1: adding some extra details

Mapping:

{
  "mappings":{
    "properties":{
      "@timestamp":{
        "type":"date",
        "format": "epoch_millis"
      },
      "context":{
        "properties":{
          "action":{
            "type":"keyword"
          },
          "conversationId":{
            "type":"keyword"
          }
        }
      }
    }
  }
}

Sample documents for the conversations:

Conversation 1.
[
    {
        "@timestamp": 1579632745000,
        "context": {
            "action": "A",
            "conversationId": "conv_id1"
        }
    },
    {
        "@timestamp": 1579632745001,
        "context": {
            "action": "A.1",
            "conversationId": "conv_id1"
        }
    },
    {
        "@timestamp": 1579632745002,
        "context": {
            "action": "A.1.1",
            "conversationId": "conv_id1"
        }
    }
]

Conversation 2.
[
    {
        "@timestamp": 1579632745000,
        "context": {
            "action": "A",
            "conversationId": "conv_id2"
        }
    },
    {
        "@timestamp": 1579632745001,
        "context": {
            "action": "A.1",
            "conversationId": "conv_id2"
        }
    },
    {
        "@timestamp": 1579632745002,
        "context": {
            "action": "A.1.1",
            "conversationId": "conv_id2"
        }
    }
]

Conversation 3.
[
    {
        "@timestamp": 1579632745000,
        "context": {
            "action": "B",
            "conversationId": "conv_id3"
        }
    },
    {
        "@timestamp": 1579632745001,
        "context": {
            "action": "B.1",
            "conversationId": "conv_id3"
        }
    }
]

Expected result:

{
    "A -> A.1 -> A.1.1": 2,
    "B -> B.1": 1
}
Something similar to this; any other format is fine too.

Since I'm new to Elasticsearch, every hint is very welcome.

Recommended answer

I solved it using Elasticsearch's scripted_metric aggregation. Note that the index also changed from its initial state, so the field names below differ from the mapping in the question.

Script:

{
   "size": 0,
   "aggs": {
        "intentPathsCountAgg": {
            "scripted_metric": {
                "init_script": "state.messagesList = new ArrayList();",
                "map_script": "long currentMessageTime = doc['messageReceivedEvent.context.timestamp'].value.millis; Map currentMessage = ['conversationId': doc['messageReceivedEvent.context.conversationId.keyword'].value, 'time': currentMessageTime, 'intentsPath': doc['brainQueryRequestEvent.brainQueryRequest.user_data.intentsHistoryPath.keyword'].value]; state.messagesList.add(currentMessage);",
                "combine_script": "return state",
                "reduce_script": "List messages = new ArrayList(); Map conversationsMap = new HashMap(); Map intentsMap = new HashMap(); String[] ifElseWorkaround = new String[1]; for (state in states) { messages.addAll(state.messagesList);} messages.stream().forEach((message) -> { Map existingMessage = conversationsMap.get(message.conversationId); if(existingMessage == null || message.time > existingMessage.time) { conversationsMap.put(message.conversationId, ['time': message.time, 'intentsPath': message.intentsPath]); } else { ifElseWorkaround[0] = ''; } }); conversationsMap.entrySet().forEach(conversation -> { if (intentsMap.containsKey(conversation.getValue().intentsPath)) { long intentsCount = intentsMap.get(conversation.getValue().intentsPath) + 1; intentsMap.put(conversation.getValue().intentsPath, intentsCount); } else {intentsMap.put(conversation.getValue().intentsPath, 1L);} }); return intentsMap.entrySet().stream().map(intentPath -> [intentPath.getKey().toString(): intentPath.getValue()]).collect(Collectors.toSet()) "
            }
        }
    }
}

Formatted script (for better readability, using .ts):

scripted_metric: {
  init_script: 'state.messagesList = new ArrayList();',
  map_script: `
    long currentMessageTime = doc['messageReceivedEvent.context.timestamp'].value.millis;
    Map currentMessage = [
      // .value reads the keyword string itself, not the doc-values object
      'conversationId': doc['messageReceivedEvent.context.conversationId.keyword'].value,
      'time': currentMessageTime,
      'intentsPath': doc['brainQueryRequestEvent.brainQueryRequest.user_data.intentsHistoryPath.keyword'].value
    ];
    state.messagesList.add(currentMessage);`,
  combine_script: 'return state',
  reduce_script: `
    List messages = new ArrayList();
    Map conversationsMap = new HashMap();
    Map intentsMap = new HashMap();
    boolean[] ifElseWorkaround = new boolean[1];

    for (state in states) {
      messages.addAll(state.messagesList);
    }

    messages.stream().forEach(message -> {
      Map existingMessage = conversationsMap.get(message.conversationId);
      if(existingMessage == null || message.time > existingMessage.time) {
        conversationsMap.put(message.conversationId, ['time': message.time, 'intentsPath': message.intentsPath]);
      } else {
        ifElseWorkaround[0] = true;
      }
    });

    conversationsMap.entrySet().forEach(conversation -> {
      if (intentsMap.containsKey(conversation.getValue().intentsPath)) {
        long intentsCount = intentsMap.get(conversation.getValue().intentsPath) + 1;
        intentsMap.put(conversation.getValue().intentsPath, intentsCount);
      } else {
        intentsMap.put(conversation.getValue().intentsPath, 1L);
      }
    });

    // One single-entry map per path, matching the response below
    return intentsMap.entrySet().stream().map(intentPath -> [
      intentPath.getKey().toString(): intentPath.getValue()
    ]).collect(Collectors.toSet())`
}
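To see what the reduce phase does, here is a hedged TypeScript mirror of the reduce_script that runs outside Elasticsearch. It assumes, as the reduce_script appears to, that `intentsHistoryPath` on each message already holds the cumulative path of the conversation so far, so the newest message per conversation carries the full path. `CollectedMessage` and `reduceStates` are hypothetical names; each element of `states` stands for one shard's combined state, and the result is an array rather than the Set the script returns.

```typescript
// One entry of state.messagesList as built by map_script (hypothetical mirror)
interface CollectedMessage {
  conversationId: string;
  time: number; // epoch millis
  intentsPath: string; // cumulative intents path stored on the message
}

// Mirrors reduce_script: merge all shard states, keep the newest message per
// conversation, then count how many conversations ended with each path
function reduceStates(states: { messagesList: CollectedMessage[] }[]): Record<string, number>[] {
  const latest = new Map<string, CollectedMessage>();
  for (const state of states) {
    for (const message of state.messagesList) {
      const existing = latest.get(message.conversationId);
      if (existing === undefined || message.time > existing.time) {
        latest.set(message.conversationId, message);
      }
    }
  }

  const counts = new Map<string, number>();
  latest.forEach((message) => {
    counts.set(message.intentsPath, (counts.get(message.intentsPath) ?? 0) + 1);
  });

  // Same shape as the aggregation value: one single-key object per path
  const result: Record<string, number>[] = [];
  counts.forEach((count, path) => {
    result.push({ [path]: count });
  });
  return result;
}
```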

Response:

{
    "took": 2,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 11,
            "relation": "eq"
        },
        "max_score": null,
        "hits": []
    },
    "aggregations": {
        "intentPathsCountAgg": {
            "value": [
                {
                    "smallTalk.greet -> smallTalk.greet2 -> smallTalk.greet3": 2
                },
                {
                    "smallTalk.greet -> smallTalk.greet2 -> smallTalk.greet3  -> smallTalk.greet4": 1
                },
                {
                    "smallTalk.greet -> smallTalk.greet2": 1
                }
            ]
        }
    }
}
