Passing a list of tuples as a parameter to a Spark UDF in Scala
Question
I am trying to pass a list of tuples to a UDF in Scala. I am not sure how to exactly define the datatype for this. I tried passing it as a whole row, but Spark can't really resolve it. I need to sort the list based on the first element of each tuple and then send n elements back. I have tried the following definitions for the UDF:
def udfFilterPath = udf((id: Long, idList: Array[structType[Long, String]] )
def udfFilterPath = udf((id: Long, idList: Array[Tuple2[Long, String]] )
def udfFilterPath = udf((id: Long, idList: Row)
This is what the idList looks like:
[[1234,"Tony"], [2345, "Angela"]]
[[1234,"Tony"], [234545, "Ruby"], [353445, "Ria"]]
This is a dataframe with 100 rows like the above. I call the UDF as follows:
testSet.select("id", "idList").withColumn("result", udfFilterPath($"id", $"idList")).show
When I print the schema for the dataframe, it reads idList as an array of structs. The idList itself is generated by doing a collect_list over a column of tuples grouped by a key and stored in the dataframe. Any ideas on what I am doing wrong? Thanks!
Answer
When defining a UDF, you should use plain Scala types (e.g. tuples, primitives...) and not the Spark SQL types (e.g. StructType) as the output types.
As for the input types - this is where it gets tricky (and not too well documented) - an array of tuples would actually be a mutable.WrappedArray[Row]. So you'll have to "convert" each row into a tuple first; then you can do the sorting and return the result.
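Stripped of the Spark plumbing, the sorting-and-truncating step is plain Scala. A minimal sketch (the function name and signature here are illustrative, not from the original post):

```scala
// Sort a list of (Long, String) pairs by the first element of each
// tuple and keep the first n entries - the same logic the UDF applies
// after each Row has been converted into a tuple.
def sortAndTake(idList: Seq[(Long, String)], n: Int): Seq[(Long, String)] =
  idList.sortBy(_._1).take(n)
```

For example, `sortAndTake(Seq((2345L, "Angela"), (1234L, "Tony")), 2)` returns the pairs ordered by id: `Seq((1234, "Tony"), (2345, "Angela"))`.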
Lastly, by your description it seems that the id column isn't used at all, so I removed it from the UDF definition, but it can easily be added back.
import scala.collection.mutable
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

val udfFilterPath = udf { idList: mutable.WrappedArray[Row] =>
  // Convert the array items into tuples, sort by the first item, and return the first two tuples:
  idList.map(r => (r.getAs[Long](0), r.getAs[String](1))).sortBy(_._1).take(2)
}
df.withColumn("result", udfFilterPath($"idList")).show(false)
+------+-------------------------------------------+----------------------------+
|id |idList |result |
+------+-------------------------------------------+----------------------------+
|1234 |[[1234,Tony], [2345,Angela]] |[[1234,Tony], [2345,Angela]]|
|234545|[[1234,Tony], [2345454,Ruby], [353445,Ria]]|[[1234,Tony], [353445,Ria]] |
+------+-------------------------------------------+----------------------------+