Spark Dataframe schema definition using reflection with case classes and column name aliases


Problem Description

I ran into a little problem with my Spark Scala script. Basically, I have raw data on which I am doing aggregations, and after grouping, counting, etc. I want to save the output in a specific JSON format.

I tried to simplify the question and rewrote it:

When I select data from the source DataFrame with an Array[org.apache.spark.sql.Column] where the column names have aliases, and then use a column name (or index) held in a variable when mapping the rows to a case class, I get a "Task not serializable" exception.

    // Spark 1.x spark-shell session; outside the shell you would also need
    // import org.apache.spark.sql.functions.col and import sqlContext.implicits._
    var dm = sqlContext.createDataFrame(Seq((1,"James"),(2,"Anna"))).toDF("id", "name")

    val cl = dm.columns
    val cl2 = cl.map(name => col(name).as(name.capitalize)) // alias: "name" -> "Name"
    val dm2 = dm.select(cl2:_*)
    val n = "Name"
    case class Result(Name:String)
    val r = dm2.map(row => Result(row.getAs(n))).toDF // throws "Task not serializable"

And for the second part of the question, I actually need the final schema to be an array of these Result class objects. I still haven't figured out how to do that either. The expected result should have a schema like this:

    case class Test(var FilteredStatistics: Array[Result])
    val t = Test(Array(Result("Anna"), Result("James")))

    val t2 = sc.parallelize(Seq(t)).toDF

    scala> t2.printSchema
    root
     |-- FilteredStatistics: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- Name: string (nullable = true)

TL;DR:

  1. How do I map DataFrame rows to a case class object when the DataFrame columns have aliases and variables are used for the column names?

  2. How do I add these case class objects to an array?

Recommended Answer

Serialization issue: the problem here is val n = "Name": it is used inside an anonymous function passed to an RDD transformation (dm2.map(...)), which makes Spark close over that variable and its containing scope. That scope also includes cl2, which has the type Array[Column] and therefore isn't serializable.

The solution is simple: either inline n (to get dm2.map(row => Result(row.getAs("Name")))), or place it in a serializable context (an object or a class that doesn't contain any non-serializable members).
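
A minimal sketch of the inlined fix, reusing dm2 and the Result case class from the question (same spark-shell session assumed):

    // Inlining the literal keeps the closure from capturing the enclosing
    // scope, which holds the non-serializable Array[Column] cl2
    val r = dm2.map(row => Result(row.getAs[String]("Name"))).toDF

For the second part of the question, one possible approach (a sketch, staying with the RDD-style API the question already uses) is to collect the mapped Result objects into a local array and wrap it in the Test case class, reproducing the expected array-of-struct schema:

    // Test as defined in the question
    case class Test(var FilteredStatistics: Array[Result])

    // Collect the Results to the driver (fine for small aggregated output),
    // then wrap the array in Test to get the FilteredStatistics schema
    val results = dm2.map(row => Result(row.getAs[String]("Name"))).collect()
    val t2 = sc.parallelize(Seq(Test(results))).toDF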
