Apache Beam: RabbitMqIO watermark doesn't advance

Question

I need some help, please. I'm trying to use Apache Beam with the RabbitMqIO source (version 2.11.0) and the AfterWatermark.pastEndOfWindow trigger. RabbitMqIO's watermark seems never to advance and stays the same. Because of this, the AfterWatermark trigger never fires. It works when I use other triggers that don't take the watermark into account (e.g. AfterProcessingTime, AfterPane). Below is my code. Thanks:

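```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.rabbitmq.RabbitMqIO;
import org.apache.beam.sdk.io.rabbitmq.RabbitMqMessage;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;

public class Main {

    private static final Logger LOGGER = LoggerFactory.getLogger(Main.class);

    // Window declaration with trigger: 60-second fixed windows that fire
    // when the watermark passes the end of each window
    public static Window<RabbitMqMessage> window() {
        return Window.<RabbitMqMessage>into(FixedWindows.of(Duration.standardSeconds(60)))
                .triggering(AfterWatermark.pastEndOfWindow())
                .withAllowedLateness(Duration.ZERO)
                .accumulatingFiredPanes();
    }

    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);

        // Pipeline creation
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline pipeline = Pipeline.create(options);

        // Read from the "test" queue using RabbitMqIO
        PCollection<RabbitMqMessage> messages = pipeline
                .apply(RabbitMqIO.read().withUri("amqp://guest:guest@localhost:5672").withQueue("test"));

        PCollection<RabbitMqMessage> windowedData = messages.apply("Windowing", window());

        // Combine all messages of each window into one output per trigger firing
        windowedData.apply(Combine.globally(new MyCombine()).withoutDefaults());

        pipeline.run();
    }
}

class MyCombine implements SerializableFunction<Iterable<RabbitMqMessage>, RabbitMqMessage> {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyCombine.class);

    private static final long serialVersionUID = 6143898367853230506L;

    @Override
    public RabbitMqMessage apply(Iterable<RabbitMqMessage> input) {
        LOGGER.info("After trigger launched");
        return null;
    }
}
```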

Answer

I spent a lot of time looking into this. After opening https://issues.apache.org/jira/browse/BEAM-8347, I left some notes in the ticket on what I think the problems with the current implementation are.

To restate them here:

The Javadoc for UnboundedSource.UnboundedReader.getWatermark reads:

[watermark] can be approximate. If records are read that violate this guarantee, they will be considered late, which will affect how they will be processed. ...

However, this value should be as late as possible. Downstream windows may not be able to close until this watermark passes their end.

For example, a source may know that the records it reads will be in timestamp order. In this case, the watermark can be the timestamp of the last record read. For a source that does not have natural timestamps, timestamps can be set to the time of reading, in which case the watermark is the current clock time.

The implementation in UnboundedRabbitMqReader uses the oldest timestamp as the watermark, in violation of the above suggestion.
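
The following is a simplified model in plain Java, not the actual UnboundedRabbitMqReader code. It shows why an "oldest timestamp" watermark gets stuck. The class and method names are hypothetical. The model assumes each record's timestamp is its delivery time, and it compares two values: the oldest timestamp seen so far, and the timestamp of the last record read (the Javadoc's suggestion for timestamp-ordered sources).

```java
import java.time.Instant;
import java.util.List;

// Hypothetical model of two watermark policies; not Beam or RabbitMqIO code.
public class WatermarkPolicies {

    // Report the oldest timestamp seen. After the first record this value
    // can never move forward, so downstream windows never see it pass their end.
    static Instant oldestSeen(List<Instant> deliveryTimes) {
        Instant watermark = null;
        for (Instant t : deliveryTimes) {
            if (watermark == null || t.isBefore(watermark)) {
                watermark = t;
            }
        }
        return watermark == null ? Instant.MIN : watermark;
    }

    // Report the timestamp of the last record read, which is valid when
    // records arrive in timestamp order (as delivery times do).
    static Instant lastRead(List<Instant> deliveryTimes) {
        return deliveryTimes.isEmpty() ? Instant.MIN : deliveryTimes.get(deliveryTimes.size() - 1);
    }

    public static void main(String[] args) {
        List<Instant> times = List.of(
                Instant.parse("2019-10-01T00:00:00Z"),
                Instant.parse("2019-10-01T00:01:30Z"),
                Instant.parse("2019-10-01T00:03:00Z"));
        System.out.println("oldest-seen watermark: " + oldestSeen(times)); // 2019-10-01T00:00:00Z
        System.out.println("last-read watermark:   " + lastRead(times));   // 2019-10-01T00:03:00Z
    }
}
```

In this model, the oldest-seen value stays pinned at the first delivery time however many messages arrive. That matches the stuck watermark described in the question. The last-read value moves past the end of the 60-second windows as new messages come in.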

Further, the timestamp applied to each record is its delivery time, which should be monotonically increasing. We should therefore be able to reliably advance the watermark on every delivered message, which mostly solves the issue.
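
The following is a minimal sketch of advancing the watermark on every delivered message. It assumes the record timestamp is the delivery time. MonotonicWatermark is a hypothetical helper, not part of RabbitMqIO. It uses java.time for self-containment, whereas a real Beam reader would work with Joda-Time instants.

```java
import java.time.Instant;

// Hypothetical helper: a watermark advanced on each delivered message,
// assuming each record is timestamped with its delivery time.
public class MonotonicWatermark {
    private Instant watermark = Instant.MIN;

    // Called once per delivered message. Keeping the maximum guarantees the
    // watermark never moves backwards, even if a timestamp arrives out of order.
    void onMessage(Instant deliveryTime) {
        if (deliveryTime.isAfter(watermark)) {
            watermark = deliveryTime;
        }
    }

    Instant getWatermark() {
        return watermark;
    }

    public static void main(String[] args) {
        MonotonicWatermark tracker = new MonotonicWatermark();
        tracker.onMessage(Instant.parse("2019-10-01T00:00:10Z"));
        tracker.onMessage(Instant.parse("2019-10-01T00:01:05Z"));
        // An older timestamp does not pull the watermark back.
        tracker.onMessage(Instant.parse("2019-10-01T00:00:50Z"));
        System.out.println(tracker.getWatermark()); // prints 2019-10-01T00:01:05Z
    }
}
```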

Finally, we can make provisions for advancing the watermark even when no messages have come in. When there are no new messages, it should be fine to advance the watermark the way KafkaIO's TimestampPolicyFactory does when the stream is "caught up". In that case, whenever we see no new messages, we would set the watermark to, e.g., max(current watermark, NOW - 2 seconds). This ensures windows and triggers can fire without requiring new data.
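
The following is a sketch of the idle rule max(current watermark, NOW - 2 seconds), combined with per-message advancement. The class, its methods and the onIdle hook are hypothetical and do not exist in RabbitMqIO or KafkaIO. The sketch assumes the reader can tell when a poll returned no message and can read the current wall-clock time.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch: watermark that advances on messages and, when the
// queue is idle, to (now - 2 seconds). Not RabbitMqIO or KafkaIO API.
public class IdleAdvancingWatermark {
    private static final Duration IDLE_LAG = Duration.ofSeconds(2);
    private Instant watermark = Instant.MIN;

    // Per-message advancement, assuming the timestamp is the delivery time.
    void onMessage(Instant deliveryTime) {
        watermark = max(watermark, deliveryTime);
    }

    // Called when a poll returns no new message: max(watermark, now - 2s).
    void onIdle(Instant now) {
        watermark = max(watermark, now.minus(IDLE_LAG));
    }

    Instant getWatermark() {
        return watermark;
    }

    private static Instant max(Instant a, Instant b) {
        return a.isAfter(b) ? a : b;
    }

    public static void main(String[] args) {
        IdleAdvancingWatermark tracker = new IdleAdvancingWatermark();
        tracker.onMessage(Instant.parse("2019-10-01T00:00:30Z"));
        // The queue is drained and no message arrives for a while.
        tracker.onIdle(Instant.parse("2019-10-01T00:01:10Z"));
        System.out.println(tracker.getWatermark()); // prints 2019-10-01T00:01:08Z
    }
}
```

In the example, the watermark reaches 00:01:08 with no new data. That is past the end of the [00:00, 00:01) window from the question, so its AfterWatermark trigger could fire.

The rule is safe here because timestamps are delivery times. Any message read later is stamped at or after the moment it is read, so it cannot fall behind a watermark set to a time two seconds in the past.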

Unfortunately, it's difficult to make these slight modifications locally as the Rabbit implementations are closed to extension, and are mostly private or package-private.

Update: I've opened a PR upstream to address this. Changes here: https://github.com/apache/beam/pull/9820
