Kafka Flink timestamp: event time and watermarks

Question

I am reading the book Stream Processing with Apache Flink, which states: "As of version 0.10.0, Kafka supports message timestamps. When reading from Kafka version 0.10 or later, the consumer will automatically extract the message timestamp as an event-time timestamp if the application runs in event-time mode." So inside a processElement function, will the call context.timestamp() return the Kafka message timestamp by default? Could you please provide a simple example of how to implement an AssignerWithPeriodicWatermarks/AssignerWithPunctuatedWatermarks that extracts timestamps (and builds watermarks) based on the consumed Kafka message timestamp?

If I am using TimeCharacteristic.ProcessingTime, would ctx.timestamp() return the processing time, and in that case would it be equivalent to context.timerService().currentProcessingTime()?

Thanks.

Answer

The Flink Kafka consumer takes care of this for you: it puts each Kafka message's timestamp on the corresponding record as its event-time timestamp. In Flink 1.11 you can simply rely on this, though you still need to provide a WatermarkStrategy that specifies the bounded out-of-orderness (or asserts that the timestamps are in order, e.g. with forMonotonousTimestamps()):

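import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(...);
myConsumer.assignTimestampsAndWatermarks(
    WatermarkStrategy
        .<String>forBoundedOutOfOrderness(Duration.ofSeconds(20)));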
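To make the out-of-orderness bound concrete, here is a minimal plain-Java model (no Flink dependency) of how, to my understanding, the watermark generator behind WatermarkStrategy.forBoundedOutOfOrderness behaves: it tracks the largest timestamp seen so far (here, the Kafka message timestamps the consumer attached, in epoch milliseconds) and emits that maximum minus the bound minus 1 ms. The class BoundedOutOfOrdernessModel and its method names are hypothetical illustrations, not Flink's API.

```java
import java.time.Duration;

/**
 * Hypothetical, Flink-free model of the bounded-out-of-orderness watermark logic.
 * Timestamps are assumed to be the Kafka record timestamps (epoch millis).
 */
final class BoundedOutOfOrdernessModel {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessModel(Duration maxOutOfOrderness) {
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        // Start so that the lowest possible watermark is exactly Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    /** Called once per record with its (Kafka) timestamp. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Called periodically; the watermark never moves backwards. */
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }
}
```

With a 20-second bound, after records stamped 100000 and 90000 ms the watermark is 79999 ms; the late 90000 record is still accepted but does not move the watermark back. Constructing the model with Duration.ZERO yields maxTimestamp - 1, which corresponds to the "timestamps are in order" case.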

In earlier versions of Flink, you had to provide an implementation of a timestamp assigner (an AssignerWithPeriodicWatermarks or AssignerWithPunctuatedWatermarks), whose extractTimestamp method would look like this:

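@Override  // from TimestampAssigner<Long>, pre-1.11 signature
public long extractTimestamp(Long element, long previousElementTimestamp) {
    // Reuse the timestamp the Kafka consumer already put on the record.
    return previousElementTimestamp;
}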

This version of the extractTimestamp method is passed the current value of the timestamp present in the StreamRecord as previousElementTimestamp, which in this case will be the timestamp put there by the Flink Kafka consumer.
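Since the question asked for both assigner styles, here is a sketch that models the pre-1.11 AssignerWithPeriodicWatermarks and AssignerWithPunctuatedWatermarks contracts in plain Java so it runs without Flink. The class names are hypothetical, and the real interfaces return org.apache.flink.streaming.api.watermark.Watermark objects rather than raw longs; in real code you would implement those interfaces and return new Watermark(...). Both variants simply reuse previousElementTimestamp (the Kafka timestamp) and assume a fixed out-of-orderness bound.

```java
/** Hypothetical model of a periodic assigner; Flink would poll getCurrentWatermark(). */
final class KafkaTimestampPeriodicAssigner<T> {
    private final long maxOutOfOrdernessMillis;
    private long currentMaxTimestamp = Long.MIN_VALUE;

    KafkaTimestampPeriodicAssigner(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    // previousElementTimestamp is the timestamp already in the StreamRecord,
    // i.e. the one the Kafka consumer copied from the Kafka message.
    long extractTimestamp(T element, long previousElementTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, previousElementTimestamp);
        return previousElementTimestamp;
    }

    // The real method returns a Watermark; here we return only its timestamp.
    long getCurrentWatermark() {
        // Guard against underflow before any element has been seen.
        return currentMaxTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : currentMaxTimestamp - maxOutOfOrdernessMillis;
    }
}

/** Hypothetical model of a punctuated assigner; consulted after every element. */
final class KafkaTimestampPunctuatedAssigner<T> {
    private final long maxOutOfOrdernessMillis;
    private long lastEmitted = Long.MIN_VALUE;

    KafkaTimestampPunctuatedAssigner(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    long extractTimestamp(T element, long previousElementTimestamp) {
        return previousElementTimestamp;
    }

    // Returning null means "no new watermark for this element".
    Long checkAndGetNextWatermark(T lastElement, long extractedTimestamp) {
        long candidate = extractedTimestamp - maxOutOfOrdernessMillis;
        if (candidate > lastEmitted) {
            lastEmitted = candidate;
            return candidate;
        }
        return null;
    }
}
```

The periodic variant is cheaper because watermarks are only generated at the configured auto-watermark interval. A real punctuated assigner usually emits only when it sees a special marker event; emitting whenever the watermark would advance, as here, is an assumption made to keep the sketch short.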

Flink 1.11 documentation
Flink 1.10 documentation

As for what ctx.timestamp() returns when using TimeCharacteristic.ProcessingTime: in that case the method returns null. (Semantically, yes, it is as though the timestamp were the current processing time, but that is not how it is implemented.)
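Because ctx.timestamp() returns a nullable Long, unboxing it unconditionally would throw a NullPointerException in processing-time mode. Here is a minimal defensive sketch, assuming you want to fall back to processing time in that case; TimestampFallback is a hypothetical helper, not part of Flink.

```java
/** Hypothetical helper for the nullable Long returned by Context#timestamp(). */
final class TimestampFallback {
    private TimestampFallback() {}

    // Use the record's event-time timestamp if present, otherwise processing time.
    static long eventOrProcessingTime(Long ctxTimestamp, long currentProcessingTime) {
        return ctxTimestamp != null ? ctxTimestamp : currentProcessingTime;
    }
}
```

Inside processElement it could be called as TimestampFallback.eventOrProcessingTime(ctx.timestamp(), ctx.timerService().currentProcessingTime()).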
