Kafka Consumer for Spark written in Scala for Kafka API 0.10: custom AVRO deserializer


Problem description

I am upgrading the Kafka API in my Spark Scala app to v0.10. Previously I created a custom method to deserialize the messages, which arrive in byte-string format.

I have realized there is a way to pass StringDeserializer or ByteArrayDeserializer as a parameter for either the key or the value.

However, I cannot find any information on how to create a custom Avro schema deserializer, so that my kafkaStream can use it when I createDirectStream and consume data from Kafka.

Is it possible?

Recommended answer

It is possible. You need to implement the Deserializer&lt;T&gt; interface defined in org.apache.kafka.common.serialization, and point key.deserializer or value.deserializer to your custom class via the ConsumerStrategy[K, V] that holds the Kafka parameters. For example:

import java.util

import org.apache.kafka.common.serialization.Deserializer

class AvroDeserializer extends Deserializer[Array[Byte]] {
  // Nothing to configure or release for this example.
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
  override def close(): Unit = ()

  // Plug your Avro decoding in here; this sketch passes the raw bytes through.
  override def deserialize(topic: String, bytes: Array[Byte]): Array[Byte] = bytes
}
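In practice, deserialize is where the Avro decoding happens. As a minimal sketch of that step (the GenericAvroDeserializer name and the Payment schema are made up for illustration, and the writer schema is assumed to be known up front rather than fetched from a schema registry), a variant that decodes each payload into an Avro GenericRecord could look like this:

import java.util

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.apache.kafka.common.serialization.Deserializer

class GenericAvroDeserializer extends Deserializer[GenericRecord] {
  // Hypothetical writer schema, hard-coded for illustration; in a real app
  // you would load it from a file or a schema registry.
  private val schema: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"Payment","fields":[
      |  {"name":"id","type":"string"},
      |  {"name":"amount","type":"double"}
      |]}""".stripMargin)

  private val reader = new GenericDatumReader[GenericRecord](schema)

  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
  override def close(): Unit = ()

  override def deserialize(topic: String, bytes: Array[Byte]): GenericRecord = {
    // Decode the raw bytes against the writer schema (no decoder reuse).
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    reader.read(null, decoder)
  }
}

With this variant, value.deserializer would point to classOf[GenericAvroDeserializer], and the value type parameter of the stream below would be GenericRecord instead of Array[Byte].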

Then, use it when creating the direct stream:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import my.location.with.AvroDeserializer

val ssc: StreamingContext = ???
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[AvroDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("sometopic")
// The value type parameter matches what AvroDeserializer produces: Array[Byte].
val stream = KafkaUtils.createDirectStream[String, Array[Byte]](
  ssc,
  PreferConsistent,
  Subscribe[String, Array[Byte]](topics, kafkaParams)
)
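Each ConsumerRecord in the resulting stream carries whatever the configured value deserializer produced. As a small usage sketch (assuming the pass-through AvroDeserializer above, so values are Array[Byte]):

// record.value is the output of the configured value deserializer.
stream.map(record => (record.key, record.value))
  .foreachRDD(rdd => println(s"batch size: ${rdd.count()}"))

ssc.start()
ssc.awaitTermination()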
