SlidingWindows in Python Apache Beam duplicates the data


Question

Problem

Each time the system receives a message from Pub/Sub through a sliding window, the message is duplicated.

Code

 | 'Parse dictionary' >> beam.Map(lambda elem: (elem['Serial'], int(elem['Value'])))
 | 'window' >> beam.WindowInto(window.SlidingWindows(30, 15), accumulation_mode=AccumulationMode.DISCARDING)
 | 'Count' >> beam.CombinePerKey(beam.combiners.MeanCombineFn())


Output

If I send only one message from Pub/Sub and print what I have after the sliding window finishes, using this code:

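class print_row2(beam.DoFn):
    # Prints each element with its window bounds and its timestamp.
    # timestamp2str is the asker's own helper (not shown in the post).
    def process(self, row=beam.DoFn.ElementParam, window=beam.DoFn.WindowParam, timestamp=beam.DoFn.TimestampParam):
        print(row, timestamp2str(float(window.start)), timestamp2str(float(window.end)), timestamp2str(float(timestamp)))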

Result

('77777', 120.0) 2018-11-16 08:21:15.000 2018-11-16 08:21:45.000 2018-11-16 08:21:45.000
('77777', 120.0) 2018-11-16 08:21:30.000 2018-11-16 08:22:00.000 2018-11-16 08:22:00.000

If I print the message before 'window' >> beam.WindowInto(window.SlidingWindows(30, 15)), I get it only once.

The process in "graphic mode":

  time: ----t+00---t+15---t+30----t+45----t+60------>
             :      :      :       :       :
  w1:        |=X===========|       :       :
  w2:               |==============|       :
  ...

The message X was sent only once, at the beginning of the sliding window. It should be received only once, but it is being received twice.
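
For reference, the two window ranges in the output above match how a sliding window of size 30 s and period 15 s covers the timeline: every instant lies inside two overlapping windows. The following is a minimal pure-Python sketch of that arithmetic, not Beam code; the function name `sliding_windows_for` and the sample timestamp (40 seconds past 08:21:00) are assumptions chosen for illustration.

```python
def sliding_windows_for(ts, size, period, offset=0):
    """Return the (start, end) pairs of all sliding windows that contain ts.

    Illustrative arithmetic only (hypothetical helper, not a Beam API).
    All values are in seconds; each window covers [start, end).
    """
    # Start of the latest window that begins at or before ts.
    last_start = ts - (ts - offset) % period
    windows = []
    start = last_start
    # Walk backwards one period at a time while the window still covers ts.
    while start > ts - size:
        windows.append((start, start + size))
        start -= period
    return sorted(windows)


# Seconds counted from 08:21:00; 40 is an assumed arrival time for message X.
windows = sliding_windows_for(40, size=30, period=15)
for start, end in windows:
    print(f"window [{start}, {end}) contains t=40")
```

With these assumed values the sketch yields [15, 45) and [30, 60), which correspond to 08:21:15 to 08:21:45 and 08:21:30 to 08:22:00.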

I have tried both AccumulationMode values, and also trigger=AfterWatermark, but I cannot fix the problem.

What is wrong?

Extra

With FixedWindows, this is the correct code for my purpose:

| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 30))
| 'Speed Average' >> beam.GroupByKey()
| "Calculating average" >> beam.CombineValues(beam.combiners.MeanCombineFn())

| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 30))
| "Calculating average" >> beam.CombinePerKey(beam.combiners.MeanCombineFn())
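
By contrast, a fixed window of 30 s partitions the timeline without overlap, so each timestamp falls in exactly one window. A minimal pure-Python sketch of that arithmetic follows; the helper name `fixed_window_for` and the sample timestamps are assumptions, not part of Beam or of the code above.

```python
def fixed_window_for(ts, size, offset=0):
    """Return the single (start, end) fixed window that contains ts.

    Illustrative arithmetic only (hypothetical helper, not a Beam API).
    All values are in seconds; the window covers [start, end).
    """
    start = ts - (ts - offset) % size
    return (start, start + size)


# Assumed sample timestamps, in seconds.
for ts in (0, 29, 30, 40):
    print(ts, "->", fixed_window_for(ts, size=30))
```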

Recommended answer

I have exactly the same issue, but in Java. I have a window with a duration of 10 seconds and a step of 3 seconds. When an event is emitted from the MQTT topic that I subscribe to, it looks like the ParDo function that I have runs and emits the first and only event to all three of the "constructed" windows.

X is the event that I send at a random timestamp: 2020-09-15T21:17:57.292Z

  time: ----t+00---t+15---t+30----t+45----t+60------>
             :      :      :       :       :
  w1:        |X============|       :       :
  w2:               |X=============|       :
  w3:                      |X==============|
  ...

Even the same timestamp is assigned to them! I must really be doing something completely wrong.
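
A window of 10 s duration with a 3 s step overlaps itself, so one event lands in three or four windows depending on where it falls within the step. Below is a minimal pure-Python sketch of that count, assuming windows aligned to time 0; the helper name `windows_containing` and the sample event times are hypothetical, and this is plain arithmetic rather than the Beam API.

```python
def windows_containing(ts_ms, size_ms, period_ms):
    """List (start, end) of every sliding window that contains ts_ms.

    Hypothetical helper for illustration; windows are assumed aligned to 0
    and cover [start, end). All values are in milliseconds.
    """
    start = ts_ms - ts_ms % period_ms
    result = []
    while start > ts_ms - size_ms:
        result.append((start, start + size_ms))
        start -= period_ms
    return result


SIZE_MS, PERIOD_MS = 10_000, 3_000  # 10 s duration, 3 s step

# Assumed event times, in milliseconds.
for ts_ms in (60_000, 61_000, 62_000):
    # One (timestamp, window) pair per window; the timestamp itself is unchanged.
    copies = [(ts_ms, w) for w in windows_containing(ts_ms, SIZE_MS, PERIOD_MS)]
    print(ts_ms, "->", len(copies), "windows")
```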

I use Scala 2.12 and Beam 2.23 with the Direct Runner.

[Hint]: I use state in the processElement function, where the state is held per key + window. Maybe there is a bug there? I will try to test it without state.

UPDATE: I removed the state fields, and the single event is now assigned to one window.
