Apache Beam: RabbitMqIO watermark doesn't advance

Question

I need some help, please. I'm trying to use Apache Beam with the RabbitMqIO source (version 2.11.0) and the AfterWatermark.pastEndOfWindow trigger. It seems that RabbitMqIO's watermark doesn't advance and stays the same. Because of this, the AfterWatermark trigger never fires. Other triggers that don't take the watermark into consideration (e.g. AfterProcessingTime, AfterPane) do work. My code is below, thanks:

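import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.rabbitmq.RabbitMqIO;
import org.apache.beam.sdk.io.rabbitmq.RabbitMqMessage;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;

public class Main {

    private static final Logger LOGGER = LoggerFactory.getLogger(Main.class);

    // Window declaration with trigger: 60-second fixed windows that fire
    // once the watermark passes the end of the window
    public static Window<RabbitMqMessage> window() {
        return Window.<RabbitMqMessage>into(FixedWindows.of(Duration.standardSeconds(60)))
                .triggering(AfterWatermark.pastEndOfWindow())
                .withAllowedLateness(Duration.ZERO)
                .accumulatingFiredPanes();
    }

    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);

        // pipeline creation
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline pipeline = Pipeline.create(options);

        // Using RabbitMqIO
        PCollection<RabbitMqMessage> messages = pipeline
                .apply(RabbitMqIO.read().withUri("amqp://guest:guest@localhost:5672").withQueue("test"));

        PCollection<RabbitMqMessage> windowedData = messages.apply("Windowing", window());

        windowedData.apply(Combine.globally(new MyCombine()).withoutDefaults());

        pipeline.run();
    }

}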

class MyCombine implements SerializableFunction<Iterable<RabbitMqMessage>, RabbitMqMessage> {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyCombine.class);

    private static final long serialVersionUID = 6143898367853230506L;

    @Override
    public RabbitMqMessage apply(Iterable<RabbitMqMessage> input) {
        // Placeholder: only logs so that it is visible when the trigger fires
        LOGGER.info("After trigger launched");
        return null;
    }

}

Recommended answer

I spent a lot of time looking into this. After opening https://issues.apache.org/jira/browse/BEAM-8347 I left some notes in the ticket on what I think the problems are with the current implementation.

Restated here:

[watermark] can be approximate. If records are read that violate this guarantee, they will be considered late, which will affect how they will be processed. ...

However, this value should be as late as possible. Downstream windows may not be able to close until this watermark passes their end.

For example, a source may know that the records it reads will be in timestamp order. In this case, the watermark can be the timestamp of the last record read. For a source that does not have natural timestamps, timestamps can be set to the time of reading, in which case the watermark is the current clock time.

The implementation in UnboundedRabbitMqReader uses the oldest timestamp as the watermark, in violation of the above suggestion.

Further, the timestamp applied is delivery time, which should be monotonically increasing. We should reliably be able to increase the watermark on every message delivered, which mostly solves the issue.
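
To illustrate the difference, here is a minimal plain-Java sketch. It does not use the Beam API, and the names `WatermarkSketch`, `oldestPolicy`, and `latestPolicy` are hypothetical. It assumes delivery timestamps arrive in non-decreasing order, as described above, and compares a watermark pinned to the oldest timestamp seen with one that follows the most recent timestamp.

```java
import java.time.Instant;
import java.util.List;

public class WatermarkSketch {

    // Watermark pinned to the oldest timestamp seen: with increasing
    // delivery times it never moves past the first message.
    public static Instant oldestPolicy(List<Instant> deliveryTimes) {
        Instant watermark = null;
        for (Instant t : deliveryTimes) {
            if (watermark == null || t.isBefore(watermark)) {
                watermark = t;
            }
        }
        return watermark;
    }

    // Watermark following the latest timestamp seen: it advances with
    // every newer message and never moves backwards.
    public static Instant latestPolicy(List<Instant> deliveryTimes) {
        Instant watermark = null;
        for (Instant t : deliveryTimes) {
            if (watermark == null || t.isAfter(watermark)) {
                watermark = t;
            }
        }
        return watermark;
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2019-10-01T00:00:00Z");
        List<Instant> deliveries =
                List.of(start, start.plusSeconds(30), start.plusSeconds(90));
        Instant windowEnd = start.plusSeconds(60); // end of a 60-second fixed window

        System.out.println("oldest policy passes window end: "
                + oldestPolicy(deliveries).isAfter(windowEnd));
        System.out.println("latest policy passes window end: "
                + latestPolicy(deliveries).isAfter(windowEnd));
    }
}
```

Under these assumptions only the second policy ever moves the watermark past the end of the first window, which is the condition an AfterWatermark trigger waits for.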

Finally, we can make provisions for advancing the watermark even when no messages have come in. When there are no new messages, it should be fine to advance the watermark following the approach taken in the KafkaIO TimestampPolicyFactory when the stream is 'caught up'. In this case, we would advance the watermark to, e.g., max(current watermark, NOW - 2 seconds) when we see no new messages, just to ensure windows/triggers can fire without requiring new data.
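
The idle rule can be sketched in plain Java as follows. This is only an illustration of the max(current watermark, NOW - 2 seconds) formula, not the KafkaIO or RabbitMqIO code; the class `IdleWatermarkSketch` and its methods `onMessage` and `onIdle` are hypothetical names, and the 2-second lag is the example value from the text.

```java
import java.time.Duration;
import java.time.Instant;

public class IdleWatermarkSketch {

    // Example lag taken from the text: stay 2 seconds behind the clock when idle.
    private static final Duration IDLE_LAG = Duration.ofSeconds(2);

    private Instant watermark;

    public IdleWatermarkSketch(Instant initialWatermark) {
        this.watermark = initialWatermark;
    }

    // A message was delivered: move the watermark up to its delivery time,
    // never backwards.
    public Instant onMessage(Instant deliveryTime) {
        if (deliveryTime.isAfter(watermark)) {
            watermark = deliveryTime;
        }
        return watermark;
    }

    // No message was available: watermark = max(current watermark, now - 2 seconds).
    public Instant onIdle(Instant now) {
        Instant candidate = now.minus(IDLE_LAG);
        if (candidate.isAfter(watermark)) {
            watermark = candidate;
        }
        return watermark;
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2019-10-01T00:00:00Z");
        IdleWatermarkSketch tracker = new IdleWatermarkSketch(start);

        System.out.println(tracker.onMessage(start.plusSeconds(10)));
        // Idle shortly after the last message: the watermark does not regress.
        System.out.println(tracker.onIdle(start.plusSeconds(11)));
        // Idle for a long time: the watermark trails the clock by 2 seconds.
        System.out.println(tracker.onIdle(start.plusSeconds(70)));
    }
}
```

With a rule like this, a 60-second window can close about two seconds after its end even if the queue stays empty.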

Unfortunately, it's difficult to make these slight modifications locally as the Rabbit implementations are closed to extension, and are mostly private or package-private.

Update: I've opened a PR upstream to address this. Changes here: https://github.com/apache/beam/pull/9820
