如何(加入)信息跨阵列结合[数据框] [英] How to combine (join) information across an Array[DataFrame]
本文介绍了如何(加入)信息跨阵列结合[数据框]的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我有一个数组[数据框]我要检查,对每个数据帧中的每一行,如果有按列中的值的任何变化。说我有三个数据帧的第一行,如:
I have an Array[DataFrame] and I want to check, for each row of each data frame, if there is any change in the values by column. Say I have the first row of three data frames, like:
(0,1.0,0.4,0.1)
(0,3.0,0.2,0.1)
(0,5.0,0.4,0.1)
第一列是ID,而我这个ID理想的输出是:
The first column is the ID, and my ideal output for this ID would be:
(0, 1, 1, 0)
意思是第二和第三列改变而第三没有。
我在这里附上一个数据位来复制我的设置
meaning that the second and third columns changed while the third did not. I attach here a bit of data to replicate my setting
val rdd = sc.parallelize(Array((0,1.0,0.4,0.1),
(1,0.9,0.3,0.3),
(2,0.2,0.9,0.2),
(3,0.9,0.2,0.2),
(4,0.3,0.5,0.5)))
val rdd2 = sc.parallelize(Array((0,3.0,0.2,0.1),
(1,0.9,0.3,0.3),
(2,0.2,0.5,0.2),
(3,0.8,0.1,0.1),
(4,0.3,0.5,0.5)))
val rdd3 = sc.parallelize(Array((0,5.0,0.4,0.1),
(1,0.5,0.3,0.3),
(2,0.3,0.3,0.5),
(3,0.3,0.3,0.1),
(4,0.3,0.5,0.5)))
val df = rdd.toDF("id", "prop1", "prop2", "prop3")
val df2 = rdd2.toDF("id", "prop1", "prop2", "prop3")
val df3 = rdd3.toDF("id", "prop1", "prop2", "prop3")
val result:Array[DataFrame] = new Array[DataFrame](3)
result.update(0, df)
result.update(1,df2)
result.update(2,df3)
我该如何映射阵列上,让我的输出?
How can I map over the array and get my output?
推荐答案
您可以使用 countDistinct
与 GROUPBY
:
import org.apache.spark.sql.functions.{countDistinct}
val exprs = Seq("prop1", "prop2", "prop3")
.map(c => (countDistinct(c) > 1).cast("integer").alias(c))
val combined = result.reduce(_ unionAll _)
val aggregatedViaGroupBy = combined
.groupBy($"id")
.agg(exprs.head, exprs.tail: _*)
aggregatedViaGroupBy.show
// +---+-----+-----+-----+
// | id|prop1|prop2|prop3|
// +---+-----+-----+-----+
// | 0| 1| 1| 0|
// | 1| 1| 0| 0|
// | 2| 1| 1| 1|
// | 3| 1| 1| 1|
// | 4| 0| 0| 0|
// +---+-----+-----+-----+
这篇关于如何(加入)信息跨阵列结合[数据框]的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文