Session windows in Apache Beam with python

Problem description

I have a stream of user events. I've mapped them into KV{ userId, event }, and assigned timestamps.

This is to run in streaming mode. I would like to be able to produce the following input-output result:

Session window gap=1

  • Input: user=1, timestamp=1, event=a
  • Input: user=2, timestamp=2, event=a
  • Input: user=2, timestamp=3, event=a
  • Input: user=1, timestamp=2, event=b
  • Time: lwm=3 (low watermark)
  • Output: user=1, [ { event=a, timestamp=1 }, { event=b, timestamp=2 } ]
  • Time: lwm=4
  • Output: user=2, [ { event=a, timestamp=2 }, { event=a, timestamp=3 } ]

That way I can write my function to reduce the list of events in the user's session window, together with the start and end time of that session window.

How do I write this? (If you answer "look at the examples", that is not a valid answer, because they never feed the list of events into the reducer with the window as a parameter.)

Recommended answer

If I understand this correctly, this is a follow-up to this question, and it is naturally accomplished by adding the GroupByKey step I proposed in my solution there.

Referring to my previous explanation for everything else and focusing only on the changes here, suppose we have a pipeline like this:

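import apache_beam as beam
from apache_beam.transforms import window

# p, user1_data, user2_data and session_gap are defined in the full code (not shown here)
events = (p
  | 'Create Events' >> beam.Create(user1_data + user2_data)
  # Attach each event's own time so that windowing works on event time
  | 'Add Timestamps' >> beam.Map(lambda x: window.TimestampedValue(x, x['timestamp']))
  | 'keyed_on_user_id' >> beam.Map(lambda x: (x['user_id'], x))
  # Per-key session windows; each grouped output is stamped at the end of its window
  | 'user_session_window' >> beam.WindowInto(window.Sessions(session_gap),
                                             timestamp_combiner=window.TimestampCombiner.OUTPUT_AT_EOW)
  # The new step: collect all events of one user and one session into a single element
  | 'Group' >> beam.GroupByKey()
  | 'analyze_session' >> beam.ParDo(AnalyzeSession()))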

Now the elements are arranged as you describe in the question, so we can simply log them in AnalyzeSession:

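import logging

import apache_beam as beam

class AnalyzeSession(beam.DoFn):
  """Prints per session information"""
  def process(self, element, window=beam.DoFn.WindowParam):
    # element is (user_id, [event, ...]) for one user and one session window
    logging.info(element)
    yield element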

This gives the desired result:

INFO:root:('Groot', [{'timestamp': 1554203778.904401, 'user_id': 'Groot', 'value': 'event_0'}, {'timestamp': 1554203780.904401, 'user_id': 'Groot', 'value': 'event_1'}])
INFO:root:('Groot', [{'timestamp': 1554203786.904402, 'user_id': 'Groot', 'value': 'event_2'}])
INFO:root:('Thanos', [{'timestamp': 1554203792.904399, 'user_id': 'Thanos', 'value': 'event_4'}])
INFO:root:('Thanos', [{'timestamp': 1554203784.904398, 'user_id': 'Thanos', 'value': 'event_3'}, {'timestamp': 1554203777.904395, 'user_id': 'Thanos', 'value': 'event_0'}, {'timestamp': 1554203778.904397, 'user_id': 'Thanos', 'value': 'event_1'}, {'timestamp': 1554203780.904398, 'user_id': 'Thanos', 'value': 'event_2'}])
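
A pure-Python sketch (it does not call Beam) can reproduce these four groups from the logged timestamps. It assumes a session gap of 5 seconds, because the answer does not show the value of session_gap; 5 s is consistent with the session durations logged further down. group_into_sessions is a hypothetical helper that mimics how session windows merge: each event covers [t, t + gap), and windows of the same key that overlap are merged into one.

```python
from typing import Dict, List, Tuple

Session = Tuple[float, float, List[float]]  # (start, end, member timestamps)


def group_into_sessions(timestamps: List[float], gap: float) -> List[Session]:
    """Hypothetical helper mimicking session-window merging for one key."""
    sessions: List[Session] = []
    for t in sorted(timestamps):
        if sessions and sessions[-1][1] > t:
            # The event's window [t, t + gap) overlaps the current session: extend it.
            start, _, members = sessions[-1]
            sessions[-1] = (start, t + gap, members + [t])
        else:
            # No overlap: the event opens a new session.
            sessions.append((t, t + gap, [t]))
    return sessions


# Timestamps copied from the log above.
events: Dict[str, List[float]] = {
    "Groot": [1554203778.904401, 1554203780.904401, 1554203786.904402],
    "Thanos": [1554203792.904399, 1554203784.904398, 1554203777.904395,
               1554203778.904397, 1554203780.904398],
}
SESSION_GAP = 5.0  # assumed gap in seconds; session_gap is not shown in the answer

for user, ts in events.items():
    for _, _, members in group_into_sessions(ts, SESSION_GAP):
        print(user, len(members), "event(s)")
```

Under that assumption, Groot's event_2 opens a second session because it arrives about 6 s after event_1, which is more than the 5 s gap.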

If you want to avoid redundant information, such as having user_id and timestamp as part of the values, they can be removed in the Map step. As for the complete use case (i.e. reducing the aggregated events on a per-session level), we can, for example, count the number of events or compute the session duration with something like this:

class AnalyzeSession(beam.DoFn):
  """Logs per session information"""
  def process(self, element, window=beam.DoFn.WindowParam):
    user, events = element
    # The grouped values are an iterable; materialize it before counting
    num_events = len(list(events))
    # window is the merged session window (an IntervalWindow) of this element
    window_end = window.end.to_utc_datetime()
    window_start = window.start.to_utc_datetime()
    session_duration = window_end - window_start

    logging.info(">>> User %s had %s event(s) in %s session", user, num_events, session_duration)

    yield element
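
Both suggestions in the previous paragraph can be sketched as plain functions, kept free of Beam so they are easy to test. key_by_user and summarize_session are hypothetical names, not part of the original answer or of the Beam API. The sketch keeps the per-event timestamp in the value, because after GroupByKey the grouped values no longer carry their individual element timestamps, and it sorts the events, because GroupByKey gives no ordering guarantee (note the unordered Thanos list in the first log). The window bounds in the usage part assume a 5 s gap.

```python
from typing import Any, Dict, Iterable, List, Tuple


def key_by_user(event: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    """Hypothetical replacement for the 'keyed_on_user_id' lambda.

    The user id becomes the key, so it is dropped from the value (a copy;
    the input dict is not modified). The timestamp stays in the value.
    """
    value = {k: v for k, v in event.items() if k != "user_id"}
    return event["user_id"], value


def summarize_session(
    element: Tuple[str, Iterable[Dict[str, Any]]],
    window_start: float,
    window_end: float,
) -> Dict[str, Any]:
    """Hypothetical per-session reducer that also receives the window bounds."""
    user, events = element
    # No ordering guarantee after GroupByKey, so sort by event time first.
    ordered: List[Dict[str, Any]] = sorted(events, key=lambda e: e["timestamp"])
    return {
        "user": user,
        "events": [e["value"] for e in ordered],
        "session_start": window_start,
        "session_end": window_end,
        "duration_s": window_end - window_start,
    }


# Usage with the Thanos session from the first log (values arrive unordered).
raw = [
    {"timestamp": 1554203784.904398, "user_id": "Thanos", "value": "event_3"},
    {"timestamp": 1554203777.904395, "user_id": "Thanos", "value": "event_0"},
    {"timestamp": 1554203778.904397, "user_id": "Thanos", "value": "event_1"},
    {"timestamp": 1554203780.904398, "user_id": "Thanos", "value": "event_2"},
]
keyed = [key_by_user(e) for e in raw]
grouped = (keyed[0][0], [value for _, value in keyed])
# Assumed bounds: earliest event .. latest event + 5 s gap.
summary = summarize_session(grouped, 1554203777.904395, 1554203784.904398 + 5)
print(summary["events"], round(summary["duration_s"], 3))
```

In the pipeline, key_by_user could be used as beam.Map(key_by_user), and inside AnalyzeSession.process the bounds could be passed as window.start.micros / 1e6 and window.end.micros / 1e6.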

For my example, the AnalyzeSession above logs the following:

INFO:root:>>> User Groot had 2 event(s) in 0:00:07 session
INFO:root:>>> User Groot had 1 event(s) in 0:00:05 session
INFO:root:>>> User Thanos had 4 event(s) in 0:00:12 session
INFO:root:>>> User Thanos had 1 event(s) in 0:00:05 session
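
These durations are window lengths, not the time between the first and last event: a merged session window runs from the earliest event to one gap past the latest event, so a single-event session lasts exactly one gap. As a worked check, taking the Groot timestamps from the first log and assuming a 5 s gap (session_gap is not shown in the answer):

```latex
\begin{aligned}
d &= (t_{\text{last}} + g) - t_{\text{first}} \\
d_{\text{Groot}} &= (1554203780.904401 + 5) - 1554203778.904401 = 7\ \text{s} \\
d_{\text{single event}} &= (t + g) - t = g = 5\ \text{s}
\end{aligned}
```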

Full code here