Set timestamp in output with Kafka Streams

Question

I'm receiving CSVs in a Kafka topic "raw-data". The goal is to transform them by sending each line to another topic, "data", with the right timestamp (different for each line).

Currently, I have two streams:

  • one that splits the lines in "raw-data" and sends them to an "internal" topic (no timestamp)
  • one with a TimestampExtractor that consumes "internal" and sends the records to "data".

I'd like to remove the "internal" topic by setting the timestamp directly, but I couldn't find a way (timestamp extractors are only used at consumption time).

I've stumbled upon this line in the documentation:

Note, that the describe default behavior can be changed in the Processor API by assigning timestamps to output records explicitly when calling #forward().

But I couldn't find any signature that takes a timestamp. What do they mean?

How would you do it?

To be clear, I have a Kafka topic with a message containing event times and values, such as:

2018-01-01,hello 2018-01-02,world (this is ONE message, not two)

I'd like to get two messages in another topic, with the Kafka record timestamp set to their event time (2018-01-01 and 2018-01-02), without the need for an intermediate topic.

Recommended answer

Setting the timestamp for the output requires Kafka Streams 2.0 and is only supported in the Processor API. If you use the DSL, you can use transform() to access those APIs.

As you pointed out, you would use context.forward(). The call would be:

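// needs KeyValue (org.apache.kafka.streams), Transformer and TransformerSupplier
// (...streams.kstream), ProcessorContext and To (...streams.processor);
// K and V stand in for your own key and value types
stream.transform(new TransformerSupplier<K, V, KeyValue<K, V>>() {
  @Override
  public Transformer<K, V, KeyValue<K, V>> get() {
    return new Transformer<K, V, KeyValue<K, V>>() {
      private ProcessorContext context;

      @Override
      public void init(ProcessorContext context) {
        this.context = context; // keep the context so transform() can call #forward()
      }

      @Override
      public KeyValue<K, V> transform(K key, V value) {
        // some business logic

        // you can call #forward() as often as you want
        context.forward(newKey, newValue, To.all().withTimestamp(newTimestamp));

        return null; // only return data via context#forward()
      }

      @Override
      public void close() {} // nothing to clean up
    };
  }
});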
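
For the sample message in the question, the "some business logic" step could look roughly like the sketch below. It uses only the JDK, so it can be tried without a Kafka cluster. The class name CsvLineSplitter, the nested Line type, and the choice to read each date as midnight UTC are assumptions made for illustration; the sketch also assumes the lines inside one message are separated by whitespace, as in the example.

```java
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Kafka Streams): splits one raw message
// into its lines and derives a record timestamp for each line.
final class CsvLineSplitter {

    /** One output record: the event time in epoch milliseconds and the value. */
    static final class Line {
        final long timestampMs;
        final String value;

        Line(long timestampMs, String value) {
            this.timestampMs = timestampMs;
            this.value = value;
        }
    }

    /** Splits e.g. "2018-01-01,hello 2018-01-02,world" into two Line objects. */
    static List<Line> split(String message) {
        List<Line> lines = new ArrayList<>();
        // Assumption: lines within a message are separated by whitespace.
        for (String entry : message.trim().split("\\s+")) {
            if (entry.isEmpty()) {
                continue; // empty or blank message
            }
            // Split on the first comma only: "<ISO date>,<value>".
            String[] parts = entry.split(",", 2);
            // Assumption: the date means midnight UTC on that day.
            long timestampMs = LocalDate.parse(parts[0])
                    .atStartOfDay(ZoneOffset.UTC)
                    .toInstant()
                    .toEpochMilli();
            lines.add(new Line(timestampMs, parts[1]));
        }
        return lines;
    }
}
```

Inside transform(), you would then loop over CsvLineSplitter.split(value) and call context.forward(key, line.value, To.all().withTimestamp(line.timestampMs)) once per line.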