ElasticSearch mapping the result of collapse / do operations on grouped documents
Problem description
There is a list of conversations and every conversation has a list of messages. Every message has different fields and an action field. We need to consider that in the first messages of a conversation action A is used, after a few messages action A.1 is used, after a while A.1.1, and so on (there is a list of chatbot intents).
Grouping the message actions of a conversation will give something like: A > A > A > A.1 > A > A.1 > A.1.1 ...
Problem:
I need to create a report using ElasticSearch that will return the actions group of every conversation; next, I need to group the similar actions groups, adding a count; in the end this will result in a Map<actionsGroup, count>, e.g. 'A > A.1 > A > A.1 > A.1.1', 3.
Constructing the actions group, I need to eliminate every run of consecutive duplicates; instead of A > A > A > A.1 > A > A.1 > A.1.1 I need to have A > A.1 > A > A.1 > A.1.1.
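The consecutive-duplicate elimination described above can be sketched client-side; a minimal illustration in Java (since Painless is Java-like; the helper and its names are hypothetical, not part of the question's index or query):

```java
import java.util.ArrayList;
import java.util.List;

public class ActionsGroup {
    // Collapse consecutive duplicate actions into a single "actions group" string.
    static String buildGroup(List<String> actions) {
        List<String> collapsed = new ArrayList<>();
        for (String action : actions) {
            // Keep the action only if it differs from the immediately preceding one.
            if (collapsed.isEmpty() || !collapsed.get(collapsed.size() - 1).equals(action)) {
                collapsed.add(action);
            }
        }
        return String.join(" > ", collapsed);
    }

    public static void main(String[] args) {
        List<String> actions = List.of("A", "A", "A", "A.1", "A", "A.1", "A.1.1");
        System.out.println(buildGroup(actions)); // A > A.1 > A > A.1 > A.1.1
    }
}
```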
The steps I started with:
{
  "collapse": {
    "field": "context.conversationId",
    "inner_hits": {
      "name": "logs",
      "size": 10000,
      "sort": [
        { "@timestamp": "asc" }
      ]
    }
  },
  "aggs": {}
}
What I need to do next:
- I need to map the result of the collapse into a single result like A > A.1 > A > A.1 > A.1.1. I've seen that in the case of aggs it is possible to use scripts over the result, and there it is possible to create a list of actions like the one I need; but aggs operates over all messages, not only over the grouped messages that I have in the collapse. Is it possible to use aggs inside collapse, or is there a similar solution?
- I need to group the resulting values (A > A.1 > A > A.1 > A.1.1) from all collapses, adding a count and producing the Map<actionsGroup, count>.
Or:
- Group the conversation messages by the conversationId field using aggs (I don't know how to do this)
- Use a script to iterate over all values and create the actions group for every conversation (not sure if this is possible)
- Use another aggs over all values and group the duplicates, returning the Map<actionsGroup, count>.
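If none of these can be expressed inside Elasticsearch, the three steps above can also be done client-side over the fetched hits. A rough Java sketch under that assumption (the Message record and all names here are illustrative, not part of the index):

```java
import java.util.*;
import java.util.stream.*;

public class ActionsReport {
    // Hypothetical flat view of one message document.
    record Message(String conversationId, long timestamp, String action) {}

    // Steps: group by conversationId, order by timestamp, build the actions
    // group per conversation, then count identical groups.
    static Map<String, Long> report(List<Message> messages) {
        Map<String, List<Message>> byConversation = messages.stream()
                .collect(Collectors.groupingBy(Message::conversationId));
        return byConversation.values().stream()
                .map(conv -> conv.stream()
                        .sorted(Comparator.comparingLong(Message::timestamp))
                        .map(Message::action)
                        .collect(Collectors.joining(" > ")))
                .collect(Collectors.groupingBy(g -> g, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<Message> docs = List.of(
                new Message("conv_id1", 1579632745000L, "A"),
                new Message("conv_id1", 1579632745001L, "A.1"),
                new Message("conv_id1", 1579632745002L, "A.1.1"),
                new Message("conv_id2", 1579632745000L, "A"),
                new Message("conv_id2", 1579632745001L, "A.1"),
                new Message("conv_id2", 1579632745002L, "A.1.1"),
                new Message("conv_id3", 1579632745000L, "B"),
                new Message("conv_id3", 1579632745001L, "B.1"));
        // "A > A.1 > A.1.1" is counted twice, "B > B.1" once (map order may vary).
        System.out.println(report(docs));
    }
}
```

Whether this is acceptable depends on the data volume; pulling all hits client-side is only practical for small result sets.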
Update 2: I managed to get a partial result, but one issue still remains. Please check here what I still need to fix.
Update 1: adding some extra details
Mapping:
"mappings":{
"properties":{
"@timestamp":{
"type":"date",
"format": "epoch_millis"
}
"context":{
"properties":{
"action":{
"type":"keyword"
},
"conversationId":{
"type":"keyword"
}
}
}
}
}
Sample conversation documents:
Conversation 1.
{
  "@timestamp": 1579632745000,
  "context": {
    "action": "A",
    "conversationId": "conv_id1"
  }
},
{
  "@timestamp": 1579632745001,
  "context": {
    "action": "A.1",
    "conversationId": "conv_id1"
  }
},
{
  "@timestamp": 1579632745002,
  "context": {
    "action": "A.1.1",
    "conversationId": "conv_id1"
  }
}
Conversation 2.
{
  "@timestamp": 1579632745000,
  "context": {
    "action": "A",
    "conversationId": "conv_id2"
  }
},
{
  "@timestamp": 1579632745001,
  "context": {
    "action": "A.1",
    "conversationId": "conv_id2"
  }
},
{
  "@timestamp": 1579632745002,
  "context": {
    "action": "A.1.1",
    "conversationId": "conv_id2"
  }
}
Conversation 3.
{
  "@timestamp": 1579632745000,
  "context": {
    "action": "B",
    "conversationId": "conv_id3"
  }
},
{
  "@timestamp": 1579632745001,
  "context": {
    "action": "B.1",
    "conversationId": "conv_id3"
  }
}
Expected result:
{
"A -> A.1 -> A.1.1": 2,
"B -> B.1": 1
}
Something similar to this would work, in this or any other format.
Since I'm new to Elasticsearch, every hint is welcome.
Answer
I solved it using the scripted_metric aggregation of Elastic. Also, the index was changed from its initial state.
The script:
{
  "size": 0,
  "aggs": {
    "intentPathsCountAgg": {
      "scripted_metric": {
        "init_script": "state.messagesList = new ArrayList();",
        "map_script": "long currentMessageTime = doc['messageReceivedEvent.context.timestamp'].value.millis; Map currentMessage = ['conversationId': doc['messageReceivedEvent.context.conversationId.keyword'], 'time': currentMessageTime, 'intentsPath': doc['brainQueryRequestEvent.brainQueryRequest.user_data.intentsHistoryPath.keyword'].value]; state.messagesList.add(currentMessage);",
        "combine_script": "return state",
        "reduce_script": "List messages = new ArrayList(); Map conversationsMap = new HashMap(); Map intentsMap = new HashMap(); String[] ifElseWorkaround = new String[1]; for (state in states) { messages.addAll(state.messagesList);} messages.stream().forEach((message) -> { Map existingMessage = conversationsMap.get(message.conversationId); if(existingMessage == null || message.time > existingMessage.time) { conversationsMap.put(message.conversationId, ['time': message.time, 'intentsPath': message.intentsPath]); } else { ifElseWorkaround[0] = ''; } }); conversationsMap.entrySet().forEach(conversation -> { if (intentsMap.containsKey(conversation.getValue().intentsPath)) { long intentsCount = intentsMap.get(conversation.getValue().intentsPath) + 1; intentsMap.put(conversation.getValue().intentsPath, intentsCount); } else {intentsMap.put(conversation.getValue().intentsPath, 1L);} }); return intentsMap.entrySet().stream().map(intentPath -> [intentPath.getKey().toString(): intentPath.getValue()]).collect(Collectors.toSet()) "
      }
    }
  }
}
Formatted script (for better readability - using .ts):
scripted_metric: {
  init_script: 'state.messagesList = new ArrayList();',
  map_script: `
    long currentMessageTime = doc['messageReceivedEvent.context.timestamp'].value.millis;
    Map currentMessage = [
      'conversationId': doc['messageReceivedEvent.context.conversationId.keyword'],
      'time': currentMessageTime,
      'intentsPath': doc['brainQueryRequestEvent.brainQueryRequest.user_data.intentsHistoryPath.keyword'].value
    ];
    state.messagesList.add(currentMessage);`,
  combine_script: 'return state',
  reduce_script: `
    List messages = new ArrayList();
    Map conversationsMap = new HashMap();
    Map intentsMap = new HashMap();
    boolean[] ifElseWorkaround = new boolean[1];
    for (state in states) {
      messages.addAll(state.messagesList);
    }
    messages.stream().forEach(message -> {
      Map existingMessage = conversationsMap.get(message.conversationId);
      if (existingMessage == null || message.time > existingMessage.time) {
        conversationsMap.put(message.conversationId, ['time': message.time, 'intentsPath': message.intentsPath]);
      } else {
        ifElseWorkaround[0] = true;
      }
    });
    conversationsMap.entrySet().forEach(conversation -> {
      if (intentsMap.containsKey(conversation.getValue().intentsPath)) {
        long intentsCount = intentsMap.get(conversation.getValue().intentsPath) + 1;
        intentsMap.put(conversation.getValue().intentsPath, intentsCount);
      } else {
        intentsMap.put(conversation.getValue().intentsPath, 1L);
      }
    });
    return intentsMap.entrySet().stream().map(intentPath -> [
      'path': intentPath.getKey().toString(),
      'count': intentPath.getValue()
    ]).collect(Collectors.toSet())`
}
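For readability, the logic of the reduce_script can be restated as plain Java. This is only a sketch of what the script computes, not code Elasticsearch runs: it keeps the latest message per conversation, then counts the conversations sharing the same intentsPath (the Msg record is an illustrative stand-in for the script's message maps):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReduceSketch {
    // Stand-in for one entry of state.messagesList in the Painless script.
    record Msg(String conversationId, long time, String intentsPath) {}

    static Map<String, Long> reduce(List<Msg> messages) {
        // Keep only the latest message per conversation, as the script does.
        Map<String, Msg> latest = new HashMap<>();
        for (Msg m : messages) {
            Msg existing = latest.get(m.conversationId());
            if (existing == null || m.time() > existing.time()) {
                latest.put(m.conversationId(), m);
            }
        }
        // Count conversations sharing the same intentsPath.
        Map<String, Long> counts = new HashMap<>();
        for (Msg m : latest.values()) {
            counts.merge(m.intentsPath(), 1L, Long::sum);
        }
        return counts;
    }
}
```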
The response:
{
  "took": 2,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 11,
      "relation": "eq"
    },
    "max_score": null,
    "hits": []
  },
  "aggregations": {
    "intentPathsCountAgg": {
      "value": [
        {
          "smallTalk.greet -> smallTalk.greet2 -> smallTalk.greet3": 2
        },
        {
          "smallTalk.greet -> smallTalk.greet2 -> smallTalk.greet3 -> smallTalk.greet4": 1
        },
        {
          "smallTalk.greet -> smallTalk.greet2": 1
        }
      ]
    }
  }
}