Aggregate array type in Spark Dataframe


Question

I have a DataFrame orders:

+-----------------+-----------+--------------+
|               Id|    Order  |        Gender|
+-----------------+-----------+--------------+
|             1622|[101330001]|          Male|
|             1622|   [147678]|          Male|
|             3837|  [1710544]|          Male|
+-----------------+-----------+--------------+

which I want to groupBy on Id and Gender and then aggregate orders. I am using the org.apache.spark.sql.functions package and the code looks like:

DataFrame group = orders.withColumn("orders", col("Order"))
                .groupBy(col("Id"), col("Gender"))
                .agg(collect_list("orders"));

However, since column Order is of type array, I get this exception because it expects a primitive type:

User class threw exception: org.apache.spark.sql.AnalysisException: No handler for Hive udf class org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectList because: Only primitive type arguments are accepted but array<string> was passed as parameter 1

I have looked in the package and there are sort functions for arrays but no aggregate functions. Any idea how to do it? Thanks.

Answer

In this case you can define your own function and register it as a UDF:

val userDefinedFunction = ???  // your conversion logic
val udfFunctionName = udf[U, T](userDefinedFunction)

Then pass the column through that function so that it gets converted into a primitive type, and use the result inside the withColumn method.

Something like this:

val dataF: Seq[String] => String = _.head  // array<string> columns arrive as Seq in Scala UDFs

val dataUDF = udf[String, Seq[String]](dataF)

DataFrame group = orders.withColumn("orders", dataUDF(col("Order")))
                .groupBy(col("Id"), col("Gender"))
                .agg(collect_list("orders"));
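Spark aside, the shape of this transformation can be sketched in plain Java: reduce each order array to its first element, then group by (Id, Gender) and collect those elements into a list. The `OrderRow` record and `groupOrders` helper below are illustrative names, not Spark API.

```java
import java.util.*;
import java.util.stream.*;

// A minimal stand-in for a DataFrame row: (Id, Gender, Order array).
record OrderRow(int id, String gender, String[] order) {}

class OrderGrouping {
    // Reduce each array to a primitive value (its head), then group by
    // (Id, Gender) and collect the heads into a list -- the same shape the
    // UDF + collect_list pipeline produces in Spark.
    static Map<String, List<String>> groupOrders(List<OrderRow> rows) {
        return rows.stream().collect(Collectors.groupingBy(
                r -> r.id() + "|" + r.gender(),
                Collectors.mapping(r -> r.order()[0], Collectors.toList())));
    }
}
```

With the sample data from the question, rows 1622/Male collapse into one group holding both order heads, and 3837/Male keeps its single order.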

I hope it works!
