Set timestamp in output with Kafka Streams


Question

I'm getting CSVs in a Kafka topic "raw-data". The goal is to transform them by sending each line to another topic, "data", with the right timestamp (different for each line).

Currently, I have 2 streamers:

  • one that splits the lines in "raw-data" and sends them to an "internal" topic (no timestamp)
  • one with a TimestampExtractor that consumes "internal" and sends the records to "data".

I'd like to remove the use of this "internal" topic by setting the timestamp directly, but I couldn't find a way (timestamp extractors are only used at consumption time).

I've stumbled upon this line in the documentation:

Note, that the describe default behavior can be changed in the Processor API by assigning timestamps to output records explicitly when calling #forward().

but I couldn't find any signature with a timestamp. What do they mean?

How would you do it?

To be clear, I have a Kafka topic with one message containing several lines, each with an event time and some value, such as:

2018-01-01,hello
2018-01-02,world

(this is ONE message, not two)

I'd like to get two messages in another topic, with the Kafka record timestamp set to their event time (2018-01-01 and 2018-01-02), without the need for an intermediate topic.

Recommended answer

Setting the timestamp for the output requires Kafka Streams 2.0 and is only supported in the Processor API. If you use the DSL, you can use transform() to access those APIs.

As you pointed out, you would use context.forward(). The call would be:

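// K, V are the input key/value types; K1, V1 are placeholders for the output types
stream.transform(new TransformerSupplier<K, V, KeyValue<K1, V1>>() {
  public Transformer<K, V, KeyValue<K1, V1>> get() {
    return new Transformer<K, V, KeyValue<K1, V1>>() {
      private ProcessorContext context;

      public void init(ProcessorContext context) {
        this.context = context; // keep the context so transform() can call #forward()
      }

      public KeyValue<K1, V1> transform(K key, V value) {
        // some business logic

        // you can call #forward() as often as you want
        context.forward(newKey, newValue, To.all().withTimestamp(newTimestamp));

        return null; // only return data via context#forward()
      }

      public void close() {} // nothing to clean up
    };
  }
});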
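
For the CSV message in the question, the "some business logic" step could be sketched roughly as below. This is only an illustration using the Java standard library: the class name CsvEventTime, the Line holder, and the split() helper are hypothetical, and it assumes each line starts with an ISO-8601 date that should be read as midnight UTC, with everything after the first comma treated as the value.

```java
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka Streams.
final class CsvEventTime {

    /** One output record: the value to forward and the timestamp to attach to it. */
    static final class Line {
        final String value;
        final long timestampMs;

        Line(String value, long timestampMs) {
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    /** Splits one multi-line CSV message and derives an epoch-millisecond timestamp per line. */
    static List<Line> split(String message) {
        List<Line> lines = new ArrayList<>();
        for (String row : message.split("\\R")) {          // \R matches any line break
            if (row.trim().isEmpty()) {
                continue;                                  // skip blank lines
            }
            String[] fields = row.split(",", 2);           // date, then the rest of the line
            // Assumption: the date is an ISO-8601 day, interpreted as midnight UTC.
            long ts = LocalDate.parse(fields[0].trim())
                    .atStartOfDay(ZoneOffset.UTC)
                    .toInstant()
                    .toEpochMilli();
            lines.add(new Line(fields.length > 1 ? fields[1] : "", ts));
        }
        return lines;
    }

    public static void main(String[] args) {
        for (Line line : split("2018-01-01,hello\n2018-01-02,world")) {
            System.out.println(line.timestampMs + " " + line.value);
        }
    }
}
```

Inside transform(), each returned Line would then be passed on with something like context.forward(key, line.value, To.all().withTimestamp(line.timestampMs)), so that one input message produces one output record per CSV line. If the event times are meant in a local time zone rather than UTC, the ZoneOffset.UTC argument is the part to change.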
