In what binary format should I ingest Kafka topic to Druid, whose events are sent from FlinkKafkaProducer[&lt;Scala case class&gt;]?


Problem description

I have a data pipeline, written in Scala, that I need to improve.

  • As-is: Flink 1.8 -> (Tranquility, which officially supports Druid 0.9.2) -> Druid 0.20.1
  • To-be: Flink 1.11 -> Kafka -> Druid 0.20.1 with the recommended Druid Kafka indexing service.

The Flink app generates a DataStream of a Scala case class (ExampleData in the code below).

According to Flink Serialization Tuning Vol. 1: Choosing your Serializer — if you can, a Scala case class falls under "Flink-provided special serializers", but I am not sure how it gets serialized, and thus how (i.e. in what format) it is supposed to be deserialized when Druid reads it from the Kafka topic.

(I don't want to add dependencies to the Flink app unless there is a good reason, to keep the maintenance cost low.)

So my questions are:

  • Which (binary) data format mentioned in the Druid docs should I use for Druid to ingest events from the Kafka topic?
  • Or, how can I specify the data format on the Flink side so that Druid can read it (via Kafka)?
  • With regard to the data format, is there any setting required on the Kafka side as well?

I appreciate all the help/info you will be providing, thank you for your attention.

case class ExampleData(timestamp: Long, id: Int, name: String, price: BigDecimal) extends BaseData

trait BaseData {
    val timestamp: Long
    val name: String
}

Flink -> Kafka

val props: ParameterTool = ...
...
KafkaSink.sendToKafka(exampleDataStream, props)

object KafkaSink {
  def sendToKafka(exampleDataStream: DataStream[ExampleData], props: ParameterTool): DataStreamSink[ExampleData] = {
    val topic: String = ...
    val properties: Properties = ...

    // Exactly-once producer; the value format is whatever ExampleDataSerializationSchema writes
    val producer = new FlinkKafkaProducer[ExampleData](
      topic,
      new ExampleDataSerializationSchema(topic),
      properties,
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE)

    exampleDataStream.addSink(producer)
  }
}

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord

class ExampleDataSerializationSchema(topic: String) extends KafkaSerializationSchema[ExampleData] {
  val mapper = new ObjectMapper()

  // https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html#the-serializationschema
  // https://stackoverflow.com/a/58644689
  override def serialize(element: ExampleData, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
    // Serialize the case class to JSON bytes; on failure an empty record value is produced
    val elementAsBytes: Array[Byte] =
      try {
        mapper.writeValueAsBytes(element)
      } catch {
        case e: JsonProcessingException =>
          Array[Byte]()
      }
    new ProducerRecord[Array[Byte], Array[Byte]](topic, elementAsBytes)
  }
}
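
As a side note, here is a small, throwaway sketch (not part of the original pipeline) of what the record value boils down to: plain UTF-8 JSON bytes. It assumes the jackson-module-scala dependency is available so the mapper can introspect the case class; the object name and sample values are purely illustrative.

import java.nio.charset.StandardCharsets

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

object PayloadPreview {
  def main(args: Array[String]): Unit = {
    // Unshaded Jackson with the Scala module registered so case class fields are visible
    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
    val bytes = mapper.writeValueAsBytes(
      ExampleData(timestamp = 1617187200000L, id = 1, name = "sample", price = BigDecimal("9.99")))
    // The Kafka record value is just these bytes, e.g.
    // {"timestamp":1617187200000,"id":1,"name":"sample","price":9.99}
    println(new String(bytes, StandardCharsets.UTF_8))
  }
}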

Environment

  • Druid 0.20.1 (could be upgraded further if absolutely required)
  • Flink 1.11.2 (has to stay on 1.11.x for a certain reason)
  • Kafka 0.11.x (we don't have much control over the version)

Recommended answer

Your serializer uses Jackson/JSON; it doesn't matter that it is a Scala class, so you should use Druid's JSON input format:

      "ioConfig": {
        "topic": "your_topic_name",
        "inputFormat": {
          "type": "json"
        },
    

FWIW, if you add Confluent Schema Registry to your environment, you do not need to write your own SerializationSchema for each class, as you can use ConfluentRegistryAvroSerializationSchema (Avro format), and Druid seems to support the Schema Registry as well (at least for Avro; not sure about Protobuf or JSONSchema).
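
For reference, a rough sketch (not from the original question) of what that Schema Registry route could look like on the Flink side. It assumes the flink-avro and flink-avro-confluent-registry dependencies are on the classpath; the subject name, registry URL, and the forGeneric factory usage are illustrative assumptions, and a case class generated as an Avro SpecificRecord would use forSpecific instead.

import java.util.Properties

import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

object AvroKafkaSinkSketch {
  // Avro schema mirroring ExampleData; price is carried as a string here for simplicity
  val exampleSchema: Schema = SchemaBuilder.record("ExampleData").fields()
    .requiredLong("timestamp")
    .requiredInt("id")
    .requiredString("name")
    .requiredString("price")
    .endRecord()

  // Map one case class instance onto an Avro GenericRecord
  def toRecord(e: ExampleData): GenericRecord = {
    val r = new GenericData.Record(exampleSchema)
    r.put("timestamp", e.timestamp)
    r.put("id", e.id)
    r.put("name", e.name)
    r.put("price", e.price.toString)
    r
  }

  def sendToKafkaAsAvro(stream: DataStream[ExampleData],
                        topic: String,
                        registryUrl: String, // e.g. "http://schema-registry:8081" (placeholder)
                        properties: Properties): Unit = {
    // GenericRecord needs an explicit Avro type info, otherwise Flink falls back to Kryo
    val records: DataStream[GenericRecord] =
      stream.map(toRecord _)(new GenericRecordAvroTypeInfo(exampleSchema))

    // Serializer that registers/looks up the schema in the Confluent Schema Registry
    val serializer = ConfluentRegistryAvroSerializationSchema.forGeneric(
      s"$topic-value", exampleSchema, registryUrl)

    records.addSink(new FlinkKafkaProducer[GenericRecord](topic, serializer, properties))
  }
}

On the Druid side this would presumably pair with the Avro stream input format plus its Schema Registry bytes decoder, rather than the JSON input format shown above.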
