Kafka Consumer for Spark written in Scala for Kafka API 0.10: custom AVRO deserializer


Problem Description

I am upgrading my Spark Scala app's Kafka API to v0.10. I used to have a custom method for deserializing messages that arrive as byte strings.

I have realized there is a way to pass StringDeserializer or ByteArrayDeserializer as a parameter for either the key or the value.

However, I cannot find any information on how to create a custom Avro schema deserializer that my Kafka stream can use when I call createDirectStream and consume data from Kafka.

Is it possible?

Recommended Answer

It is possible. You need to implement the Deserializer<T> interface defined in org.apache.kafka.common.serialization, and point key.deserializer or value.deserializer at your custom class through the Kafka parameters you hand to the ConsumerStrategy[K, V]. For example:

import java.util

import org.apache.kafka.common.serialization.Deserializer

// Pass-through deserializer: returns the raw Avro payload unchanged,
// leaving the actual decoding to whoever consumes the stream.
class AvroDeserializer extends Deserializer[Array[Byte]] {
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
  override def close(): Unit = ()
  override def deserialize(topic: String, bytes: Array[Byte]): Array[Byte] = bytes
}
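
The class above simply hands the raw Avro bytes through unchanged. If you would rather have the stream carry already-decoded records, here is a minimal sketch (not from the original answer) of a deserializer that produces Avro GenericRecords. Kafka instantiates deserializers reflectively through a no-arg constructor, so the writer schema is passed in via the consumer config; the config key avro.value.schema below is an assumed name for this sketch, not a real Kafka property:

import java.util

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.apache.kafka.common.serialization.Deserializer

class GenericAvroDeserializer extends Deserializer[GenericRecord] {
  private var reader: GenericDatumReader[GenericRecord] = _

  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {
    // The writer schema arrives as JSON through the consumer config;
    // "avro.value.schema" is a made-up key for this sketch.
    val schema = new Schema.Parser().parse(configs.get("avro.value.schema").toString)
    reader = new GenericDatumReader[GenericRecord](schema)
  }

  override def close(): Unit = ()

  override def deserialize(topic: String, bytes: Array[Byte]): GenericRecord =
    if (bytes == null) null
    else reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null))
}

If you go this route, the schema JSON has to be added to the kafkaParams map under that key, and the stream's value type parameter below becomes GenericRecord instead of Array[Byte].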

Then, wiring the byte-array deserializer into the direct stream:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import my.location.custom.AvroDeserializer // placeholder package: wherever your deserializer lives

val ssc: StreamingContext = ???
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[AvroDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("sometopic")
// The type parameters must match the deserializers configured above:
// String keys and Array[Byte] values (what AvroDeserializer emits).
val stream = KafkaUtils.createDirectStream[String, Array[Byte]](
  ssc,
  PreferConsistent,
  Subscribe[String, Array[Byte]](topics, kafkaParams)
)
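
The resulting stream is a DStream of ConsumerRecord[String, Array[Byte]], so each record's value is the raw Avro payload. A small usage sketch, with the actual decoding left as a placeholder:

// With the pass-through deserializer, the Avro decoding happens here,
// on the executors, rather than inside the Kafka consumer.
stream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val payload: Array[Byte] = record.value()
    // decode `payload` with your Avro reader here
  }
}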
