Set timestamp in output with Kafka Streams fails for transformations


Problem Description

Suppose we have a transformer (written in Scala)

import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.{ProcessorContext, To}

new Transformer[String, V, (String, V)]() {
  var context: ProcessorContext = _

  override def init(context: ProcessorContext): Unit = {
    this.context = context
  }

  override def transform(key: String, value: V): (String, V) = {
    // forward explicitly so the output record carries the timestamp
    // extracted from the value
    val timestamp = toTimestamp(value)
    context.forward(key, value, To.all().withTimestamp(timestamp))
    key -> value
  }

  override def close(): Unit = ()
}

where toTimestamp is just a function that returns a timestamp fetched from the record value. Once it gets executed, there's an NPE:

Exception in thread "...-6f3693b9-4e8d-4e65-9af6-928884320351-StreamThread-5" java.lang.NullPointerException
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:110)
    at CustomTransformer.transform()
    at CustomTransformer.transform()
    at org.apache.kafka.streams.scala.kstream.KStream$$anon$1$$anon$2.transform(KStream.scala:302)
    at org.apache.kafka.streams.scala.kstream.KStream$$anon$1$$anon$2.transform(KStream.scala:300)
    at 

What essentially happens is that ProcessorContextImpl fails in:

public <K, V> void forward(final K key, final V value, final To to) {
    toInternal.update(to);
    if (toInternal.hasTimestamp()) {
        // NPE here: recordContext is still null at this point
        recordContext.setTimestamp(toInternal.timestamp());
    }
    final ProcessorNode previousNode = currentNode();

because the recordContext was not initialized (and it can only be initialized internally by Kafka Streams).

This is a follow-up question to Set timestamp in output with Kafka Streams.

Recommended Answer

If you work with a transformer, you need to make sure that a new Transformer object is created each time TransformerSupplier#get() is called (cf. https://docs.confluent.io/current/streams/faq.html#why-do-i-get-an-illegalstateexception-when-accessing-record-metadata).
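
A minimal sketch of that pattern, assuming the Java Transformer / TransformerSupplier API; the class names TimestampTransformer and TimestampTransformerSupplier and the toTimestamp constructor parameter are made up for illustration. The essential point is that get() builds a fresh Transformer, with its own ProcessorContext, on every call:

import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{Transformer, TransformerSupplier}
import org.apache.kafka.streams.processor.{ProcessorContext, To}

class TimestampTransformer[V](toTimestamp: V => Long)
    extends Transformer[String, V, KeyValue[String, V]] {

  private var context: ProcessorContext = _

  override def init(context: ProcessorContext): Unit =
    this.context = context

  override def transform(key: String, value: V): KeyValue[String, V] = {
    // forward with an explicit output timestamp; returning null avoids
    // forwarding the same record a second time
    context.forward(key, value, To.all().withTimestamp(toTimestamp(value)))
    null
  }

  override def close(): Unit = ()
}

// Kafka Streams calls get() once per stream task, so it must return a new
// instance each time -- never a single shared Transformer.
class TimestampTransformerSupplier[V](toTimestamp: V => Long)
    extends TransformerSupplier[String, V, KeyValue[String, V]] {
  override def get(): Transformer[String, V, KeyValue[String, V]] =
    new TimestampTransformer[V](toTimestamp)
}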

In the original question I thought the NPE came from your context variable, but now I realize it comes from the Kafka Streams internals.

The Scala API has a bug in 2.0.0 that may cause the same Transformer instance to be reused (https://issues.apache.org/jira/browse/KAFKA-7250). I think you are hitting this bug. Rewriting your code a little bit should fix the issue; see the sketch below. Note that Kafka 2.0.1 and Kafka 2.1.0 contain a fix.
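
One hedged sketch of such a rewrite wires the supplier above in through the Java StreamsBuilder API, bypassing the affected Scala wrapper; the topic names, serdes and the _.toLong timestamp extractor are made up for illustration, and on 2.0.1 / 2.1.0 the Scala transform variants were changed to accept a supplier, so this detour is not needed there:

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.{Consumed, Produced}

val builder = new StreamsBuilder()

// read the input topic with plain String serdes (illustrative only)
val input = builder.stream("input-topic",
  Consumed.`with`(Serdes.String(), Serdes.String()))

// each stream task gets its own Transformer instance from the supplier
input
  .transform(new TimestampTransformerSupplier[String](_.toLong))
  .to("output-topic", Produced.`with`(Serdes.String(), Serdes.String()))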

