How to convert RDD[GenericRecord] to DataFrame in Scala?


Problem description

I get tweets from a Kafka topic with Avro (serializer and deserializer). I then create a Spark consumer which extracts the tweets into a DStream of RDD[GenericRecord]. Now I want to convert each RDD to a DataFrame to analyse these tweets via SQL. Is there any solution to convert RDD[GenericRecord] to a DataFrame?
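For context, a minimal sketch of the kind of consumer the question describes, assuming the Kafka 0.8 direct-stream connector and a plain binary Avro payload; ssc (the StreamingContext), the broker address, the topic name, and strSchema are placeholders:

    import kafka.serializer.{DefaultDecoder, StringDecoder}
    import org.apache.avro.Schema
    import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
    import org.apache.avro.io.DecoderFactory
    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Direct stream of raw bytes from the "tweets" topic (placeholder names)
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val byteStream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
      ssc, kafkaParams, Set("tweets"))

    // Deserialize each message into a GenericRecord; the schema string is parsed
    // on the executor because Avro's Schema class is not serializable
    val genericRecordStream: DStream[GenericRecord] = byteStream.map { case (_, bytes) =>
      val schema = new Schema.Parser().parse(strSchema)
      val reader = new GenericDatumReader[GenericRecord](schema)
      reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null))
    }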

Recommended answer

I spent some time trying to make this work (especially how to deserialize the data properly, but it looks like you have already covered that)... UPDATED

import com.databricks.spark.avro.SchemaConverters
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.StructType

  // Define a function to convert a GenericRecord to a Row
  def genericRecordToRow(record: GenericRecord, sqlType: SchemaConverters.SchemaType): Row = {
    val objectArray = new Array[Any](record.getSchema.getFields.size)
    import scala.collection.JavaConversions._
    // Copy each Avro field into the row array by position
    for (field <- record.getSchema.getFields) {
      objectArray(field.pos) = record.get(field.pos)
    }

    new GenericRowWithSchema(objectArray, sqlType.dataType.asInstanceOf[StructType])
  }

// Inside your stream's foreachRDD
val yourGenericRecordRDD = ...
val schema = new Schema.Parser().parse(strSchema) // your Avro schema as a JSON string
val sqlType = SchemaConverters.toSqlType(schema)

val rowRDD = yourGenericRecordRDD.map(record => genericRecordToRow(record, sqlType))
val df = sqlContext.createDataFrame(rowRDD, sqlType.dataType.asInstanceOf[StructType])

As you can see, I am using SchemaConverters to derive the DataFrame structure from the schema you used to deserialize (this could be more painful with a schema registry). For this you need the following dependency:

    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-avro_2.11</artifactId>
        <version>3.2.0</version>
    </dependency>
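If you build with sbt rather than Maven, the equivalent line (same artifact and version) would be:

    // sbt equivalent of the Maven dependency above
    libraryDependencies += "com.databricks" %% "spark-avro" % "3.2.0"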

You may need to change the spark-avro version to match your Spark version.

UPDATE: the code above only works for flat Avro schemas.

For nested structures I used something different. You can copy the class SchemaConverters; it has to live inside com.databricks.spark.avro (it uses some protected classes from the databricks package), or you can try the spark-bigquery dependency. The class will not be accessible by default, so you need to create a class inside the package com.databricks.spark.avro to access the factory method:

package com.databricks.spark.avro

import com.databricks.spark.avro.SchemaConverters.createConverterToSQL
import org.apache.avro.Schema
import org.apache.spark.sql.types.StructType

// Lives inside the databricks package so it can reach the protected factory method
object SchemaConverterUtils {

  def converterSql(schema: Schema, sqlType: StructType) =
    createConverterToSQL(schema, sqlType)

}

After that, you should be able to convert the records like this:

import scala.util.Try

val schema = .. // your schema
val sqlType = SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]
....
// Inside foreachRDD
val genericRecordRDD = deserializeAvroData(rdd)
///
val converter = SchemaConverterUtils.converterSql(schema, sqlType)
...
// Drop any record that fails conversion instead of failing the whole batch
val rowRdd = genericRecordRDD.flatMap(record =>
  Try(converter(record).asInstanceOf[Row]).toOption
)
// To DataFrame
val df = sqlContext.createDataFrame(rowRdd, sqlType)
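With the DataFrame in hand, you can run the SQL analysis the question asks about. A minimal sketch; the table name and query are placeholders:

    // Register the DataFrame as a temporary table and query it with SQL
    df.registerTempTable("tweets")
    val results = sqlContext.sql("SELECT * FROM tweets LIMIT 10")
    results.show()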
