Dataflow: Look up a previous event in an event stream


Problem Description

To summarize, what I'm looking to do with Apache Beam in Google Dataflow is something like LAG in Azure Stream Analytics.

Using a window of X minutes where I'm receiving data:

||||||  ||||||  ||||||  ||||||  ||||||  ||||||
|  1 |  |  2 |  |  3 |  |  4 |  |  5 |  |  6 | 
|id=x|  |id=x|  |id=x|  |id=x|  |id=x|  |id=x| 
|||||| ,|||||| ,|||||| ,|||||| ,|||||| ,|||||| , ...

I need to compare data(n) with data(n-1); for example, following the diagram above, it would be something like this:

if data(6) inside and data(5)  outside then ... 
if data(5) inside and data(4)  outside then ... 
if data(4) inside and data(3)  outside then ... 
if data(3) inside and data(2)  outside then ... 
if data(2) inside and data(1)  outside then ... 

Is there any "practical" way to do this?

Recommended Answer

With Beam, as explained in the docs, state is maintained per key and window. Therefore, you can't access values from previous windows.

To do what you want, you might need a more complex pipeline design. My idea, developed in the example here, is to duplicate your messages in a ParDo:

  • Send them, unmodified, to the main output
  • At the same time, send them to a side output with a one-window lag

For the second bullet point, we can add the duration of one window (WINDOW_SECONDS) to the element timestamp:

class DuplicateWithLagDoFn(beam.DoFn):

  def process(self, element, timestamp=beam.DoFn.TimestampParam):
    # Main output gets the unmodified element
    yield element
    # The same element goes to the side output with a 1-window lag added to its timestamp
    yield beam.pvalue.TaggedOutput(
        'lag_output',
        beam.window.TimestampedValue(element, timestamp + WINDOW_SECONDS))
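To see why adding WINDOW_SECONDS shifts the copy exactly one fixed window forward, here is a minimal pure-Python sketch (no Beam; the window-start arithmetic is an assumption matching FixedWindows semantics, where each timestamp falls in [k*W, (k+1)*W)):

```python
WINDOW_SECONDS = 10

def window_start(ts):
    # Start of the fixed window of length WINDOW_SECONDS containing ts
    return (ts // WINDOW_SECONDS) * WINDOW_SECONDS

# An element at t=12 belongs to window [10, 20); its lagged copy at
# t=12+10=22 lands in the next window [20, 30), so the copy is grouped
# with the *following* window's fresh elements.
assert window_start(12) == 10
assert window_start(12 + WINDOW_SECONDS) == 20
```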

We call the function specifying the correct output tags:

beam.ParDo(DuplicateWithLagDoFn()).with_outputs('lag_output', main='main_output')

and then apply the same windowing scheme to both outputs, co-group by key, etc.:

windowed_main = results.main_output | 'Window main output' >> beam.WindowInto(window.FixedWindows(WINDOW_SECONDS))
windowed_lag = results.lag_output | 'Window lag output' >> beam.WindowInto(window.FixedWindows(WINDOW_SECONDS))

merged = (windowed_main, windowed_lag) | 'Join Pcollections' >> beam.CoGroupByKey()
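After the CoGroupByKey, each element has the shape (key, (main_values, lag_values)), where lag_values are the copies shifted from the previous window. A pure-Python sketch of the indexing used in the next step (the sample payloads mirror the test messages published at the end of this answer):

```python
# Shape produced by CoGroupByKey over (windowed_main, windowed_lag):
#   (key, (values_from_current_window, lagged_values_from_previous_window))
element = ('test', (['test,40'], ['test,120']))

new_value = int(element[1][0][0].split(',')[1])  # current window's value
old_value = int(element[1][1][0].split(',')[1])  # previous window's value
assert (new_value, old_value) == (40, 120)
```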

Finally, we have both values (old and new) inside the same ParDo:

class CompareDoFn(beam.DoFn):

  def process(self, element):
    logging.info("Combined with previous value: {}".format(element))

    # Lagged copy from the previous window (absent in the first window)
    try:
      old_value = int(element[1][1][0].split(',')[1])
    except (IndexError, ValueError):
      old_value = 0

    # Fresh element from the current window (absent once the stream stops)
    try:
      new_value = int(element[1][0][0].split(',')[1])
    except (IndexError, ValueError):
      new_value = 0

    logging.info("New value: {}, Old value: {}, Difference: {}".format(new_value, old_value, new_value - old_value))
    yield (element[0], new_value - old_value)
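As a quick sanity check of that parsing logic, the three grouped elements from the run log further down can be replayed in plain Python (no Beam), reproducing the logged differences:

```python
def diff(element):
    # Same parsing as CompareDoFn: missing values fall back to 0
    def parse(values):
        try:
            return int(values[0].split(',')[1])
        except (IndexError, ValueError):
            return 0
    new_value = parse(element[1][0])  # current window
    old_value = parse(element[1][1])  # lagged copy from previous window
    return new_value - old_value

elements = [
    ('test', (['test,120'], [])),           # first window: no previous value
    ('test', (['test,40'], ['test,120'])),  # second window: both present
    ('test', ([], ['test,40'])),            # third window: only the lagged copy
]
assert [diff(e) for e in elements] == [120, -80, -40]
```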

To test this, I ran the pipeline with the direct runner and, in a separate shell, published two messages more than 10 seconds apart (in my case WINDOW_SECONDS was 10s):

gcloud pubsub topics publish lag --message="test,120"
sleep 12
gcloud pubsub topics publish lag --message="test,40"

And the job output shows the expected differences:

INFO:root:New message: (u'test', u'test,120')
INFO:root:Combined with previous value: (u'test', ([u'test,120'], []))
INFO:root:New value: 120, Old value: 0, Difference: 120
INFO:root:New message: (u'test', u'test,40')
INFO:root:Combined with previous value: (u'test', ([u'test,40'], [u'test,120']))
INFO:root:New value: 40, Old value: 120, Difference: -80
INFO:root:Combined with previous value: (u'test', ([], [u'test,40']))
INFO:root:New value: 0, Old value: 40, Difference: -40

Full code for my example here. Take performance into account, as you are duplicating elements, but this makes sense if you need values available during two windows.

