Kafka Flink timestamp: event time and watermark
Problem description
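I am reading the book "Stream Processing with Apache Flink", which points out that, starting with version 0.10.0, Kafka supports message timestamps: "When reading from Kafka 0.10 or later, the consumer will automatically extract the message timestamp as an event-time timestamp if the application is running in event-time mode." So, inside the processElement function, will the call context.timestamp() by default return the Kafka message timestamp?
Could you please provide a simple example of how to implement AssignerWithPeriodicWatermarks/AssignerWithPunctuatedWatermarks that extracts timestamps (and builds watermarks) based on the consumed Kafka message timestamp?
If I am using TimeCharacteristic.ProcessingTime, would ctx.timestamp() return the processing time, and in that case would it be similar to context.timerService().currentProcessingTime()?
Thank you.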
The Flink Kafka consumer takes care of this for you: it puts the Kafka record timestamp where it needs to be, on each StreamRecord. In Flink 1.11 you can simply rely on this, though you still need to provide a WatermarkStrategy that specifies the bounded out-of-orderness (or asserts that the timestamps are in order):
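import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(...);
// No timestamp assigner needed: the default one keeps the timestamp the consumer set from Kafka.
myConsumer.assignTimestampsAndWatermarks(
    WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(20)));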
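To make the 20-second bound concrete, here is a minimal, framework-free sketch of the idea behind a bounded-out-of-orderness strategy, assuming the timestamps fed to it are the Kafka record timestamps the consumer has already attached. It remembers the largest timestamp seen so far and, each time a watermark is requested periodically, reports that maximum minus the bound minus one millisecond. The class name `BoundedOutOfOrdernessSketch` is hypothetical and this is a plain-Java simulation, not Flink's own code, although as far as we know Flink's built-in bounded-out-of-orderness generator uses the same formula.

```java
import java.time.Duration;

// Hypothetical, framework-free sketch of a periodic bounded-out-of-orderness
// watermark generator. It is NOT Flink's class, only a simulation of the idea.
class BoundedOutOfOrdernessSketch {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(Duration maxOutOfOrderness) {
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        // Start low enough that the first watermark is exactly Long.MIN_VALUE (no overflow).
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // Called per record with the timestamp already attached (e.g. the Kafka record timestamp).
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Called periodically: asserts that no records older than this are still expected.
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch gen = new BoundedOutOfOrdernessSketch(Duration.ofSeconds(20));
        long[] kafkaTimestamps = {100_000L, 130_000L, 110_000L}; // ms, slightly out of order
        for (long ts : kafkaTimestamps) {
            gen.onEvent(ts);
        }
        // Max seen is 130000, so watermark = 130000 - 20000 - 1 = 109999.
        System.out.println(gen.currentWatermark());
    }
}
```

With timestamps 100000, 130000 and 110000 ms, the watermark is 109999: the out-of-order 110000 record is still ahead of it, so it is not treated as late, whereas a record stamped 100000 arriving now would be.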
In earlier versions of Flink you had to provide an implementation of a timestamp assigner, which would look like this:
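@Override
public long extractTimestamp(String element, long previousElementTimestamp) {
    // previousElementTimestamp already holds the timestamp set by the Flink Kafka consumer
    return previousElementTimestamp;
}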
This version of the extractTimestamp method receives, as previousElementTimestamp, the current value of the timestamp present in the StreamRecord, which in this case is the timestamp put there by the Flink Kafka consumer.
See the Flink 1.11 docs and the Flink 1.10 docs.
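For the two pre-1.11 assigner interfaces the question asks about, the difference is only in when a watermark is produced: a periodic assigner is polled for `getCurrentWatermark()` on a timer (the interval set via `ExecutionConfig.setAutoWatermarkInterval`), while a punctuated assigner is asked after every element via `checkAndGetNextWatermark(lastElement, extractedTimestamp)` and may return null to emit nothing. The sketch below shows both, each simply reusing `previousElementTimestamp` as described above. To keep it compilable without Flink on the classpath, it declares two stand-in interfaces with hypothetical names (`PeriodicAssigner`, `PunctuatedAssigner`) and uses a plain `Long` instead of Flink's `Watermark`; in a real job you would implement `AssignerWithPeriodicWatermarks<String>` / `AssignerWithPunctuatedWatermarks<String>` and return `new Watermark(...)`. The 20-second bound is an illustrative assumption.

```java
// Hypothetical stand-ins mirroring the shape of Flink's pre-1.11 assigner
// interfaces, so this sketch compiles without Flink. Watermarks are plain Longs
// here (null = "no watermark"); real code would return a Flink Watermark.
interface PeriodicAssigner<T> {
    long extractTimestamp(T element, long previousElementTimestamp);
    Long getCurrentWatermark();
}

interface PunctuatedAssigner<T> {
    long extractTimestamp(T element, long previousElementTimestamp);
    Long checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
}

// Periodic: track the max timestamp; the framework polls for a watermark on a timer.
class KafkaPeriodicAssigner implements PeriodicAssigner<String> {
    private static final long MAX_OUT_OF_ORDERNESS_MS = 20_000L; // assumed bound
    private long currentMaxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS_MS;

    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        // previousElementTimestamp is the Kafka record timestamp set by the consumer.
        currentMaxTimestamp = Math.max(currentMaxTimestamp, previousElementTimestamp);
        return previousElementTimestamp;
    }

    @Override
    public Long getCurrentWatermark() {
        return currentMaxTimestamp - MAX_OUT_OF_ORDERNESS_MS;
    }
}

// Punctuated: decide after each element whether to emit a watermark.
class KafkaPunctuatedAssigner implements PunctuatedAssigner<String> {
    private static final long MAX_OUT_OF_ORDERNESS_MS = 20_000L; // assumed bound
    private long lastEmitted = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        return previousElementTimestamp; // keep the Kafka record timestamp
    }

    @Override
    public Long checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {
        long candidate = extractedTimestamp - MAX_OUT_OF_ORDERNESS_MS;
        if (candidate > lastEmitted) {
            lastEmitted = candidate;
            return candidate; // watermark advances: emit it
        }
        return null; // no new watermark for this element
    }
}
```

The periodic variant does its bookkeeping in `extractTimestamp` and only reports the result when polled, which keeps per-element work cheap; the punctuated variant suits streams where certain elements themselves signal progress, at the cost of a check on every element.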
As for what ctx.timestamp() returns when using TimeCharacteristic.ProcessingTime: in that case it returns null. (Semantically, yes, it is as though the timestamp were the current processing time, but that is not how it is implemented.)
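Because `ctx.timestamp()` returns a boxed `Long` that is null under processing time, assigning it straight to a primitive `long` would throw a NullPointerException on unboxing. A defensive pattern is to check for null and, if processing-time semantics are what you want, fall back to `ctx.timerService().currentProcessingTime()` explicitly. The helper below is a hypothetical, framework-free sketch of that fallback: `recordTimestamp` stands in for the value of `ctx.timestamp()`, and the `LongSupplier` stands in for the timer service's processing-time clock.

```java
import java.util.function.LongSupplier;

// Hypothetical helper, not part of Flink: use the record's event-time timestamp
// when present, otherwise fall back to the current processing time.
final class TimestampFallback {
    private TimestampFallback() {}

    // recordTimestamp plays the role of ctx.timestamp(), which may be null.
    // processingTimeClock plays the role of ctx.timerService()::currentProcessingTime.
    static long effectiveTimestamp(Long recordTimestamp, LongSupplier processingTimeClock) {
        if (recordTimestamp != null) {
            return recordTimestamp; // event time: e.g. the Kafka record timestamp
        }
        return processingTimeClock.getAsLong(); // processing time: no record timestamp
    }
}
```

Inside a real `processElement`, this would be called as `TimestampFallback.effectiveTimestamp(ctx.timestamp(), ctx.timerService()::currentProcessingTime)`.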