Spark SQL: generate an array of arrays from a SQL function
Question
I want to create an array of arrays. This is my data table:
// A case class for our sample table
case class Testing(name: String, age: Int, salary: Int)
// Create an RDD with some data
val x = sc.parallelize(Array(
Testing(null, 21, 905),
Testing("Noelia", 26, 1130),
Testing("Pilar", 52, 1890),
Testing("Roberto", 31, 1450)
))
// Convert RDD to a DataFrame
val df = sqlContext.createDataFrame(x)
// For SQL usage we need to register the table
df.registerTempTable("df")
I want to create an array from the integer column "age". For that I use "collect_list":
sqlContext.sql("SELECT collect_list(age) as age from df").show
But now I want to generate an array that contains several arrays like the one created above:
sqlContext.sql("SELECT collect_list(collect_list(age), collect_list(salary)) as arrayInt from df").show
But this does not work. Or should I use the function org.apache.spark.sql.functions.array? Any ideas?
Recommended answer
OK, this is simpler than it looks. Let's take the same data you are working with and go step by step from there:
// A case class for our sample table
case class Testing(name: String, age: Int, salary: Int)
// Create an RDD with some data
val x = sc.parallelize(Array(
Testing(null, 21, 905),
Testing("Noelia", 26, 1130),
Testing("Pilar", 52, 1890),
Testing("Roberto", 31, 1450)
))
// Convert RDD to a DataFrame
val df = sqlContext.createDataFrame(x)
// For SQL usage we need to register the table
df.registerTempTable("df")
sqlContext.sql("select collect_list(age) as age from df").show
// +----------------+
// | age|
// +----------------+
// |[21, 26, 52, 31]|
// +----------------+
sqlContext.sql("select collect_list(collect_list(age), collect_list(salary)) as arrayInt from df").show
As the error message says:
org.apache.spark.sql.AnalysisException: No handler for Hive udf class
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectList because: Exactly one argument is expected..; line 1 pos 52 [...]
collect_list takes just one argument. Let's check the documentation.
It does indeed take a single argument! But let's go further into the documentation of the functions object. As you seem to have noticed, the array function creates a new array column from a Column or a repeated Column parameter. So let's use that:
sqlContext.sql("select array(collect_list(age), collect_list(salary)) as arrayInt from df").show(false)
The array function does indeed create a single column from the lists built beforehand by collect_list on both age and salary:
// +-------------------------------------------------------------------+
// |arrayInt |
// +-------------------------------------------------------------------+
// |[WrappedArray(21, 26, 52, 31), WrappedArray(905, 1130, 1890, 1450)]|
// +-------------------------------------------------------------------+
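Since the question mentions org.apache.spark.sql.functions.array, here is a minimal sketch of the same aggregation written with the DataFrame API instead of a SQL string. It assumes Spark 2.x or later with a SparkSession (the local master and app name are illustrative choices, and in spark-shell `spark` already exists), and it builds the sample rows from tuples rather than the case class. Spark does not guarantee the element order produced by collect_list in general.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array, col, collect_list}

// Assumption: Spark 2.x+ running in local mode
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("array-of-arrays")
  .getOrCreate()
import spark.implicits._

// Same sample rows as the table above, built from tuples
val df = Seq[(String, Int, Int)](
  (null, 21, 905),
  ("Noelia", 26, 1130),
  ("Pilar", 52, 1890),
  ("Roberto", 31, 1450)
).toDF("name", "age", "salary")

// array(...) combines the two aggregated list columns into one array-of-arrays column
val nested = df.agg(
  array(collect_list(col("age")), collect_list(col("salary"))).as("arrayInt")
)
nested.show(false)

// Pull the single value back to the driver as a nested Scala collection
val result: Seq[Seq[Int]] = nested.first.getSeq[Seq[Int]](0)
```

Both inner lists must have the same element type (here Int), because array requires all of its input columns to share one type.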
Where do we go from here?
You have to remember that a Row of a DataFrame is just a wrapper around another collection.
The first thing I'll do is work on that collection. So how do we flatten a WrappedArray[WrappedArray[Int]]?
Scala is kind of magical: you just need to use .flatten
import scala.collection.mutable.WrappedArray
// Take the first (and only) row and cast its single column to the nested array type
val firstRow: WrappedArray[WrappedArray[Int]] =
  sqlContext.sql("select array(collect_list(age), collect_list(salary)) as arrayInt from df")
    .first.get(0).asInstanceOf[WrappedArray[WrappedArray[Int]]]
// firstRow: scala.collection.mutable.WrappedArray[scala.collection.mutable.WrappedArray[Int]] =
// WrappedArray(WrappedArray(21, 26, 52, 31), WrappedArray(905, 1130, 1890, 1450))
firstRow.flatten
// res27: scala.collection.mutable.IndexedSeq[Int] = ArrayBuffer(21, 26, 52, 31, 905, 1130, 1890, 1450)
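The flattening step itself has nothing to do with Spark. As a minimal sketch, the snippet below uses a plain Seq[Seq[Int]] as a stand-in for the value pulled out of the Row (the values are copied from the output above) and shows that flatten concatenates the inner sequences one level deep.

```scala
// Stand-in for the nested value extracted from the first Row above
val nested: Seq[Seq[Int]] = Seq(Seq(21, 26, 52, 31), Seq(905, 1130, 1890, 1450))

// flatten concatenates the inner sequences in order, one level deep
val flat: Seq[Int] = nested.flatten

// flatMap(identity) is an equivalent spelling of the same operation
val viaFlatMap: Seq[Int] = nested.flatMap(identity)

println(flat)
```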
Now let's wrap it in a UDF so we can use it on the DataFrame:
def flatten(array: WrappedArray[WrappedArray[Int]]) = array.flatten
sqlContext.udf.register("flatten", flatten(_: WrappedArray[WrappedArray[Int]]))
Since we registered the UDF, we can now use it in queries run through the sqlContext:
sqlContext.sql("select flatten(array(collect_list(age), collect_list(salary))) as arrayInt from df").show(false)
// +---------------------------------------+
// |arrayInt |
// +---------------------------------------+
// |[21, 26, 52, 31, 905, 1130, 1890, 1450]|
// +---------------------------------------+
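On newer Spark versions the UDF may not be needed at all: Spark 2.4 added a built-in flatten function for arrays of arrays. The sketch below assumes Spark 2.4 or later with a SparkSession (the local master and app name are illustrative, and the rows are built from tuples); it registers no UDF, so flatten resolves to the built-in.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: Spark 2.4+ running in local mode
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("builtin-flatten")
  .getOrCreate()
import spark.implicits._

// Same sample rows as the table above, built from tuples
val df = Seq[(String, Int, Int)](
  (null, 21, 905),
  ("Noelia", 26, 1130),
  ("Pilar", 52, 1890),
  ("Roberto", 31, 1450)
).toDF("name", "age", "salary")
df.createOrReplaceTempView("df")

// No UDF is registered here: flatten is Spark's built-in function
val flat = spark.sql(
  "select flatten(array(collect_list(age), collect_list(salary))) as arrayInt from df"
)
flat.show(false)

// Bring the flattened array back to the driver
val values: Seq[Int] = flat.first.getSeq[Int](0)
```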
I hope this helps!