Spark Dataframe groupby with agg performing list appending

Problem Description

I have a dataframe with schema as such:

[visitorId: string, trackingIds: array<string>, emailIds: array<string>]

I'm looking for a way to group (or maybe roll up?) this dataframe by visitorId so that the trackingIds and emailIds columns are appended together. So for example, if my initial df looks like:

visitorId | trackingIds  | emailIds
----------+--------------+---------
a158      | [666b]       | [12]
7g21      | [c0b5]       | [45]
7g21      | [c0b4]       | [87]
a158      | [666b, 777c] | []

I would like my output df to look like this:

visitorId | trackingIds        | emailIds
----------+--------------------+---------
a158      | [666b, 666b, 777c] | [12, '']
7g21      | [c0b5, c0b4]       | [45, 87]

I am attempting to use the groupBy and agg functions, but haven't had much luck.

Thanks.

Answer

It is possible, but quite expensive. Using the data you've provided:

// Assumes a Spark shell-style environment where sc and sqlContext are already defined.
import sqlContext.implicits._

case class Record(
    visitorId: String, trackingIds: Array[String], emailIds: Array[String])

val df = sc.parallelize(Seq(
  Record("a158", Array("666b"), Array("12")),
  Record("7g21", Array("c0b5"), Array("45")),
  Record("7g21", Array("c0b4"), Array("87")),
  Record("a158", Array("666b", "777c"), Array.empty[String]))).toDF

First we have to explode each array:

import org.apache.spark.sql.functions.explode

val exploded = df
  .withColumn("trackingIds", explode($"trackingIds"))
  .withColumn("emailIds", explode($"emailIds"))

and then collect_list the values back together:

exploded.registerTempTable("exploded")

// In Spark >= 1.6 you can use collect_list function 
// here we'll use Hive UDF directly
// Please note that it requires HiveContext

val rolledUp = sqlContext.sql("""
    SELECT visitorId,
           collect_list(trackingIds) AS trackingIds,
           collect_list(emailIds) AS emailIds
    FROM exploded
    GROUP BY visitorId""")
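
As the comments above mention, in Spark >= 1.6 the same rollup can be expressed with the DataFrame API instead of raw SQL. A minimal sketch, assuming the exploded frame from above and a HiveContext-backed sqlContext (the name rolledUpDF is just for illustration):

import org.apache.spark.sql.functions.collect_list

val rolledUpDF = exploded
  .groupBy($"visitorId")
  .agg(
    collect_list($"trackingIds").alias("trackingIds"),
    collect_list($"emailIds").alias("emailIds"))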

You can also convert to an RDD and group:

import org.apache.spark.sql.Row

val rolledUp = df.rdd
  .map {
    case Row(id: String,
      trcks: Seq[String @unchecked],
      emails: Seq[String @unchecked]) => (id, (trcks, emails))
  }
  .reduceByKey { case ((xs1, ys1), (xs2, ys2)) => (xs1 ++ xs2, ys1 ++ ys2) }
  .map { case (id, (trcks, emails)) => Record(id, trcks.toArray, emails.toArray) }
  .toDF

Note: To reduce pressure on the GC, you should consider replacing reduceByKey with groupByKey followed by mapValues.
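
A minimal sketch of that variant, under the same assumptions as the RDD example above (rolledUpAlt is a hypothetical name used only for illustration):

val rolledUpAlt = df.rdd
  .map {
    case Row(id: String,
      trcks: Seq[String @unchecked],
      emails: Seq[String @unchecked]) => (id, (trcks, emails))
  }
  .groupByKey()
  .mapValues { grouped =>
    // Flatten all per-row arrays for each visitorId in one pass
    (grouped.flatMap(_._1).toArray, grouped.flatMap(_._2).toArray)
  }
  .map { case (id, (trcks, emails)) => Record(id, trcks, emails) }
  .toDF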
