Linear funnel from a collection of events with MongoDB aggregation, is it possible?

Question

I have a number of event documents; each event has a number of fields, but the ones that are relevant to my query are:
- person_id - a reference to the person who triggered the event
- event - a string key identifying the event
- occurred_at - the UTC time at which the event occurred
What I want to achieve is:
- for a list of event keys, eg `['event_1', 'event_2', 'event_3']`
- get counts of the number of people that performed each event and all the events previous to that event, in order, ie:
- the number of people who performed event_1
- the number of people who performed event_1, and then event_2
- the number of people who performed event_1, and then event_2, and then event_3
- etc
The best I have got is the following two map-reduces:
```javascript
db.events.mapReduce(
  function () {
    emit(this.person_id, { e: [{ e: this.event, o: this.occurred_at }] })
  },
  function (key, values) {
    return { e: [].concat.apply([], values.map(function (x) { return x.e })) }
  },
  {
    query: {
      account_id: ObjectId('52011239b1b9229f92000003'),
      event: { $in: ['event_a', 'event_b', 'event_c', 'event_d', 'event_e', 'event_f'] }
    },
    out: 'people_funnel_chains',
    sort: { person_id: 1, occurred_at: 1 }
  }
)
```
And then:
```javascript
db.people_funnel_chains.mapReduce(
  function () {
    funnel = ['event_a', 'event_b', 'event_c', 'event_d', 'event_e', 'event_f']
    events = this.value.e;
    for (var e in funnel) {
      e = funnel[e];
      if ((i = events.map(function (x) { return x.e }).indexOf(e)) > -1) {
        emit(e, { c: 1, o: events[i].o })
        events = events.slice(i + 1, events.length);
      } else {
        break;
      }
    }
  },
  function (key, values) {
    return {
      c: Array.sum(values.map(function (x) { return x.c })),
      o: new Date(Array.sum(values.map(function (x) { return x.o.getTime() })) / values.length)
    };
  },
  { out: { inline: 1 } }
)
```
I would like to achieve this in real time using the aggregation framework, but I can see no way to do it. For tens of thousands of records this takes tens of seconds. I can run it incrementally, which means it's fast enough for new data coming in, but if I want to modify the original query (eg change the event chain) it can't be done in a single request, which I would love it to be able to do.
Using Cursor.forEach() I've managed to get a huge improvement on this (essentially removing the requirement for the first map-reduce).
```javascript
var time = new Date().getTime(),
    funnel_event_keys = ['event_a', 'event_b', 'event_c', 'event_d', 'event_e', 'event_f'],
    looking_for_i = 0,
    looking_for = funnel_event_keys[0],
    funnel = {},
    last_person_id = null;
for (var i in funnel_event_keys) { funnel[funnel_event_keys[i]] = [0, null] };
db.events.find(
  { account_id: ObjectId('52011239b1b9229f92000003'), event: { $in: funnel_event_keys } },
  { person_id: 1, event: 1, occurred_at: 1 }
).sort({ person_id: 1, occurred_at: 1 }).forEach(function (e) {
  var current_person_id = e['person_id'].str;
  if (last_person_id != current_person_id) {
    looking_for_i = 0;
    looking_for = funnel_event_keys[0]
  }
  if (e['event'] == looking_for) {
    var funnel_event = funnel[looking_for]
    funnel_event[0] = funnel_event[0] + 1;
    funnel_event[1] = ((funnel_event[1] || e['occurred_at'].getTime()) + e['occurred_at'].getTime()) / 2;
    looking_for_i = looking_for_i + 1;
    looking_for = funnel_event_keys[looking_for_i]
  }
  last_person_id = current_person_id;
})
funnel;
new Date().getTime() - time;
```
I wonder if something custom with the data in memory would be able to improve on this? Getting hundreds of thousands of records out of MongoDB into memory (on a different machine) is going to be a bottleneck. Is there a technology I'm not aware of that could do this?
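For what it's worth, the per-person funnel walk in the forEach above can be reduced to a small pure function, which makes the logic easy to test on an in-memory array (field names follow the question; the sample data below is made up for illustration):

```javascript
// Given events sorted by (person_id, occurred_at), count how many people
// reached each step of the funnel in order. A person only advances to the
// next step when the current expected event is seen.
function funnelCounts(events, funnelKeys) {
  const counts = funnelKeys.map(() => 0);
  let lastPerson = null;
  let step = 0;
  for (const e of events) {
    if (e.person_id !== lastPerson) {
      step = 0; // new person: restart at the first funnel step
      lastPerson = e.person_id;
    }
    if (step < funnelKeys.length && e.event === funnelKeys[step]) {
      counts[step] += 1;
      step += 1;
    }
  }
  return counts;
}

// Example: two people, only one completes the full funnel.
const events = [
  { person_id: 'p1', event: 'event_a' },
  { person_id: 'p1', event: 'event_b' },
  { person_id: 'p1', event: 'event_c' },
  { person_id: 'p2', event: 'event_a' },
  { person_id: 'p2', event: 'event_c' } // skips event_b, so p2 stalls at step 1
];
console.log(funnelCounts(events, ['event_a', 'event_b', 'event_c'])); // [ 2, 1, 1 ]
```

This doesn't solve the transfer bottleneck, but it isolates the counting logic from the cursor iteration.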
Answer
I wrote up a complete answer on my MongoDB blog, but as a summary, what you have to do is: project the actions you care about, mapping values of the action field into appropriate key names; group by person, aggregating for the three actions when they did them (and optionally how many times); and then project new fields which check whether action2 was done after action1, and action3 after action2, and so on. The last phase just sums up the number of people who did just action1, or action1 and then action2, or action1 and then action2 and then action3.
Using a function to generate the aggregation pipeline, it's possible to generate results based on the array of actions passed in.
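As a rough sketch of that idea (this is not the blog post's actual code, and the stage details are my assumptions; the field names person_id, event, occurred_at follow the question), a generator function could build the ordered-funnel pipeline from an array of action keys like so:

```javascript
// Sketch of a pipeline generator for an ordered funnel: record, per person,
// the first time they did each action; flag people whose first occurrences
// are in funnel order; then sum the flags across people.
function buildFunnelPipeline(actions) {
  // Per-person first occurrence of each action ($min ignores null values).
  const group = { _id: '$person_id' };
  actions.forEach(function (a) {
    group['first_' + a] = {
      $min: { $cond: [{ $eq: ['$event', a] }, '$occurred_at', null] }
    };
  });

  // Flag each step: action1 must have happened, and every later first
  // occurrence must come after the previous one.
  const project = { _id: 0 };
  actions.forEach(function (a, i) {
    const conds = [{ $gt: ['$first_' + actions[0], null] }];
    for (let j = 1; j <= i; j++) {
      conds.push({ $gt: ['$first_' + actions[j], '$first_' + actions[j - 1]] });
    }
    project['did_' + a] = { $cond: [{ $and: conds }, 1, 0] };
  });

  // Sum the per-person flags into funnel counts.
  const sums = { _id: null };
  actions.forEach(function (a) {
    sums[a] = { $sum: '$did_' + a };
  });

  return [
    { $match: { event: { $in: actions } } },
    { $group: group },
    { $project: project },
    { $group: sums }
  ];
}
```

Because the pipeline is just data, changing the event chain only means calling the generator with a different array, which is exactly the "single request" property asked for.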
In my test case, the entire pipeline ran in under 200ms for a collection of 40,000 documents (this was on my small laptop).
As was correctly pointed out, the general solution I describe assumes that while an actor can take any action multiple times, they can only advance from action1 to action2; they cannot skip directly from action1 to action3 (interpreting action order as describing prerequisites, where you cannot do action3 until you've done action2).
As it turns out, the aggregation framework can be used even for sequences of events where the order is completely arbitrary, but you still want to know how many people at some point did the sequence action1, action2, action3.
The main adjustment to make to the original answer is to add an extra two-stage step in the middle. This step unwinds the per-person document collected so far and re-groups it, finding the first occurrence of the second action that comes after the first occurrence of the first action.
Once we have that, the final comparison is between action1, the earliest occurrence of action2 after it, and the latest occurrence of action3.
This can probably be generalized to handle an arbitrary number of events, but every additional event past two would add two more stages to the aggregation.
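Purely as an illustration (this is not the blog post's code; the field names `events`, `e`, `o` and the `first_…` keys are assumptions), that extra two-stage step might look roughly like this, assuming the preceding group left each person with an array of `{ e, o }` event pairs and a `first_<action>` timestamp:

```javascript
// Hypothetical sketch of the extra two-stage step: unwind each person's
// events and re-group, keeping the earliest occurrence of the second action
// that happened after the first occurrence of the first action.
function afterStages(firstAction, secondAction) {
  const outKey = 'first_' + secondAction + '_after_' + firstAction;
  return [
    { $unwind: '$events' },
    {
      $group: {
        _id: '$_id',
        ['first_' + firstAction]: { $first: '$first_' + firstAction },
        [outKey]: {
          // $min ignores nulls, so only qualifying occurrences are kept.
          $min: {
            $cond: [
              {
                $and: [
                  { $eq: ['$events.e', secondAction] },
                  { $gt: ['$events.o', '$first_' + firstAction] }
                ]
              },
              '$events.o',
              null
            ]
          }
        }
      }
    }
  ];
}
```

Each extra event in the sequence would splice another such pair of stages into the pipeline, which matches the "two more stages per event" observation above.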
Here is my write-up of the modification of the pipeline to achieve the answer you are looking for.