Deserializing FlinkKafkaProducer's events from Kafka topic results in empty JSON records
Problem description
EDIT: I prepared a minimal reproducible example: https://github.com/kazuhirokomoda/flink-kafka-druid
I have a data pipeline and code that are exactly the same (unless otherwise noted at the bottom of this post) as in my previous question: "In what binary format should I ingest Kafka topic to Druid, whose events are sent from FlinkKafkaProducer[<Scala case class>]?" Basically, I am using Jackson's ObjectMapper within a custom KafkaSerializationSchema (i.e. ExampleDataSerializationSchema) to serialize events.
For testing purposes, I set up Kafka 2.6 on a bare-metal server, following https://kafka.apache.org/quickstart
I used kafka-console-consumer.sh to inspect what is sent from the Flink job to the Kafka topic, but I only saw empty JSON records, while I was expecting non-empty records.
bin/kafka-console-consumer.sh --topic <topic> --from-beginning --bootstrap-server localhost:9092
...
{}
{}
{}
^CProcessed a total of 287 messages
Note
- Not surprisingly, following the Kafka quickstart from STEP 1 through STEP 5: READ THE EVENTS (i.e. string input/output with the Kafka console producer/consumer) worked fine.
- Sending a (fixed) string (instead of Jackson/JSON) from the Flink job also worked fine. (See code below.)
bin/kafka-console-consumer.sh --topic <topic> --from-beginning --bootstrap-server localhost:9092
...
fixed string to send to kafka
fixed string to send to kafka
fixed string to send to kafka
^CProcessed a total of 287 messages
Questions
- When using a custom serializer (i.e. Flink's ExampleDataSerializationSchema, as shown in my previous question "In what binary format should I ingest Kafka topic to Druid, whose events are sent from FlinkKafkaProducer[<Scala case class>]?"), do I need to define a custom deserializer as well?
- If yes, how can I implement it, and where can I place it so that the Kafka console consumer (and eventually Druid, which is the goal of my previous question) can use it?
Thank you again for your help.
object KafkaSink {
  def sendToKafka(exampleDataStream: DataStream[ExampleData], props: ParameterTool): DataStreamSink[ExampleData] = {
    // defined in my previous question
  }

  // just for testing
  def sendToKafkaString(exampleDataStream: DataStream[ExampleData], props: ParameterTool): DataStreamSink[String] = {
    val topic: String = ...
    val properties: Properties = ...
    val producer = new FlinkKafkaProducer[String](
      topic,
      new ExampleDataStringSerializationSchema(topic),
      properties,
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
    // replace every element with the same fixed string
    val exampleDataStreamString = exampleDataStream.map(_ => "fixed string to send to kafka")
    exampleDataStreamString.addSink(producer)
  }
}
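import java.nio.charset.StandardCharsets
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord

class ExampleDataStringSerializationSchema(topic: String) extends KafkaSerializationSchema[String] {
  // encode the string as UTF-8 bytes and use it as the record value (no key)
  override def serialize(element: String, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
    new ProducerRecord[Array[Byte], Array[Byte]](topic, element.getBytes(StandardCharsets.UTF_8))
  }
}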
Recommended answer
The ObjectMapper wrapper workaround worked for me.
https://github.com/kazuhirokomoda/flink-kafka-druid/pull/1
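For readers who cannot open the pull request, here is a rough sketch of what an "ObjectMapper wrapper" commonly looks like. It is not taken from the linked PR and may differ from it. It assumes Jackson plus jackson-module-scala are on the classpath, and the names SampleEvent, LazyObjectMapper and WrapperSketch are made up for illustration (SampleEvent stands in for ExampleData, whose fields are not shown here). The idea is to hold the mapper in a small Serializable class that builds it lazily, with the Scala module registered, and to call it from the serialization code.

```scala
import java.nio.charset.StandardCharsets
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Hypothetical event type standing in for ExampleData.
final case class SampleEvent(id: Long, name: String)

// Hypothetical wrapper: the holder itself is Serializable, while the
// ObjectMapper is transient and rebuilt lazily on first use.
class LazyObjectMapper extends Serializable {
  @transient lazy val mapper: ObjectMapper = {
    val m = new ObjectMapper()
    // Assumption: the Scala module is what lets Jackson see case class fields.
    m.registerModule(DefaultScalaModule)
    m
  }
}

object WrapperSketch {
  // Serialize an event to JSON bytes, as a serialization schema would
  // before putting them into a ProducerRecord value.
  def toJsonBytes(wrapper: LazyObjectMapper, event: SampleEvent): Array[Byte] =
    wrapper.mapper.writeValueAsBytes(event)

  def main(args: Array[String]): Unit = {
    val bytes = toJsonBytes(new LazyObjectMapper, SampleEvent(1L, "a"))
    // The console consumer shows the record value as text, decoded here as UTF-8.
    println(new String(bytes, StandardCharsets.UTF_8))
  }
}
```

In a KafkaSerializationSchema, the same wrapper could be kept as a field and its bytes passed to the ProducerRecord, in the same way ExampleDataStringSerializationSchema passes the UTF-8 bytes of a string.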