Apache Beam: Error assigning event time using WithTimestamps

Problem description

I have an unbounded Kafka stream sending data with the following fields

{"identifier": "xxx", "value": 10.0, "ts":"2019-01-16T10:51:26.326242+0000"}

I read the stream using the Apache Beam SDK for Kafka (KafkaIO):

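import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import com.google.common.collect.ImmutableMap;

// Read key/value pairs from the "test" topic and drop the Kafka metadata.
pipeline.apply(KafkaIO.<Long, String>read()
                    .withBootstrapServers("kafka:9092")
                    .withTopic("test")
                    .withKeyDeserializer(LongDeserializer.class)
                    .withValueDeserializer(StringDeserializer.class)
                    .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
                    .updateConsumerProperties(ImmutableMap.of("group.id", "Consumer1"))
                    .commitOffsetsInFinalize()
                    .withoutMetadata());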

Since I want to window by event time ("ts" in my example), I parse the incoming string and assign the "ts" field of the incoming data stream as the element timestamp.

PCollection<Temperature> tempCollection = p.apply(new SetupKafka())
                    .apply(ParDo.of(new ReadFromTopic()))
                    .apply("ParseTemperature", ParDo.of(new ParseTemperature()));

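// Note: the PCollection returned by this apply() is discarded; the windowing below is applied to tempCollection.
tempCollection.apply("AssignTimeStamps", WithTimestamps.of(us -> new Instant(us.getTimestamp())));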
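The question does not show Temperature.getTimestamp(). If "ts" reaches it as the raw string from the JSON above, one way to turn it into epoch milliseconds (a value Joda's Instant(long) constructor, which Beam uses, accepts) is java.time. This is a minimal sketch under that assumption; EventTimeParser and toEpochMillis are hypothetical names, not part of the question or of Beam.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

public class EventTimeParser {
    // Matches values like "2019-01-16T10:51:26.326242+0000":
    // six fractional digits (microseconds) and an offset without a colon.
    private static final DateTimeFormatter TS_FORMAT =
            DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss.SSSSSSZ");

    // Converts the "ts" field to epoch milliseconds; sub-millisecond digits are dropped.
    public static long toEpochMillis(String ts) {
        return OffsetDateTime.parse(ts, TS_FORMAT).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        long millis = toEpochMillis("2019-01-16T10:51:26.326242+0000");
        // In a Beam pipeline this value could become new org.joda.time.Instant(millis).
        System.out.println(millis); // prints 1547635886326
    }
}
```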

The windowing and computation are as follows:

PCollection<Output> output = tempCollection.apply(Window
                .<Temperature>into(FixedWindows.of(Duration.standardSeconds(30)))
                .triggering(AfterWatermark.pastEndOfWindow()
                        .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10))))
                .withAllowedLateness(Duration.standardDays(1))
                .accumulatingFiredPanes())
                .apply(new ComputeMax());
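Once elements carry event-time timestamps, FixedWindows.of(Duration.standardSeconds(30)) assigns each one to a half-open 30-second interval aligned to the epoch (no offset is configured here). The plain-Java sketch below reproduces only that alignment arithmetic; FixedWindowMath is a hypothetical helper, not Beam code.

```java
public class FixedWindowMath {
    // Start of the fixed window (size sizeMillis, offset 0) that contains eventMillis.
    // Math.floorMod keeps the result correct for negative timestamps as well.
    public static long windowStart(long eventMillis, long sizeMillis) {
        return eventMillis - Math.floorMod(eventMillis, sizeMillis);
    }

    public static void main(String[] args) {
        long size = 30_000L;        // FixedWindows.of(Duration.standardSeconds(30))
        long ts = 1547635886326L;   // 2019-01-16T10:51:26.326Z
        long start = windowStart(ts, size);
        // prints [2019-01-16T10:51:00Z, 2019-01-16T10:51:30Z)
        System.out.println("[" + java.time.Instant.ofEpochMilli(start) + ", "
                + java.time.Instant.ofEpochMilli(start + size) + ")");
    }
}
```

For the sample record, the on-time pane for [10:51:00, 10:51:30) fires once the watermark passes 10:51:30; late data for that window can still produce late firings until the 1-day allowed lateness has passed.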

I stream data into the input stream with a lag of 5 seconds behind the current UTC time, since in practical scenarios the event timestamp is usually earlier than the processing timestamp.

I get the following error:

Cannot output with timestamp 2019-01-16T11:15:45.560Z. Output timestamps must be no earlier than the timestamp of the current input (2019-01-16T11:16:50.640Z) minus the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
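WithTimestamps is implemented as a DoFn, and the message spells out the rule it hit: a DoFn may not emit an element with a timestamp earlier than the input element's timestamp minus the allowed skew (0 ms here). The input timestamp is presumably the processing time KafkaIO assigns by default, so any event time older than it is rejected. The plain-Java sketch below restates that rule with the two timestamps from the message; isOutputAllowed is a hypothetical helper, not Beam's implementation.

```java
import java.time.Duration;
import java.time.Instant;

public class SkewCheck {
    // Restatement of the rule quoted in the error message: an output timestamp
    // is accepted only if it is not earlier than (input timestamp - allowed skew).
    public static boolean isOutputAllowed(Instant output, Instant input, Duration allowedSkew) {
        return !output.isBefore(input.minus(allowedSkew));
    }

    public static void main(String[] args) {
        Instant input = Instant.parse("2019-01-16T11:16:50.640Z");  // timestamp of the current input
        Instant output = Instant.parse("2019-01-16T11:15:45.560Z"); // event time parsed from "ts"
        System.out.println(Duration.between(output, input));                        // PT1M5.08S
        System.out.println(isOutputAllowed(output, input, Duration.ZERO));          // false
        System.out.println(isOutputAllowed(output, input, Duration.ofSeconds(66))); // true
    }
}
```

WithTimestamps does offer a withAllowedTimestampSkew(Duration) method, but it is deprecated because elements shifted that way can fall behind the watermark and be treated as late or dropped. Assigning the timestamp in the Kafka source itself, as the recommended answer does, avoids the problem.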

If I comment out the AssignTimeStamps line, there are no errors, but then I assume processing time is being used.

How do I ensure that my computation and windows are based on event time rather than processing time?

Please provide some input on how to handle this scenario.

Recommended answer

Instead of shifting timestamps downstream with WithTimestamps, assign them in the KafkaIO source itself. To do that, first implement a custom timestamp policy by extending TimestampPolicy<KeyT, ValueT>.

For example:

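import java.util.Optional;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

// Foo is the Avro-generated value class; getTimestamp() is assumed to return
// epoch milliseconds (or another value Joda's Instant constructor accepts).
public class CustomFieldTimePolicy extends TimestampPolicy<String, Foo> {

    protected Instant currentWatermark;

    public CustomFieldTimePolicy(Optional<Instant> previousWatermark) {
        // Resume from the checkpointed watermark, or start at the minimum timestamp.
        currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
    }

    @Override
    public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<String, Foo> record) {
        // Use the payload's event-time field as the element timestamp; the watermark
        // simply follows the timestamp of the latest record.
        currentWatermark = new Instant(record.getKV().getValue().getTimestamp());
        return currentWatermark;
    }

    @Override
    public Instant getWatermark(PartitionContext ctx) {
        return currentWatermark;
    }
}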
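Because this policy sets the watermark to the timestamp of the most recent record, out-of-order data can move the watermark backwards, and any record older than one already seen may be treated as late. A common alternative is a bounded delay: track the largest event time seen and hold the watermark a fixed delay behind it. Beam's KafkaIO ships a CustomTimestampPolicyWithLimitedDelay class built around this idea (check its Javadoc for the exact constructor). The plain-Java model below is only a hypothetical illustration of the bookkeeping, not Beam's code; BoundedDelayWatermark is an invented name.

```java
public class BoundedDelayWatermark {
    private final long maxDelayMillis;
    private long maxEventMillis = Long.MIN_VALUE; // no record seen yet

    public BoundedDelayWatermark(long maxDelayMillis) {
        this.maxDelayMillis = maxDelayMillis;
    }

    // Called once per record with its event time. Only the maximum is kept,
    // so an out-of-order record never pulls the watermark backwards.
    public void onRecord(long eventMillis) {
        maxEventMillis = Math.max(maxEventMillis, eventMillis);
    }

    // Watermark = newest event time seen minus the tolerated delay.
    public long watermark() {
        return maxEventMillis == Long.MIN_VALUE ? Long.MIN_VALUE : maxEventMillis - maxDelayMillis;
    }

    public static void main(String[] args) {
        BoundedDelayWatermark wm = new BoundedDelayWatermark(5_000);
        long[] events = {10_000, 8_000, 12_000}; // 8_000 arrives out of order
        for (long e : events) {
            wm.onRecord(e);
            // prints watermarks 5000, 5000, 7000
            System.out.println("event=" + e + " watermark=" + wm.watermark());
        }
    }
}
```

The delay trades latency for completeness: a larger delay lets more out-of-order records arrive ahead of the watermark, but windows fire later.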

Then pass your custom TimestampPolicy when setting up the KafkaIO source, through the functional interface TimestampPolicyFactory:

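// kafkaProperties: a Map<String, Object> of extra consumer settings
// (for the Confluent Avro deserializer this typically includes schema.registry.url).
pipeline.apply(KafkaIO.<String, Foo>read()
                .withBootstrapServers("localhost:9092")
                .withTopic("foo")
                .withKeyDeserializer(StringDeserializer.class)
                // Raw cast: KafkaAvroDeserializer implements Deserializer<Object>, not Deserializer<Foo>.
                .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class, AvroCoder.of(Foo.class)) // if you use Avro
                .withTimestampPolicyFactory((tp, previousWatermark) -> new CustomFieldTimePolicy(previousWatermark))
                .updateConsumerProperties(kafkaProperties));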

This line creates a new TimestampPolicy for each partition, passing it the related partition (tp) and the previously checkpointed watermark (see the KafkaIO documentation):

.withTimestampPolicyFactory((tp, previousWatermark) -> new CustomFieldTimePolicy(previousWatermark))
