从控制台产生具有空值(墓碑)的Kafka消息 [英] Producing a Kafka message with a Null Value (Tombstone) from the Console

查看:103
本文介绍了从控制台产生具有空值(墓碑)的Kafka消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

有什么方法可以在kafka-console-producer中产生一个带有空值的消息(即,将其标记为供压缩程序使用逻辑删除的消息)?

Is there any way to produce a message in the kafka-console-producer with a null value (ie. mark it for the compactor to delete it with a tombstone)?

我尝试生成"mykey"和"mykey |".前者产生错误,而后者使值成为空字符串.像这样运行生产者:

I've tried producing "mykey" and "mykey|". The former produces an error and the later makes the value the empty string. Running producer like this:

$KAFKA_HOME/bin/kafka-console-producer --broker-list localhost:9092 --topic mytopic --property "parse.key=true" --property "key.separator=|"

推荐答案

不幸的是,无法使用console-producer来做到这一点

Unfortunately, there is no way to do that using console-producer

这是ConsoleProducer类的代码片段(它如何读取数据). Kafka 0.11.0(不要认为它在不同版本之间进行了重大更改).

this is a code snippet from ConsoleProducer class (how it reads the data). Kafka 0.11.0 (don't think that it was changed significantly between different versions).

override def readMessage() = {
  lineNumber += 1
  print(">")
  (reader.readLine(), parseKey) match {
    case (null, _) => null
    case (line, true) =>
      line.indexOf(keySeparator) match {
        case -1 =>
          if (ignoreError) new ProducerRecord(topic, line.getBytes(StandardCharsets.UTF_8))
          else throw new KafkaException(s"No key found on line $lineNumber: $line")
        case n =>
          val value = (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes(StandardCharsets.UTF_8)
          new ProducerRecord(topic, line.substring(0, n).getBytes(StandardCharsets.UTF_8), value)
      }
    case (line, false) =>
      new ProducerRecord(topic, line.getBytes(StandardCharsets.UTF_8))
  }
}

如您所见,该值始终是一个不可为空的字节数组

as you can see, the value is always an non-nullable array of bytes

这篇关于从控制台产生具有空值(墓碑)的Kafka消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆