How to convert RDD[GenericRecord] to dataframe in scala?


Question

I get tweets from a Kafka topic with Avro (serializer and deserializer). I then create a Spark consumer that extracts the tweets into a DStream of RDD[GenericRecord]. Now I want to convert each RDD to a DataFrame so I can analyse these tweets via SQL. Is there any solution to convert RDD[GenericRecord] to a DataFrame?

Answer

I spent some time trying to make this work (especially how to deserialize the data properly, but it looks like you already have that covered). See the UPDATE below for nested schemas.

import com.databricks.spark.avro.SchemaConverters
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.StructType

// Convert a GenericRecord to a Row by copying each field positionally
def genericRecordToRow(record: GenericRecord, sqlType: SchemaConverters.SchemaType): Row = {
  val objectArray = new Array[Any](record.getSchema.getFields.size)
  import scala.collection.JavaConversions._
  for (field <- record.getSchema.getFields) {
    objectArray(field.pos) = record.get(field.pos)
  }
  new GenericRowWithSchema(objectArray, sqlType.dataType.asInstanceOf[StructType])
}

// Inside your stream's foreachRDD
import org.apache.avro.Schema

val yourGenericRecordRDD = ...
val schema = new Schema.Parser().parse(...) // your Avro schema (as a JSON string)
val sqlType = SchemaConverters.toSqlType(schema)

val rowRDD = yourGenericRecordRDD.map(record => genericRecordToRow(record, sqlType))
val df = sqlContext.createDataFrame(rowRDD, sqlType.dataType.asInstanceOf[StructType])
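
Since your goal is to analyse the tweets via SQL, you can then register the resulting DataFrame as a temporary view and query it. This is a minimal sketch; the view name tweets and the text column are placeholder names for whatever fields your Avro schema actually defines:

    // Register the DataFrame for SQL queries (Spark 2.x API).
    // "tweets" and "text" are hypothetical names -- use your schema's fields.
    df.createOrReplaceTempView("tweets")
    val results = sqlContext.sql("SELECT text FROM tweets LIMIT 10")
    results.show()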

As you can see, I am using SchemaConverters to derive the DataFrame structure from the same schema you used to deserialize (this can be more painful with a schema registry). For this you need the following dependency:

    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-avro_2.11</artifactId>
        <version>3.2.0</version>
    </dependency>
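
If you build with sbt instead of Maven, the equivalent line would be (assuming the same Scala 2.11 / spark-avro 3.2.0 combination):

    // sbt equivalent of the Maven dependency above
    libraryDependencies += "com.databricks" % "spark-avro_2.11" % "3.2.0"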

You may need to change the spark-avro version to match your Spark version.

UPDATE: the code above only works for flat Avro schemas.
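
To make "flat" concrete: every field of the record is a primitive type. For example, this hypothetical tweet schema is flat:

    {
      "type": "record",
      "name": "Tweet",
      "fields": [
        {"name": "id",   "type": "long"},
        {"name": "text", "type": "string"},
        {"name": "lang", "type": "string"}
      ]
    }

If a field's type were instead a record itself (say, a nested user record), genericRecordToRow above would copy the nested GenericRecord into the Row unconverted, which is why nested schemas need the converter approach below.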

For nested structures I used something different. You can copy the class SchemaConverters (it has to live inside com.databricks.spark.avro because it uses some protected members of that package), or you can try the spark-bigquery dependency. The factory method is not accessible by default, so you need to create a class inside the package com.databricks.spark.avro to reach it.

package com.databricks.spark.avro

import com.databricks.spark.avro.SchemaConverters.createConverterToSQL
import org.apache.avro.Schema
import org.apache.spark.sql.types.StructType

// Must live inside com.databricks.spark.avro to reach the
// package-private createConverterToSQL factory method.
// An object (not a class) so it can be called without instantiation below.
object SchemaConverterUtils {

  def converterSql(schema: Schema, sqlType: StructType) =
    createConverterToSQL(schema, sqlType)
}

After that, you should be able to convert the data like this:

import scala.util.Try

val schema = ... // your schema
val sqlType = SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]
...
// inside foreachRDD
val genericRecordRDD = deserializeAvroData(rdd)

val converter = SchemaConverterUtils.converterSql(schema, sqlType)

// Drop any record that fails to convert instead of failing the whole job
val rowRdd = genericRecordRDD.flatMap(record => {
  Try(converter(record).asInstanceOf[Row]).toOption
})
// To DataFrame
val df = sqlContext.createDataFrame(rowRdd, sqlType)
