SlidingWindows in Python Apache Beam duplicates the data


Question

Problem

Each time the system receives a message from Pub/Sub through a sliding window, the message is duplicated.

Code

 | 'Parse dictionary' >> beam.Map(lambda elem: (elem['Serial'], int(elem['Value'])))    
 | 'window' >> beam.WindowInto(window.SlidingWindows(30, 15),accumulation_mode=AccumulationMode.DISCARDING)
 | 'Count' >> beam.CombinePerKey(beam.combiners.MeanCombineFn())


Output

If I send only one message from Pub/Sub and print what I have after the sliding window finishes, using this code:

class print_row2(beam.DoFn):
    def process(self, row=beam.DoFn.ElementParam, window=beam.DoFn.WindowParam, timestamp=beam.DoFn.TimestampParam):
        # timestamp2str is a helper defined elsewhere (not shown).
        print(row, timestamp2str(float(window.start)), timestamp2str(float(window.end)), timestamp2str(float(timestamp)))

Result

('77777', 120.0) 2018-11-16 08:21:15.000 2018-11-16 08:21:45.000 2018-11-16 08:21:45.000
('77777', 120.0) 2018-11-16 08:21:30.000 2018-11-16 08:22:00.000 2018-11-16 08:22:00.000

If I print the message before 'window' >> beam.WindowInto(window.SlidingWindows(30, 15)), I get it only once.

The process, shown graphically:

  time: ----t+00---t+15---t+30----t+45----t+60------>
             :      :      :       :       :
  w1:        |=X===========|       :       :
  w2:               |==============|       :
  ...

The message X was sent only once, at the beginning of the sliding window. It should be received only once, but it is being received twice.
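
A likely explanation lies in how sliding windows are defined: with a size of 30 seconds and a period of 15 seconds, every timestamp falls into size / period = 2 overlapping windows, so a per-window aggregation emits one result for each of them. The sketch below is plain Python, not Beam's own code. The function name `sliding_windows_for`, the integer-second timestamps, and the sample timestamp are assumptions made for illustration.

```python
def sliding_windows_for(timestamp, size, period, offset=0):
    """Return the [start, end) windows of length `size`, one starting every
    `period` seconds, that contain `timestamp` (all values in whole seconds)."""
    # Latest window start that is <= timestamp.
    start = timestamp - ((timestamp - offset) % period)
    windows = []
    # Step back one period at a time while the window still covers timestamp.
    while start > timestamp - size:
        windows.append((start, start + size))
        start -= period
    return sorted(windows)


# Hypothetical element 35 s into the minute (for example 08:21:35).
print(sliding_windows_for(35, size=30, period=15))
# [(15, 45), (30, 60)]
```

The two windows returned, 15 to 45 and 30 to 60, have the same shape as the two rows in the result above (08:21:15 to 08:21:45 and 08:21:30 to 08:22:00).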

I have tried both AccumulationMode values, and also trigger=AfterWatermark, but I cannot fix the problem.

What could be the problem?

Extra

With FixedWindows, this is the correct code for my purpose:

| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 30))
| 'Speed Average' >> beam.GroupByKey()
| "Calculating average" >> beam.CombineValues(beam.combiners.MeanCombineFn())

| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 30))
| "Calculating average" >> beam.CombinePerKey(beam.combiners.MeanCombineFn())
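
One way to see why FixedWindows produces a single result per message: fixed windows do not overlap, so each timestamp belongs to exactly one of them. A minimal plain-Python sketch of that assignment follows. The helper name `fixed_window_for` and the integer-second timestamps are assumptions for illustration, not part of Beam's API.

```python
def fixed_window_for(timestamp, size, offset=0):
    """Return the single [start, end) window of length `size` that contains
    `timestamp` (all values in whole seconds)."""
    # Round the timestamp down to the nearest window boundary.
    start = timestamp - ((timestamp - offset) % size)
    return (start, start + size)


# Hypothetical element 35 s into the minute: it lands in one window only.
print(fixed_window_for(35, size=30))
# (30, 60)
```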

Recommended answer

I have exactly the same issue, but in Java. I have a window with a duration of 10 seconds and a step of 3 seconds. When an event is emitted from the MQTT topic that I subscribe to, it looks like my ParDo function runs and emits the first and only event to all three of the "constructed" windows.

X is the event that I send at a random timestamp: 2020-09-15T21:17:57.292Z

  time: ----t+00---t+15---t+30----t+45----t+60------>
             :      :      :       :       :
  w1:        |X============|       :       :
  w2:               |X=============|       :
  w3:                      |X==============|
  ...

They are even assigned the same timestamp! I must be doing something completely wrong.

I use Scala 2.12 and Beam 2.23 with the Direct Runner.

[Hint]: I use state in the processElement function, where the state is held per key and window. Maybe there is a bug there? I will try to test it without state.

UPDATE: After I removed the state fields, the single event is assigned to one window.
