In what binary format should I ingest a Kafka topic into Druid when its events are sent from FlinkKafkaProducer[<Scala case class>]?
Problem description
I have a data pipeline, written in Scala, that I need to improve.
- As-is: Flink 1.8 -> (Tranquility, officially supporting Druid 0.9.2) -> Druid 0.20.1
- To-be: Flink 1.11 -> Kafka -> Druid 0.20.1, using the recommended Druid Kafka indexing service.
The Flink app generates a DataStream of a Scala case class (ExampleData in the code below).
According to "Flink Serialization Tuning Vol. 1: Choosing your Serializer — if you can", Scala case classes fall into the "Flink-provided special serializers" category, but I am not sure how they get serialized, and therefore in what format Druid is supposed to deserialize them when it reads from the Kafka topic.
(I don't want to add dependencies to the Flink app unless there is a good reason, to keep maintenance costs low.)
So my questions are:
- Which (binary) data format mentioned in the Druid docs should I use for Druid to ingest events from the Kafka topic?
- Or how can I specify the data format on the Flink side so that Druid can read it (via Kafka)?
- With regard to the data format, is any setting required on the Kafka side as well?
I appreciate any help or information you can provide. Thank you for your attention.
```scala
case class ExampleData(timestamp: Long, id: Int, name: String, price: BigDecimal) extends BaseData

trait BaseData {
  val timestamp: Long
  val name: String
}
```
Flink -> Kafka
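```scala
// In the Flink job:
val props: ParameterTool = ...
...
KafkaSink.sendToKafka(exampleDataStream, props)

// Sink definition:
import java.util.Properties

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.kafka.clients.producer.ProducerRecord

object KafkaSink {
  def sendToKafka(exampleDataStream: DataStream[ExampleData], props: ParameterTool): DataStreamSink[ExampleData] = {
    val topic: String = ...
    val properties: Properties = ...
    val producer = new FlinkKafkaProducer[ExampleData](
      topic,
      new ExampleDataSerializationSchema(topic),
      properties,
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
    exampleDataStream.addSink(producer)
  }
}

// Writes each ExampleData as a JSON-encoded Kafka message value
class ExampleDataSerializationSchema(topic: String) extends KafkaSerializationSchema[ExampleData] {
  // Uses the Jackson copy shaded into Flink
  val mapper = new ObjectMapper()

  // https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html#the-serializationschema
  // https://stackoverflow.com/a/58644689
  override def serialize(element: ExampleData, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
    val elementAsBytes: Array[Byte] =
      try {
        mapper.writeValueAsBytes(element)
      } catch {
        // On a JSON error this sends an empty message value instead of failing the job
        case _: JsonProcessingException => Array[Byte]()
      }
    // No key: the record carries only a value
    new ProducerRecord[Array[Byte], Array[Byte]](topic, elementAsBytes)
  }
}
```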
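Environment
- Druid 0.20.1 (can be upgraded further if absolutely necessary)
- Flink 1.11.2 (should stay on 1.11.x for various reasons)
- Kafka 0.11.x (we don't have much control over the version)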
- https://druid.apache.org/docs/0.20.1/development/extensions-core/kafka-ingestion.html
- https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
- https://druid.apache.org/docs/0.20.1/ingestion/data-formats.html
Answer
Your serializer uses Jackson to write JSON; it doesn't matter that the input is a Scala case class. So you should use Druid's JSON input format:
```json
"ioConfig": {
  "topic": "your_topic_name",
  "inputFormat": {
    "type": "json"
  },
  ...
}
```
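Druid's `json` input format only needs each Kafka message value to be one UTF-8 encoded JSON object. One thing worth verifying on the Flink side: a plain Jackson `ObjectMapper` without a Scala module detects JavaBean `getX()` getters and public fields, whereas a Scala case class exposes `timestamp()`-style accessors, so the question's serializer may end up in its `catch` branch and send empty messages. If you want to keep the Flink app free of extra dependencies, the sketch below writes the JSON by hand instead. It is an illustration under assumptions, not the answer's method: `ExampleDataJson`, `toJson` and `toBytes` are hypothetical names, and the JSON field names simply mirror `ExampleData`.

```scala
import java.nio.charset.StandardCharsets

// Mirrors the question's case class (the BaseData trait is left out here).
case class ExampleData(timestamp: Long, id: Int, name: String, price: BigDecimal)

// Hypothetical helper, not part of Flink, Kafka or Druid.
object ExampleDataJson {
  // Quotes and escapes a string as a JSON string literal.
  private def quote(s: String): String = {
    val sb = new StringBuilder("\"")
    s.foreach {
      case '"'          => sb.append("\\\"")
      case '\\'         => sb.append("\\\\")
      case '\n'         => sb.append("\\n")
      case '\r'         => sb.append("\\r")
      case '\t'         => sb.append("\\t")
      case c if c < ' ' => sb.append('\\').append('u').append("%04x".format(c.toInt))
      case c            => sb.append(c)
    }
    sb.append('"').toString
  }

  // One flat JSON object per record; price is written as a plain JSON number
  // (toPlainString avoids exponent notation such as 1E+3).
  def toJson(e: ExampleData): String =
    s"""{"timestamp":${e.timestamp},"id":${e.id},"name":${quote(e.name)},"price":${e.price.bigDecimal.toPlainString}}"""

  // UTF-8 bytes to use as the Kafka message value.
  def toBytes(e: ExampleData): Array[Byte] =
    toJson(e).getBytes(StandardCharsets.UTF_8)
}
```

Inside `ExampleDataSerializationSchema.serialize` you would then build the `ProducerRecord` from `ExampleDataJson.toBytes(element)`. On the Druid side, the `timestampSpec` would point at the `timestamp` column, with `"format": "millis"` if that field holds epoch milliseconds (an assumption; the question does not state the unit).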
FWIW, if you add Confluent Schema Registry to your environment, you don't need to write your own SerializationSchema for each class: you can use ConfluentRegistryAvroSerializationSchema (Avro format), and Druid seems to support the Schema Registry as well (at least for Avro; I'm not sure about Protobuf or JSON Schema).