Apache Beam: Error assigning event time using WithTimestamps
Problem description
I have an unbounded Kafka stream that sends data with the following fields:
{"identifier": "xxx", "value": 10.0, "ts":"2019-01-16T10:51:26.326242+0000"}
I read the stream using the Apache Beam SDK for Kafka:
import org.apache.beam.sdk.io.kafka.KafkaIO;
pipeline.apply(KafkaIO.<Long, String>read()
.withBootstrapServers("kafka:9092")
.withTopic("test")
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
.updateConsumerProperties(ImmutableMap.of("group.id", "Consumer1"))
.commitOffsetsInFinalize()
.withoutMetadata());
Since I want to window on event time ("ts" in my example), I parse the incoming string and assign the "ts" field of the incoming data stream as the element timestamp.
PCollection<Temperature> tempCollection = p.apply(new SetupKafka())
.apply(ParDo.of(new ReadFromTopic()))
.apply("ParseTemperature", ParDo.of(new ParseTemperature()));
tempCollection.apply("AssignTimeStamps", WithTimestamps.of(us -> new Instant(us.getTimestamp())));
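The question does not show `ParseTemperature` or the `Temperature` class. As a hedged sketch, assuming `getTimestamp()` is meant to return epoch milliseconds (the form Joda's `new Instant(long)` accepts), the "ts" string from the sample message can be parsed with `java.time`. The class layout and the `parseTs` helper are hypothetical; the microsecond digits are truncated to milliseconds, since Beam's Java SDK timestamps have millisecond precision.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical value class for one Kafka message such as
// {"identifier": "xxx", "value": 10.0, "ts":"2019-01-16T10:51:26.326242+0000"}
class Temperature {
    // Matches the sample "ts" format: six fractional digits and an offset without a colon.
    private static final DateTimeFormatter TS_FORMAT =
            DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss.SSSSSSxx");

    private final String identifier;
    private final double value;
    private final long timestampMillis;

    Temperature(String identifier, double value, String ts) {
        this.identifier = identifier;
        this.value = value;
        this.timestampMillis = parseTs(ts);
    }

    // Hypothetical helper: converts "ts" to epoch milliseconds (sub-millisecond digits dropped).
    static long parseTs(String ts) {
        return OffsetDateTime.parse(ts, TS_FORMAT).toInstant().toEpochMilli();
    }

    String getIdentifier() { return identifier; }

    double getValue() { return value; }

    // Epoch millis, suitable for new org.joda.time.Instant(long) inside WithTimestamps.of(...)
    long getTimestamp() { return timestampMillis; }
}
```

Parsing the timestamp correctly does not by itself avoid the error discussed below; it only ensures the lambda passed to `WithTimestamps.of` produces the intended event time.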
The windowing and the computation are applied as follows:
PCollection<Output> output = tempCollection.apply(Window
.<Temperature>into(FixedWindows.of(Duration.standardSeconds(30)))
.triggering(AfterWatermark.pastEndOfWindow()
.withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10))))
.withAllowedLateness(Duration.standardDays(1))
.accumulatingFiredPanes())
.apply(new ComputeMax());
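For intuition about what `FixedWindows.of(Duration.standardSeconds(30))` does with event-time timestamps, here is a plain-Java model with no Beam dependency (class and method names are illustrative, not Beam APIs). Assuming the default zero offset, each timestamp falls into the window whose start is the timestamp rounded down to a multiple of 30 seconds since the epoch. A per-window maximum stands in for `ComputeMax`, whose definition the question does not show; triggers, allowed lateness and keys are ignored.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of event-time fixed windows; not Beam code.
class FixedWindowMax {
    // Inclusive start of the fixed window containing eventMillis (zero offset).
    static long windowStart(long eventMillis, long sizeMillis) {
        return eventMillis - Math.floorMod(eventMillis, sizeMillis);
    }

    // Maximum value per window, keyed by window start in ascending order.
    static Map<Long, Double> maxPerWindow(long[] eventMillis, double[] values, long sizeMillis) {
        Map<Long, Double> result = new TreeMap<>();
        for (int i = 0; i < eventMillis.length; i++) {
            long start = windowStart(eventMillis[i], sizeMillis);
            result.merge(start, values[i], Double::max);
        }
        return result;
    }
}
```

With 30-second windows, events at 10:51:20 and 10:51:26.326 UTC share the window starting at 10:51:00, while an event at 10:51:31 lands in the next one, regardless of when the elements are processed.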
I stream data into the input topic with a 5-second lag behind the current UTC time, since in practice the event timestamp is usually earlier than the processing timestamp.
I get the following error:
Cannot output with timestamp 2019-01-16T11:15:45.560Z. Output timestamps must be no earlier than the timestamp of the current input (2019-01-16T11:16:50.640Z) minus the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
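The numbers in the message show why the check fails. As an illustration only (a restatement of the rule quoted in the error, not Beam's implementation; `SkewCheck` is a hypothetical name), the output timestamp must satisfy output >= input - allowedSkew. Here the event time parsed from "ts" is about 65 seconds older than the timestamp the element already carries (presumably the processing-time timestamp KafkaIO assigns by default), and the allowed skew is 0.

```java
import java.time.Duration;
import java.time.Instant;

// Restates the rule from the error message; not Beam's implementation.
class SkewCheck {
    // True if output is no earlier than input minus the allowed skew.
    static boolean isAllowed(Instant input, Instant output, Duration allowedSkew) {
        return !output.isBefore(input.minus(allowedSkew));
    }
}
```

With input 2019-01-16T11:16:50.640Z and output 2019-01-16T11:15:45.560Z, the output is 65,080 ms earlier, so `isAllowed(input, output, Duration.ZERO)` is false; any transform that moves timestamps backwards after the read hits this check, which is why the timestamp is better assigned at the source.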
If I comment out the AssignTimeStamps line, there are no errors, but then I assume it is using processing time.
How do I ensure that my computation and windows are based on event time rather than processing time?
Please suggest how to handle this scenario.
Recommended answer
To use a custom timestamp, you first need to implement a custom timestamp policy by extending TimestampPolicy<KeyT, ValueT>.
For example:
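import java.util.Optional;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

// Uses a timestamp field of the record value (Foo) as the event time.
public class CustomFieldTimePolicy extends TimestampPolicy<String, Foo> {
    protected Instant currentWatermark;

    public CustomFieldTimePolicy(Optional<Instant> previousWatermark) {
        // Resume from the checkpointed watermark, or start at the minimum timestamp.
        currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
    }

    @Override
    public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<String, Foo> record) {
        // Event time comes from the record value; the watermark follows the latest record.
        currentWatermark = new Instant(record.getKV().getValue().getTimestamp());
        return currentWatermark;
    }

    @Override
    public Instant getWatermark(PartitionContext ctx) {
        return currentWatermark;
    }
}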
Then pass your custom TimestampPolicy when setting up the KafkaIO source, using the functional interface TimestampPolicyFactory:
KafkaIO.<String, Foo>read().withBootstrapServers("localhost:9092")
.withTopic("foo")
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(Foo.class)) // if you use Avro
.withTimestampPolicyFactory((tp, previousWatermark) -> new CustomFieldTimePolicy(previousWatermark))
.updateConsumerProperties(kafkaProperties);
This line creates a new TimestampPolicy, passing it the related partition and the previously checkpointed watermark (see the documentation):
.withTimestampPolicyFactory((tp, previousWatermark) -> new CustomFieldTimePolicy(previousWatermark))