Kafka Streams: How to change a record's timestamp (0.11.0)?

Problem description

I'm using FluentD (v.12, the latest stable version) to send messages to Kafka. FluentD uses an old KafkaProducer, so the record timestamp is always set to -1. As a result, I have to use the WallclockTimestampExtractor to set the record's timestamp to the point in time when the message arrives in Kafka.

The timestamp I'm really interested in is sent by FluentD within the message:

"timestamp":"1507885936","host":"V.X.Y.Z."

Record representation in Kafka:

offset = 0, timestamp = -1, key = null, value = {"timestamp":"1507885936","host":"V.X.Y.Z."}

I would like to have a record like this in Kafka:

offset = 0, timestamp = 1507885936, key = null, value = {"timestamp":"1507885936","host":"V.X.Y.Z."}

My workaround would look like this:

  • Write a consumer to extract the timestamp (https://kafka.apache.org/0110/javadoc/org/apache/kafka/streams/processor/TimestampExtractor.html).

  • Write a producer that produces a new record with the timestamp set (ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)).

I would prefer a Kafka Streams solution, if there is one.

Recommended answer

You can write a very simple Kafka Streams application like:

KStreamBuilder builder = new KStreamBuilder();
builder.stream("input-topic").to("output-topic");

and configure the application with a custom TimestampExtractor that extracts the timestamp from the record and returns it.
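As a rough illustration of what such an extractor might compute, here is a minimal sketch of the parsing step in plain Java, without any Kafka dependency. The class name `EmbeddedTimestamp` and the method `extractMillis` are hypothetical. It assumes the record value is available as a JSON string shaped like the sample above, and that the embedded value is in epoch seconds, whereas Kafka record timestamps are expressed in epoch milliseconds. In Kafka Streams 0.11.0, a helper like this would be called from `TimestampExtractor.extract(ConsumerRecord<Object, Object> record, long previousTimestamp)`, for example with `previousTimestamp` as the fallback.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper; not part of Kafka or of the original answer.
public class EmbeddedTimestamp {
    // Matches "timestamp":"<digits>" as in the sample payload.
    // The digit count is capped so that parsing and the *1000 below cannot overflow a long.
    private static final Pattern TS =
            Pattern.compile("\"timestamp\"\\s*:\\s*\"(\\d{1,15})\"");

    /**
     * Returns the embedded timestamp converted to epoch milliseconds,
     * or fallbackMs if the value is null or has no matching field.
     */
    public static long extractMillis(String value, long fallbackMs) {
        if (value == null) {
            return fallbackMs;
        }
        Matcher m = TS.matcher(value);
        if (!m.find()) {
            return fallbackMs;
        }
        // Assumption: the payload carries epoch seconds, so scale to milliseconds.
        long epochSeconds = Long.parseLong(m.group(1));
        return epochSeconds * 1000L;
    }

    public static void main(String[] args) {
        String value = "{\"timestamp\":\"1507885936\",\"host\":\"V.X.Y.Z.\"}";
        System.out.println(extractMillis(value, -1L)); // prints 1507885936000
    }
}
```

A regular expression keeps the sketch dependency-free; a real application would more likely use a JSON library. If the stream is read with a byte-array serde, the value would first need to be decoded into a String.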

Kafka Streams will use the returned timestamps when writing the records back to Kafka.
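The configuration step mentioned earlier might be sketched as follows, using plain `java.util.Properties` with string keys so the example runs without Kafka on the classpath. The class name `TimestampFixConfig`, the application id `timestamp-fix`, and the extractor class name passed in are all hypothetical. The key `default.timestamp.extractor` is, to my understanding, the setting used in 0.11.0 (older releases used `timestamp.extractor`), so treat it as an assumption and check it against the `StreamsConfig` documentation for your version.

```java
import java.util.Properties;

// Hypothetical configuration helper; names and values are illustrative only.
public class TimestampFixConfig {
    public static Properties build(String bootstrapServers, String extractorClassName) {
        Properties props = new Properties();
        // Identifies this Streams application (assumed name).
        props.put("application.id", "timestamp-fix");
        // Kafka brokers to connect to.
        props.put("bootstrap.servers", bootstrapServers);
        // Fully qualified name of the custom TimestampExtractor implementation.
        props.put("default.timestamp.extractor", extractorClassName);
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "com.example.EmbeddedTimestampExtractor");
        System.out.println(props.getProperty("default.timestamp.extractor"));
    }
}
```

The resulting Properties object would then be passed, together with the builder, to the `KafkaStreams` constructor.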

Note: if you have out-of-order data (i.e., timestamps are not strictly ordered), the result will contain out-of-order timestamps, too. Kafka Streams uses the returned timestamps when writing back to Kafka (i.e., whatever the extractor returns is used as the record metadata timestamp). Note that on write, the timestamp of the currently processed input record is used for all generated output records. This holds for version 1.0 but might change in future releases.
