如何更改记录的时间戳? [英] How to change timestamp of records?

查看:60
本文介绍了如何更改记录的时间戳?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用FluentD(v.12最后一个稳定版本)向Kafka发送消息。但FluentD使用的是旧的KafkaProducer,因此记录时间戳始终设置为-1。 因此,我必须使用WallclockTimestampExtractor将记录的时间戳设置为消息到达Kafka时的时间点。

是否有特定于Kafka Streams的解决方案?


我真正感兴趣的时间戳是由fluentd在消息中发送的:

";timestamp";:";1507885936";,&Quot;主机:&Quot;V.X.Y.Z.

以卡夫卡表示的记录:

偏移量=0,时间戳=-1,键=空,值={";timestamp";:";1507885936";,;主机;:V.X.Y.Z.&Quot;}

我希望有这样一张卡夫卡唱片:

OFFSET=0,TIMESTAMP=1507885936,KEY=NULL,VALUE={";timestamp";:";1507885936";,;HOST&QOT;:&QOT;V.X.Y.Z.&QOT;}

我的解决方法如下所示:

我更喜欢KafkaStreams解决方案(如果有)。

推荐答案

您可以编写非常简单的Kafka Streams应用程序,如下所示:

KStreamBuilder builder = new KStreamBuilder();
builder.stream("input-topic").to("output-topic");

并使用从记录中提取时间戳并返回时间戳的自定义TimestampExtractor配置应用程序。

Kafka Streams在将记录写回Kafka时将使用返回的时间戳。

注意:如果您有乱序数据--即时间戳没有严格排序--结果也将包含乱序时间戳。Kafka Streams使用返回的时间戳回写Kafka(即,无论提取程序返回什么,都用作记录元数据时间戳)。请注意,在写入时,当前处理的输入记录中的时间戳用于所有生成的输出记录--这适用于版本1.0,但在将来的版本中可能会更改。)。

更新:

一般来说,您可以通过处理器API修改时间戳。调用context.forward()可以通过To.all().withTimestamp(...)将输出记录时间戳设置为forward()的参数。

这篇关于如何更改记录的时间戳?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆