Session windows in Apache Beam with Python


Question

I have a stream of user events. I've mapped them into KV{ userId, event }, and assigned timestamps.

This is to run in streaming mode. I would like to be able to create the following input-output result:

Session window gap = 1

  • input: user=1, timestamp=1, event=a
  • input: user=2, timestamp=2, event=a
  • input: user=2, timestamp=3, event=a
  • input: user=1, timestamp=2, event=b
  • time: lwm=3
  • output: user=1, [ { event=a, timestamp=1 }, { event=b, timestamp=2 } ]
  • time: lwm=4
  • output: user=2, [ { event=a, timestamp=2 }, { event=a, timestamp=3 } ]

So that I can write my function to reduce the list of events in the session window for the user, along with the start and end time of the session window.

How do I write this? (If you answer "look at the examples", that is not a valid answer, because they never feed the list of events into the reducer with the window as a parameter.)

Recommended answer

If I understand correctly, this is a follow-up to this question, and it is naturally accomplished by adding the Group By Key step as I propose in my solution there.

So, referring to my previous explanation and focusing here only on the changes, if we have a pipeline like this:

import logging  # used by AnalyzeSession

import apache_beam as beam
from apache_beam import window

# p, user1_data, user2_data and session_gap are assumed to be defined
# as in the previous answer.
events = (p
  | 'Create Events' >> beam.Create(user1_data + user2_data)
  | 'Add Timestamps' >> beam.Map(lambda x: beam.window.TimestampedValue(x, x['timestamp']))
  | 'keyed_on_user_id' >> beam.Map(lambda x: (x['user_id'], x))
  | 'user_session_window' >> beam.WindowInto(window.Sessions(session_gap),
                                             timestamp_combiner=window.TimestampCombiner.OUTPUT_AT_EOW)
  | 'Group' >> beam.GroupByKey()
  | 'analyze_session' >> beam.ParDo(AnalyzeSession()))

Now the elements are arranged as you describe in the question, so we can simply log them in AnalyzeSession:

class AnalyzeSession(beam.DoFn):
  """Prints per session information"""
  def process(self, element, window=beam.DoFn.WindowParam):
    logging.info(element)
    yield element

This gives the desired results:

INFO:root:('Groot', [{'timestamp': 1554203778.904401, 'user_id': 'Groot', 'value': 'event_0'}, {'timestamp': 1554203780.904401, 'user_id': 'Groot', 'value': 'event_1'}])
INFO:root:('Groot', [{'timestamp': 1554203786.904402, 'user_id': 'Groot', 'value': 'event_2'}])
INFO:root:('Thanos', [{'timestamp': 1554203792.904399, 'user_id': 'Thanos', 'value': 'event_4'}])
INFO:root:('Thanos', [{'timestamp': 1554203784.904398, 'user_id': 'Thanos', 'value': 'event_3'}, {'timestamp': 1554203777.904395, 'user_id': 'Thanos', 'value': 'event_0'}, {'timestamp': 1554203778.904397, 'user_id': 'Thanos', 'value': 'event_1'}, {'timestamp': 1554203780.904398, 'user_id': 'Thanos', 'value': 'event_2'}])
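To see why the events end up in these groups, the session logic can be mimicked outside Beam. The sketch below is plain Python and not Beam's implementation: `sessionize` is a hypothetical helper, the timestamps are simplified stand-ins for the ones logged above, and a gap of 5 seconds is assumed. In this sketch, two consecutive events of the same user share a session when they are less than the gap apart.

```python
from collections import defaultdict


def sessionize(events, gap):
    """Group events per user into sessions separated by at least `gap`.

    Hypothetical helper for illustration only; it mirrors the idea of
    session windows (per-key grouping, new session after an idle gap).
    """
    by_user = defaultdict(list)
    for event in events:
        by_user[event["user_id"]].append(event)

    sessions = []
    for user, user_events in by_user.items():
        user_events.sort(key=lambda e: e["timestamp"])
        current = [user_events[0]]
        for event in user_events[1:]:
            # Same session if this event arrives before the previous one "expires".
            if event["timestamp"] < current[-1]["timestamp"] + gap:
                current.append(event)
            else:
                sessions.append((user, current))
                current = [event]
        sessions.append((user, current))
    return sessions


# Simplified stand-in data (seconds relative to an arbitrary origin).
demo_events = [
    {"user_id": "Groot", "timestamp": 1, "value": "event_0"},
    {"user_id": "Groot", "timestamp": 3, "value": "event_1"},
    {"user_id": "Groot", "timestamp": 9, "value": "event_2"},
    {"user_id": "Thanos", "timestamp": 0, "value": "event_0"},
    {"user_id": "Thanos", "timestamp": 1, "value": "event_1"},
    {"user_id": "Thanos", "timestamp": 3, "value": "event_2"},
    {"user_id": "Thanos", "timestamp": 7, "value": "event_3"},
    {"user_id": "Thanos", "timestamp": 15, "value": "event_4"},
]

for user, session in sessionize(demo_events, gap=5):
    print(user, [e["value"] for e in session])
```

In the pipeline itself none of this bookkeeping is needed: window.Sessions plus GroupByKey produce the per-user, per-session lists directly.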

If you want to avoid redundant information, such as having the user_id and timestamp as part of the values, they can be removed in the Map step. As for the complete use case (i.e. reducing the aggregated events on a per-session level), we can do things like counting the number of events or computing the session duration with something like this:

class AnalyzeSession(beam.DoFn):
  """Prints per session information"""
  def process(self, element, window=beam.DoFn.WindowParam):
    user = element[0]
    num_events = str(len(element[1]))
    window_end = window.end.to_utc_datetime()
    window_start = window.start.to_utc_datetime()
    session_duration = window_end - window_start

    logging.info(">>> User %s had %s event(s) in %s session", user, num_events, session_duration)

    yield element

For my example, this outputs the following:

INFO:root:>>> User Groot had 2 event(s) in 0:00:07 session
INFO:root:>>> User Groot had 1 event(s) in 0:00:05 session
INFO:root:>>> User Thanos had 4 event(s) in 0:00:12 session
INFO:root:>>> User Thanos had 1 event(s) in 0:00:05 session
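Note that these durations are window lengths, not the time between the first and last event: a session window runs from the first event's timestamp to the last event's timestamp plus the gap, which is why a single-event session still shows a non-zero duration. The sketch below reproduces that arithmetic in plain Python for Groot's first session. The helper `session_bounds` is hypothetical, and the 5-second gap is an assumption inferred from the single-event sessions in the log.

```python
from datetime import timedelta


def session_bounds(timestamps, gap):
    """Return (start, end) of the session window covering `timestamps`.

    Assumes all timestamps already belong to one session; the window
    spans from the first event to the last event plus the gap.
    """
    return min(timestamps), max(timestamps) + gap


# Groot's first session, taken from the logged elements; gap=5 is an assumption.
groot = [1554203778.904401, 1554203780.904401]
start, end = session_bounds(groot, gap=5)

# Round to microseconds to hide floating-point noise in the subtraction.
window_length = timedelta(seconds=round(end - start, 6))
event_span = timedelta(seconds=round(max(groot) - min(groot), 6))

print("window length:", window_length)
print("first-to-last event:", event_span)
```

If the time between the first and last event is what you need, compute it from the element timestamps inside AnalyzeSession instead of from window.start and window.end.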

Full code here
