Dataflow: Look up a previous event in an event stream

Problem description

To summarize, what I'm looking to do with Apache Beam in Google Dataflow is something like LAG in Azure Stream Analytics.

Using a window of X minutes where I'm receiving data:

||||||  ||||||  ||||||  ||||||  ||||||  ||||||
|  1 |  |  2 |  |  3 |  |  4 |  |  5 |  |  6 | 
|id=x|  |id=x|  |id=x|  |id=x|  |id=x|  |id=x| 
|||||| ,|||||| ,|||||| ,|||||| ,|||||| ,|||||| , ...

I need to compare data(n) with data(n-1). For example, following the previous diagram, it would be something like this:

if data(6) inside and data(5)  outside then ... 
if data(5) inside and data(4)  outside then ... 
if data(4) inside and data(3)  outside then ... 
if data(3) inside and data(2)  outside then ... 
if data(2) inside and data(1)  outside then ... 

Is there any "practical" way to do this?

Recommended answer

With Beam, as explained in the docs, state is maintained per key and window. Therefore, you can't access values from previous windows.
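
To illustrate the point (this is not part of the answer's pipeline), here is a minimal stateful-DoFn sketch: the state cell below is scoped to the current key and window, so it starts out empty again in every new window and never sees the previous window's value. LastValueDoFn and the state name 'prev' are assumptions made for this example:

import apache_beam as beam
from apache_beam.coders import VarIntCoder
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec

class LastValueDoFn(beam.DoFn):
  # One state cell per key *and* per window.
  PREV = ReadModifyWriteStateSpec('prev', VarIntCoder())

  def process(self, element, prev=beam.DoFn.StateParam(PREV)):
    key, value = element
    previous = prev.read()   # None at the start of each key+window
    prev.write(value)
    yield key, (previous, value)  # "previous" only comes from the same window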

To do what you want, you might need a more complex pipeline design. My idea, developed here as an example, would be to duplicate your messages in a ParDo:

  • Send them, unmodified, to the main output
  • At the same time, send them to a side output with a one-window lag

To do the second bullet point, we can add the duration of a window (WINDOW_SECONDS) to the element timestamp:

import apache_beam as beam

WINDOW_SECONDS = 10  # fixed-window duration in seconds (10s in the test below)

class DuplicateWithLagDoFn(beam.DoFn):

  def process(self, element, timestamp=beam.DoFn.TimestampParam):
    # Main output gets the unmodified element
    yield element
    # The same element is emitted to the side output with a 1-window lag added to its timestamp
    yield beam.pvalue.TaggedOutput('lag_output', beam.window.TimestampedValue(element, timestamp + WINDOW_SECONDS))

We call the function specifying the correct tags:

beam.ParDo(DuplicateWithLagDoFn()).with_outputs('lag_output', main='main_output')

and then apply the same windowing scheme to both, co-group by key, etc.

windowed_main = results.main_output | 'Window main output' >> beam.WindowInto(window.FixedWindows(WINDOW_SECONDS))
windowed_lag = results.lag_output | 'Window lag output' >> beam.WindowInto(window.FixedWindows(WINDOW_SECONDS))

merged = (windowed_main, windowed_lag) | 'Join Pcollections' >> beam.CoGroupByKey()
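
For reference, each element coming out of the CoGroupByKey is keyed and carries two lists, one per input PCollection, in the order they were passed: the current window's values first, the lagged (previous window's) values second. Using the test messages shown further down, an element looks like:

('test', (['test,40'], ['test,120']))  # (key, (current window values, lagged values))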

Finally, we can have both values (old and new) inside the same ParDo:

class CompareDoFn(beam.DoFn):

  def process(self, element):
    logging.info("Combined with previous value: {}".format(element))

    # element is (key, (current window values, lagged values));
    # each value is a "key,number" string, so keep the number part.
    try:
      old_value = int(element[1][1][0].split(',')[1])
    except (IndexError, ValueError):
      old_value = 0

    try:
      new_value = int(element[1][0][0].split(',')[1])
    except (IndexError, ValueError):
      new_value = 0

    logging.info("New value: {}, Old value: {}, Difference: {}".format(new_value, old_value, new_value - old_value))
    yield (element[0], new_value - old_value)

To test this I run the pipeline with the direct runner and, on a separate shell, I publish two messages more than 10 seconds apart (in my case WINDOW_SECONDS was 10s):

gcloud pubsub topics publish lag --message="test,120"
sleep 12
gcloud pubsub topics publish lag --message="test,40"

And the job output shows the expected difference:

INFO:root:New message: (u'test', u'test,120')
INFO:root:Combined with previous value: (u'test', ([u'test,120'], []))
INFO:root:New value: 120, Old value: 0, Difference: 120
INFO:root:New message: (u'test', u'test,40')
INFO:root:Combined with previous value: (u'test', ([u'test,40'], [u'test,120']))
INFO:root:New value: 40, Old value: 120, Difference: -80
INFO:root:Combined with previous value: (u'test', ([], [u'test,40']))
INFO:root:New value: 0, Old value: 40, Difference: -40

Full code for my example here. Take into account performance considerations as you are duplicating elements, but it makes sense if you need to have values available during two windows.
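
In case the linked code is not reachable from here, below is a minimal sketch of how the pieces above could be wired together. The --input_topic flag, the "key,value" message format, and the run() wrapper are assumptions for illustration, not the answer's exact code:

import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms import window

WINDOW_SECONDS = 10  # fixed-window duration used in the answer's test


def run(argv=None):
  parser = argparse.ArgumentParser()
  parser.add_argument('--input_topic', required=True,
                      help='e.g. projects/PROJECT/topics/lag')
  known_args, pipeline_args = parser.parse_known_args(argv)

  options = PipelineOptions(pipeline_args)
  options.view_as(StandardOptions).streaming = True

  with beam.Pipeline(options=options) as p:
    # Read "key,value" strings and key them, then duplicate with a 1-window lag
    results = (
        p
        | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(topic=known_args.input_topic)
        | 'Decode' >> beam.Map(lambda msg: msg.decode('utf-8'))
        | 'Key by id' >> beam.Map(lambda msg: (msg.split(',')[0], msg))
        | 'Duplicate with lag' >> beam.ParDo(DuplicateWithLagDoFn()).with_outputs(
            'lag_output', main='main_output'))

    # Same fixed windows on both outputs, then join current and lagged values per key
    windowed_main = results.main_output | 'Window main output' >> beam.WindowInto(
        window.FixedWindows(WINDOW_SECONDS))
    windowed_lag = results.lag_output | 'Window lag output' >> beam.WindowInto(
        window.FixedWindows(WINDOW_SECONDS))

    merged = (windowed_main, windowed_lag) | 'Join Pcollections' >> beam.CoGroupByKey()

    merged | 'Compare' >> beam.ParDo(CompareDoFn())


if __name__ == '__main__':
  logging.basicConfig(level=logging.INFO)
  run()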
