Spark DataFrame groupBy with agg performing list appending
Question
I have a DataFrame with the following schema:
[visitorId: string, trackingIds: array<string>, emailIds: array<string>]
I am looking for a way to group (or maybe roll up?) this DataFrame by visitorId so that the trackingIds and emailIds arrays of all rows sharing a visitorId are appended together. For example, if my initial df looks like this:
+---------+------------+--------+
|visitorId|trackingIds |emailIds|
+---------+------------+--------+
|a158     |[666b]      |[12]    |
|7g21     |[c0b5]      |[45]    |
|7g21     |[c0b4]      |[87]    |
|a158     |[666b, 777c]|[]      |
+---------+------------+--------+
I would like my output df to look like this:
+---------+------------------+--------+
|visitorId|trackingIds       |emailIds|
+---------+------------------+--------+
|a158     |[666b, 666b, 777c]|[12, '']|
|7g21     |[c0b5, c0b4]      |[45, 87]|
+---------+------------------+--------+
I have tried the groupBy and agg functions, but without much luck.
Thanks.
Recommended answer
It is possible, but quite expensive. Using the data you've provided:
// Needed for toDF and the $"..." column syntax
// (spark-shell already imports this for you)
import sqlContext.implicits._

case class Record(
  visitorId: String, trackingIds: Array[String], emailIds: Array[String])

val df = sc.parallelize(Seq(
  Record("a158", Array("666b"), Array("12")),
  Record("7g21", Array("c0b5"), Array("45")),
  Record("7g21", Array("c0b4"), Array("87")),
  Record("a158", Array("666b", "777c"), Array.empty[String]))).toDF
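we have to explode each array. Note that explode emits no row for an empty array, so the fourth record (empty emailIds) is dropped at this step, and exploding both columns of a row yields every trackingIds/emailIds combination of that row:
import org.apache.spark.sql.functions.explode

val exploded = df
  .withColumn("trackingIds", explode($"trackingIds"))
  .withColumn("emailIds", explode($"emailIds"))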
and then collect the exploded values back into lists with collect_list:
exploded.registerTempTable("exploded")

// In Spark >= 1.6 you can use the collect_list function;
// here we'll use the Hive UDF directly.
// Please note that it requires HiveContext.
val rolledUp = sqlContext.sql("""
  SELECT visitorId,
         collect_list(trackingIds) AS trackingIds,
         collect_list(emailIds) AS emailIds
  FROM exploded
  GROUP BY visitorId""")
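For newer Spark versions, the same rollup can be sketched with the DataFrame API alone. This is not the method from the answer above but a variation on it: it assumes Spark 2.4 or later (for the built-in flatten function), a local SparkSession instead of sc/sqlContext, and tuples in place of the Record case class. It skips explode entirely, so rows with an empty array are kept; an empty array simply contributes nothing, which means a158 ends up with emailIds [12] rather than [12, '']. The order of elements inside each collected list is not guaranteed.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_list, flatten}

// Assumption: Spark 2.4+ running locally; names below are illustrative only.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("rollup-sketch")
  .getOrCreate()
import spark.implicits._

// Same sample data as in the question, built from tuples.
val df = Seq(
  ("a158", Seq("666b"), Seq("12")),
  ("7g21", Seq("c0b5"), Seq("45")),
  ("7g21", Seq("c0b4"), Seq("87")),
  ("a158", Seq("666b", "777c"), Seq.empty[String])
).toDF("visitorId", "trackingIds", "emailIds")

// collect_list gathers the per-row arrays into an array of arrays;
// flatten then concatenates them into a single array per visitor.
val rolledUp = df
  .groupBy($"visitorId")
  .agg(
    flatten(collect_list($"trackingIds")).as("trackingIds"),
    flatten(collect_list($"emailIds")).as("emailIds"))

rolledUp.show(false)
```

This keeps everything inside the optimized DataFrame engine, but the grouped lists for a single visitorId still have to fit in memory on one executor.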
You can also convert to an RDD and group there:
import org.apache.spark.sql.Row

val rolledUp = df.rdd
  .map {
    // Pull the key and both arrays out of each Row
    case Row(id: String,
             trcks: Seq[String @unchecked],
             emails: Seq[String @unchecked]) => (id, (trcks, emails))
  }
  // Concatenate the arrays of all rows that share a visitorId
  .reduceByKey { case ((xs1, ys1), (xs2, ys2)) => (xs1 ++ xs2, ys1 ++ ys2) }
  .map { case (id, (trcks, emails)) => Record(id, trcks.toArray, emails.toArray) }
  .toDF
Note: To reduce pressure on the GC, you should consider replacing reduceByKey with groupByKey followed by mapValues, since ++ allocates a new collection on every merge.
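The groupByKey variant mentioned in the note could look roughly like the sketch below. It is an illustration under assumptions, not the answer's own code: it assumes a local SparkSession (Spark 2.x or later), builds the sample data from tuples rather than the Record case class, and reads the columns by position with getString and getSeq. Each key's arrays are concatenated once, in mapValues, instead of once per pairwise merge.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: Spark 2.x+ running locally; names below are illustrative only.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("groupByKey-sketch")
  .getOrCreate()
import spark.implicits._

// Same sample data as in the question, built from tuples.
val df = Seq(
  ("a158", Seq("666b"), Seq("12")),
  ("7g21", Seq("c0b5"), Seq("45")),
  ("7g21", Seq("c0b4"), Seq("87")),
  ("a158", Seq("666b", "777c"), Seq.empty[String])
).toDF("visitorId", "trackingIds", "emailIds")

val rolledUp = df.rdd
  // Key each row by visitorId, keeping both arrays as the value
  .map(r => (r.getString(0), (r.getSeq[String](1), r.getSeq[String](2))))
  // Bring all (trackingIds, emailIds) pairs of a visitor together
  .groupByKey()
  // Concatenate each side once per key
  .mapValues { pairs =>
    (pairs.flatMap(_._1).toSeq, pairs.flatMap(_._2).toSeq)
  }
  .map { case (id, (trcks, emails)) => (id, trcks, emails) }
  .toDF("visitorId", "trackingIds", "emailIds")

rolledUp.show(false)
```

As with any groupByKey, all values for one visitorId are shuffled to a single task, so this only suits data where no single key is extremely large.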