Use schema to convert AVRO messages with Spark to DataFrame

Problem description

Is there a way to use a schema to convert Avro messages from Kafka with Spark to a DataFrame? The schema file for user records:

{
  "fields": [
    { "name": "firstName", "type": "string" },
    { "name": "lastName", "type": "string" }
  ],
  "name": "user",
  "type": "record"
}

And code snippets from the SqlNetworkWordCount example and from Kafka, Spark and Avro - Part 3, Producing and consuming Avro messages to read in the messages.

object Injection {
  val parser = new Schema.Parser()
  val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json"))
  val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
}

...

messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => {
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits._

  val df = rdd.map(message => Injection.injection.invert(message._2).get)
    .map(record => User(record.get("firstName").toString, record.get("lastName").toString)).toDF()

  df.show()
})

case class User(firstName: String, lastName: String)

Somehow I can't find another way than using a case class to convert AVRO messages to DataFrame. Is there a possibility to use the schema instead? I'm using Spark 1.6.2 and Kafka 0.10.

The complete code, in case you're interested.

import com.twitter.bijection.Injection
import com.twitter.bijection.avro.GenericAvroCodecs
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.{SparkConf, SparkContext}

object ReadMessagesFromKafka {
  object Injection {
    val parser = new Schema.Parser()
    val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json"))
    val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
  }

  def main(args: Array[String]) {
    val brokers = "127.0.0.1:9092"
    val topics = "test"

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("ReadMessagesFromKafka").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
      ssc, kafkaParams, topicsSet)

    messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => {
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._

      val df = rdd.map(message => Injection.injection.invert(message._2).get)
        .map(record => User(record.get("firstName").toString, record.get("lastName").toString)).toDF()

      df.show()
    })

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}

/** Case class for converting RDD to DataFrame */
case class User(firstName: String, lastName: String)

/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {
  @transient  private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}

Recommended answer

The OP probably resolved the issue, but for future reference I solved this problem quite generally, so I thought it might be helpful to post it here.

So generally speaking, you should convert the Avro schema to a Spark StructType, convert the objects you have in your RDD to Row[Any], and then use:

spark.createDataFrame(<RDD[obj] mapped to RDD[Row]>, <schema as StructType>)

In order to convert the Avro schema I used spark-avro like so:

SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
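
A minimal sketch of that conversion for the user schema above could look like the following (assuming the spark-avro artifact is on the classpath; the SchemaConverters package name shown is the one used by the Databricks spark-avro releases and may differ between versions):

import org.apache.avro.Schema
import org.apache.spark.sql.types.StructType
import com.databricks.spark.avro.SchemaConverters  // package name assumed, depends on the spark-avro version

// parse the same Avro schema file the question loads from the classpath
val avroSchema: Schema = new Schema.Parser()
  .parse(getClass.getResourceAsStream("/user_schema.json"))

// for the user schema this yields a StructType with the firstName and lastName string fields
val sqlSchema: StructType =
  SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]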

The conversion of the RDD was trickier. If your schema is simple, you can probably just do a simple map, something like this:

rdd.map(obj => {
  val seq = Seq(obj.getName(), obj.getAge())
  Row.fromSeq(seq)
})

In this example the object has two fields, name and age.
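
Applied to the stream from the question, the foreachRDD body could then drop the case class entirely. A rough sketch under the same assumptions (Injection and SQLContextSingleton are the objects from the question, and sqlSchema is the StructType derived above):

import org.apache.spark.sql.Row

messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => {
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)

  // decode the Avro bytes back into GenericRecords, then build Rows
  // in the same field order as the schema (firstName, lastName)
  val rowRdd = rdd
    .map(message => Injection.injection.invert(message._2).get)
    .map(record => Row(record.get("firstName").toString, record.get("lastName").toString))

  val df = sqlContext.createDataFrame(rowRdd, sqlSchema)
  df.show()
})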

The important thing is to make sure the elements in the Row will match the order and types of the fields in the StructType from before.
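
One way to keep that alignment is to build each Row from the Avro schema's own field order, which is also the order the converted StructType uses. A rough sketch for flat records with primitive fields (the toRow helper is illustrative, not something spark-avro provides):

import scala.collection.JavaConverters._
import org.apache.avro.generic.GenericRecord
import org.apache.avro.util.Utf8
import org.apache.spark.sql.Row

def toRow(record: GenericRecord): Row = {
  // walk the fields in the order they are declared in the Avro schema
  val values = record.getSchema.getFields.asScala.map { field =>
    record.get(field.name) match {
      case s: Utf8 => s.toString  // Avro strings come back as Utf8, Spark expects java.lang.String
      case other   => other
    }
  }
  Row.fromSeq(values)
}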

In my particular case I had a much more complex object which I wanted to handle generically to support future schema changes, so my code was much more complex.

The method suggested by the OP should also work in some cases, but it will be hard to apply to complex objects (not primitives or case classes).

Another tip: if you have a class within a class, you should convert that inner class to a Row so that the wrapping class will be converted to something like:

Row(Any,Any,Any,Row,...)
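
As a hypothetical illustration, if the user record also contained a nested address record, the inner GenericRecord would become its own Row inside the outer one:

// hypothetical nested schema: user { firstName, lastName, address { city, zip } }
def userToRow(user: GenericRecord): Row = {
  val address = user.get("address").asInstanceOf[GenericRecord]
  Row(
    user.get("firstName").toString,
    user.get("lastName").toString,
    Row(address.get("city").toString, address.get("zip").toString)  // nested Row for the nested record
  )
}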

You can also look at the spark-avro project I mentioned earlier to see how to convert objects to rows; I used some of the logic there myself.

If someone reading this needs further help, ask me in the comments and I'll try to help.
