Deserializing FlinkKafkaProducer's events from Kafka topic results in empty JSON records

Problem description

EDIT: I prepared a minimal reproducible example: https://github.com/kazuhirokomoda/flink-kafka-druid

I have a data pipeline and code, exactly the same (unless otherwise noted at the bottom of this post) as mentioned in my previous question: In what binary format should I ingest Kafka topic to Druid, whose events are sent from FlinkKafkaProducer[<Scala case class>]? Basically, I am using Jackson's ObjectMapper within a custom KafkaSerializationSchema (i.e. ExampleDataSerializationSchema) to serialize events.
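For context, a minimal sketch of what such a schema might look like (the real ExampleData case class and schema are defined in the previous question; the fields below are hypothetical, for illustration only):

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord

// Hypothetical stand-in for the case class defined in the previous question.
case class ExampleData(id: Long, value: String)

class ExampleDataSerializationSchema(topic: String) extends KafkaSerializationSchema[ExampleData] {
  // ObjectMapper is not serializable, so build it lazily on the task manager.
  @transient private lazy val objectMapper = new ObjectMapper()

  override def serialize(element: ExampleData, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] =
    new ProducerRecord[Array[Byte], Array[Byte]](topic, objectMapper.writeValueAsBytes(element))
}

Note that with a plain ObjectMapper this compiles and runs, yet (as shown below) every event arrives as {}, which is exactly the symptom described here.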

For testing purposes, I prepared Kafka 2.6 on a bare metal server, following https://kafka.apache.org/quickstart

I used kafka-console-consumer.sh to inspect what is sent from the Flink job to the Kafka topic, but I only saw empty JSON records, while I was expecting non-empty records.

bin/kafka-console-consumer.sh --topic <topic> --from-beginning --bootstrap-server localhost:9092
...
{}
{}
{}
^CProcessed a total of 287 messages

Note

  • Not surprisingly, following the Kafka quickstart from STEP 1 up to STEP 5: READ THE EVENTS (i.e. string input/output with the Kafka console producer/consumer) worked fine.
  • Sending a (fixed) string (instead of Jackson/JSON) from the Flink job also worked fine (see the code below):

    bin/kafka-console-consumer.sh --topic <topic> --from-beginning --bootstrap-server localhost:9092
    ...
    fixed string to send to kafka
    fixed string to send to kafka
    fixed string to send to kafka
    ^CProcessed a total of 287 messages
      

Questions

  • When using a custom serializer (i.e. Flink's ExampleDataSerializationSchema as shown in my previous question In what binary format should I ingest Kafka topic to Druid, whose events are sent from FlinkKafkaProducer[<Scala case class>]?), do I need to define a custom deserializer as well?
    • If yes, how can I implement it, and where can I place it so that the Kafka console consumer (and eventually Druid, which is the goal of my previous question) can use it? (A sketch of a possible Flink-side deserializer follows below.)

Thank you again for your help.
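For reference: the console consumer itself needs no custom deserializer for JSON, since it simply prints the raw UTF-8 bytes of each record. If the events were ever read back into a Flink job, though, a matching deserialization schema could be sketched as follows (assuming the hypothetical ExampleData above, and that jackson-module-scala is on the classpath so Jackson can reconstruct a case class):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord

class ExampleDataDeserializationSchema extends KafkaDeserializationSchema[ExampleData] {
  // DefaultScalaModule lets Jackson bind JSON back onto a Scala case class.
  @transient private lazy val objectMapper =
    new ObjectMapper().registerModule(DefaultScalaModule)

  override def isEndOfStream(nextElement: ExampleData): Boolean = false

  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): ExampleData =
    objectMapper.readValue(record.value(), classOf[ExampleData])

  override def getProducedType: TypeInformation[ExampleData] =
    TypeInformation.of(classOf[ExampleData])
}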

import java.util.Properties

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

object KafkaSink {
  def sendToKafka(exampleDataStream: DataStream[ExampleData], props: ParameterTool): DataStreamSink[ExampleData] = {
    // defined in my previous question
  }

  // just for testing: replaces every event with a fixed string payload
  def sendToKafkaString(exampleDataStream: DataStream[ExampleData], props: ParameterTool): DataStreamSink[String] = {
    val topic: String = ...
    val properties: Properties = ...

    val producer = new FlinkKafkaProducer[String](
      topic,
      new ExampleDataStringSerializationSchema(topic),
      properties,
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE)

    val exampleDataStreamString = exampleDataStream.map(_ => "fixed string to send to kafka")
    exampleDataStreamString.addSink(producer)
  }
}
              

import java.nio.charset.StandardCharsets

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord

class ExampleDataStringSerializationSchema(topic: String) extends KafkaSerializationSchema[String] {
  override def serialize(element: String, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
    // The payload is plain UTF-8 bytes, so the console consumer prints it as-is.
    new ProducerRecord[Array[Byte], Array[Byte]](topic, element.getBytes(StandardCharsets.UTF_8))
  }
}
              

Answer

The ObjectMapper wrapper workaround worked for me:

              https://github.com/kazuhirokomoda/flink-kafka-druid/pull/1

Thank you, onecricketeer.
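The change itself lives in the linked PR rather than in this post, but the usual root cause of empty {} records is that a plain Jackson ObjectMapper cannot discover Scala case class fields (a case class exposes id(), not a getId() bean getter), so every instance serializes as an empty object. One common remedy, sketched here under the assumption that jackson-module-scala is on the classpath, is to register DefaultScalaModule on the mapper used by the serialization schema:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Inside ExampleDataSerializationSchema (or a shared helper):
// registering the Scala module teaches Jackson how to read case class fields,
// so ExampleData serializes to real JSON instead of {}.
@transient private lazy val objectMapper: ObjectMapper =
  new ObjectMapper().registerModule(DefaultScalaModule)

With a mapper built this way, objectMapper.writeValueAsBytes(element) produces the expected non-empty JSON; whether the PR's "wrapper" does exactly this or something equivalent, the effect is the same.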
