SlidingWindows in Python Apache Beam duplicates the data
Problem description
Problem
Each time the system receives a message from Pub/Sub with sliding windows, the message is duplicated.
Code
| 'Parse dictionary' >> beam.Map(lambda elem: (elem['Serial'], int(elem['Value'])))
| 'window' >> beam.WindowInto(window.SlidingWindows(30, 15),accumulation_mode=AccumulationMode.DISCARDING)
| 'Count' >> beam.CombinePerKey(beam.combiners.MeanCombineFn())
Output
If I send only one message from Pub/Sub and print what I have after the sliding window finishes, using this code:
class print_row2(beam.DoFn):
    def process(self, row=beam.DoFn.ElementParam, window=beam.DoFn.WindowParam, timestamp=beam.DoFn.TimestampParam):
        # timestamp2str is the asker's own helper (not shown) for formatting timestamps.
        print(row, timestamp2str(float(window.start)), timestamp2str(float(window.end)), timestamp2str(float(timestamp)))
Result
('77777', 120.0) 2018-11-16 08:21:15.000 2018-11-16 08:21:45.000 2018-11-16 08:21:45.000
('77777', 120.0) 2018-11-16 08:21:30.000 2018-11-16 08:22:00.000 2018-11-16 08:22:00.000
If I print the message before 'window' >> beam.WindowInto(window.SlidingWindows(30, 15)), I get it only once.
The process in "graphic mode":
time: ----t+00---t+15---t+30----t+45----t+60------>
          :      :      :       :       :
w1:       |=X===========|       :       :
w2:              |==============|       :
...
The message X was sent only once, at the beginning of the sliding window. It should be received only once, but it is received twice.
I have tried both AccumulationMode values, and also trigger=AfterWatermark, but I cannot fix the problem.
What is wrong?
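For context, SlidingWindows(30, 15) places an element in every window that covers its timestamp, so with a 30-second size and a 15-second period each element belongs to two overlapping windows. The sketch below reproduces that arithmetic in plain Python, without Beam. The function name and the integer-second timestamps are assumptions made for illustration; this is not Beam's own code.

```python
def sliding_windows_for(timestamp, size, period, offset=0):
    """Return the [start, end) bounds of every sliding window containing timestamp.

    Illustrative only: integer seconds, hypothetical helper name.
    """
    # Start of the latest window that begins at or before the timestamp.
    last_start = timestamp - ((timestamp - offset) % period)
    # Step back one period at a time while the window still covers the timestamp.
    return [(start, start + size)
            for start in range(last_start, timestamp - size, -period)]


# One element at second 35, with 30-second windows that slide every 15 seconds.
for start, end in sliding_windows_for(35, size=30, period=15):
    print(start, end)
```

Under these assumptions the single element at second 35 falls in [30, 60) and [15, 45). Each window is then combined per key on its own, which would explain one output row per window, as in the Result above.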
Extra
With FixedWindows, this is the correct code for my purpose:
| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 30))
| 'Speed Average' >> beam.GroupByKey()
| "Calculating average" >> beam.CombineValues(beam.combiners.MeanCombineFn())
or
| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 30))
| "Calculating average" >> beam.CombinePerKey(beam.combiners.MeanCombineFn())
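A likely reason the FixedWindows version behaves as expected is that fixed windows do not overlap, so every timestamp maps to exactly one window. A minimal plain-Python sketch of that mapping follows. The helper name and the integer-second timestamps are assumptions, and this is not Beam's own code.

```python
def fixed_window_for(timestamp, size, offset=0):
    """Return the [start, end) bounds of the single fixed window containing timestamp.

    Illustrative only: integer seconds, hypothetical helper name.
    """
    # Round the timestamp down to the nearest window boundary.
    start = timestamp - ((timestamp - offset) % size)
    return (start, start + size)


# One element at second 35 with 30-second fixed windows lands in one window only.
print(fixed_window_for(35, size=30))
```

With these inputs the element lands only in [30, 60), so the mean is emitted once per key.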
Recommended answer
I have exactly the same issue, but in Java. I have a window with a duration of 10 seconds and a step of 3 seconds. When an event is emitted from the MQTT topic that I subscribe to, it looks like my ParDo function runs and emits the first and only event to all three of the "constructed" windows.
X is the event that I send at a random timestamp: 2020-09-15T21:17:57.292Z
time: ----t+00---t+15---t+30----t+45----t+60------>
          :      :      :       :       :
w1:       |X============|       :       :
w2:              |X=============|       :
w3:                     |X==============|
...
Even the same timestamp is assigned to them! I must really be doing something completely wrong.
I use Scala 2.12 and Beam 2.23 with the Direct Runner.
[Hint]: I use state in the processElement function, where the state is held per key + window. Maybe there is a bug there? I will try to test it without state.
UPDATE: Removed the state fields, and the single event is assigned to one window.