Use schema to convert AVRO messages with Spark to DataFrame

Question

Is there a way to use a schema to convert Avro messages from Kafka with Spark to a DataFrame? The schema file for the user records:

{
  "fields": [
    { "name": "firstName", "type": "string" },
    { "name": "lastName", "type": "string" }
  ],
  "name": "user",
  "type": "record"
}

And code snippets from the SqlNetworkWordCount example and from Kafka, Spark and Avro - Part 3, Producing and consuming Avro messages, to read in the messages.

object Injection {
  val parser = new Schema.Parser()
  val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json"))
  val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
}

...

messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => {
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits._

  val df = rdd.map(message => Injection.injection.invert(message._2).get)
    .map(record => User(record.get("firstName").toString, record.get("lastName").toString)).toDF()

  df.show()
})

case class User(firstName: String, lastName: String)

Somehow I can't find any way other than using a case class to convert AVRO messages to a DataFrame. Is there a possibility to use the schema instead? I'm using Spark 1.6.2 and Kafka 0.10.

The complete code, in case you're interested.

import com.twitter.bijection.Injection
import com.twitter.bijection.avro.GenericAvroCodecs
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.{SparkConf, SparkContext}

object ReadMessagesFromKafka {
  object Injection {
    val parser = new Schema.Parser()
    val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json"))
    val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
  }

  def main(args: Array[String]) {
    val brokers = "127.0.0.1:9092"
    val topics = "test"

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("ReadMessagesFromKafka").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
      ssc, kafkaParams, topicsSet)

    messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => {
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._

      val df = rdd.map(message => Injection.injection.invert(message._2).get)
        .map(record => User(record.get("firstName").toString, record.get("lastName").toString)).toDF()

      df.show()
    })

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}

/** Case class for converting RDD to DataFrame */
case class User(firstName: String, lastName: String)

/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {
  @transient private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}

Answer

The OP probably resolved the issue, but for future reference: I solved this issue quite generally, so I thought it might be helpful to post here.

So generally speaking, you should convert the Avro schema to a Spark StructType, convert the objects you have in your RDD to Row, and then use:

spark.createDataFrame(<RDD[obj] mapped to RDD[Row]>, <schema as StructType>)

In order to convert the Avro schema I used spark-avro like so:

SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
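
Putting the two pieces together, here is a minimal sketch based on the OP's streaming code: it reuses the Injection and SQLContextSingleton objects from the question, builds the StructType once from the parsed Avro schema, and then creates the DataFrame from an RDD[Row] instead of going through a case class. It assumes SchemaConverters.toSqlType is accessible in your spark-avro version.

import com.databricks.spark.avro.SchemaConverters
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType

// Build the Spark schema once from the parsed Avro schema
val sqlType: StructType =
  SchemaConverters.toSqlType(Injection.schema).dataType.asInstanceOf[StructType]

messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => {
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)

  // Decode the Avro bytes and turn each GenericRecord into a Row,
  // keeping the values in the same order as the fields in the schema
  val rowRdd = rdd
    .map(message => Injection.injection.invert(message._2).get)
    .map(record => Row(record.get("firstName").toString, record.get("lastName").toString))

  val df = sqlContext.createDataFrame(rowRdd, sqlType)
  df.show()
})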

The conversion of the RDD was trickier. If your schema is simple, you can probably just do a simple map, something like this:

rdd.map(obj => {
  val seq = Seq(obj.getName(), obj.getAge())
  Row.fromSeq(seq)
})

In this example the object has two fields, name and age.

The important thing is to make sure the elements in the Row will match the order and types of the fields in the StructType from before.
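
For a flat record, one way to keep that alignment automatically is to walk the Avro schema's fields in declaration order when building the Row, so it stays in sync with the StructType generated from the same schema. A rough sketch (recordToRow is a hypothetical helper; it assumes only primitive/string fields, and nested records need the extra handling described below):

import scala.collection.JavaConverters._
import org.apache.avro.generic.GenericRecord
import org.apache.avro.util.Utf8
import org.apache.spark.sql.Row

def recordToRow(record: GenericRecord): Row = {
  // Iterate the schema fields in declaration order so the Row lines up with the StructType
  val values = record.getSchema.getFields.asScala.map { field =>
    record.get(field.name) match {
      case s: Utf8 => s.toString // Avro strings come back as Utf8, not java.lang.String
      case other   => other
    }
  }
  Row.fromSeq(values)
}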

In my particular case I had a much more complex object which I wanted to handle generically to support future schema changes, so my code was much more complex.

The method suggested by the OP should also work in some cases, but will be hard to apply to complex objects (not primitives or case classes).

Another tip is that if you have a class within a class, you should convert the inner class to a Row as well, so that the wrapping class will be converted to something like:

Row(Any,Any,Any,Row,...)
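
A hypothetical sketch of that nested case (the Person and Address classes here are made up for illustration, not from the original code): the inner object is converted to its own Row before being placed in the outer one, and the matching StructType would then contain a nested StructType for that field.

import org.apache.spark.sql.Row

case class Address(street: String, city: String)             // hypothetical inner class
case class Person(name: String, age: Int, address: Address)  // hypothetical outer class

def personToRow(p: Person): Row =
  // The nested Address becomes its own Row, giving Row(Any, Any, Row)
  Row(p.name, p.age, Row(p.address.street, p.address.city))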

You can also look at the spark-avro project I mentioned earlier for how to convert objects to rows; I used some of the logic there myself.

If someone reading this needs further help, ask me in the comments and I'll try to help.

A similar question was also solved here.
