Kafka Streams: How to change a record's timestamp (0.11.0)?

Question

I'm using FluentD (v.12, the latest stable version) to send messages to Kafka. But FluentD uses an old KafkaProducer, so the record timestamp is always set to -1. Thus I have to use the WallclockTimestampExtractor to set the record's timestamp to the point in time when the message arrives in Kafka.

The timestamp I'm really interested in is sent by FluentD within the message:

"timestamp":"1507885936","host":"V.X.Y.Z."

Record representation in Kafka:

offset = 0, timestamp = -1, key = null, value = {"timestamp":"1507885936","host":"V.X.Y.Z."}

I would like to have a record like this in Kafka:

offset = 0, timestamp = 1507885936, key = null, value = {"timestamp":"1507885936","host":"V.X.Y.Z."}

My workaround would look like this:

  • write a consumer to extract the timestamp (https://kafka.apache.org/0110/javadoc/org/apache/kafka/streams/processor/TimestampExtractor.html)
  • write a producer to produce a new record with the timestamp set (ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value))

I would prefer a Kafka Streams solution, if there is one.

Recommended answer

You can write a very simple Kafka Streams application like:

KStreamBuilder builder = new KStreamBuilder();
builder.stream("input-topic").to("output-topic");

and configure the application with a custom TimestampExtractor that extracts the timestamp from the record and returns it.
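As a minimal sketch of the parsing step such an extractor would need, the helper below pulls the "timestamp" field out of a payload shaped like the sample in the question. The class and method names (EmbeddedTimestamp, toEpochMillis) are hypothetical and not part of Kafka. It also assumes that the embedded value is in epoch seconds, as the sample 1507885936 suggests, and converts it to milliseconds, since Kafka record timestamps are expressed in milliseconds. In 0.11.0, a TimestampExtractor implementation would call something like this from extract(ConsumerRecord<Object, Object> record, long previousTimestamp), passing the record value decoded to a String (how to decode it depends on the configured serde).

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not a Kafka class): parses the embedded timestamp
// from a JSON-like payload such as {"timestamp":"1507885936","host":"..."}.
final class EmbeddedTimestamp {

    // Matches "timestamp":"<digits>" as it appears in the sample payload.
    private static final Pattern TIMESTAMP_FIELD =
            Pattern.compile("\"timestamp\"\\s*:\\s*\"(\\d+)\"");

    private EmbeddedTimestamp() {
    }

    // Returns the embedded timestamp in epoch milliseconds, or fallbackMillis
    // when the field is missing or cannot be parsed.
    // Assumption: the payload carries epoch seconds, so it is multiplied by 1000.
    static long toEpochMillis(String json, long fallbackMillis) {
        if (json == null) {
            return fallbackMillis;
        }
        Matcher matcher = TIMESTAMP_FIELD.matcher(json);
        if (!matcher.find()) {
            return fallbackMillis;
        }
        try {
            long epochSeconds = Long.parseLong(matcher.group(1));
            return Math.multiplyExact(epochSeconds, 1000L);
        } catch (NumberFormatException | ArithmeticException e) {
            // Digit run too long for a long, or overflow on conversion.
            return fallbackMillis;
        }
    }
}
```

Inside extract(...), one plausible choice for the fallback is the previousTimestamp argument; a regular-expression match keeps the sketch dependency-free, whereas a real application would more likely use a JSON library.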

Kafka Streams will use the returned timestamps when writing the records back to Kafka.
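For the returned timestamps to be used, the extractor has to be registered in the application's configuration. The following is only a sketch under stated assumptions: the application id, the broker address, and the extractor class name passed in are hypothetical placeholders, and the configuration key is written as a plain string. To my knowledge, 0.11.0 uses default.timestamp.extractor (earlier releases used timestamp.extractor), so check the StreamsConfig documentation of the version you run.

```java
import java.util.Properties;

// Hypothetical holder for the Kafka Streams settings; all values are placeholders.
final class StreamsSettings {

    private StreamsSettings() {
    }

    // Builds the properties that would be handed to the Kafka Streams application.
    // extractorClassName is the fully qualified name of your TimestampExtractor.
    static Properties build(String extractorClassName) {
        Properties props = new Properties();
        props.put("application.id", "timestamp-rewriter");      // placeholder id
        props.put("bootstrap.servers", "localhost:9092");       // placeholder broker
        // Assumed key for 0.11.0; older releases used "timestamp.extractor".
        props.put("default.timestamp.extractor", extractorClassName);
        return props;
    }
}
```

These properties would then be passed, together with the builder, when constructing the KafkaStreams instance.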

Note: if your data is out of order (i.e., the timestamps are not strictly ordered), the result will contain out-of-order timestamps, too. Kafka Streams uses the returned timestamps when writing back to Kafka; whatever the extractor returns is used as the record metadata timestamp. Note that, on write, the timestamp of the currently processed input record is used for all generated output records. This holds for version 1.0 but might change in future releases.
