如何在groupBy之后将值聚合到集合中? [英] How to aggregate values into collection after groupBy?

查看:24
本文介绍了如何在groupBy之后将值聚合到集合中?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个带有架构的数据框:

I have a dataframe with schema as such:

[visitorId: string, trackingIds: array<string>, emailIds: array<string>]

正在寻找一种按访问者 ID 分组(或汇总?)此数据框的方法,其中 trackingIds 和 emailIds 列将附加在一起.例如,如果我的初始 df 看起来像:

Looking for a way to group (or maybe rollup?) this dataframe by visitorid where the trackingIds and emailIds columns would append together. So for example if my initial df looks like:

visitorId   |trackingIds|emailIds
+-----------+------------+--------
|a158|      [666b]      |    [12]
|7g21|      [c0b5]      |    [45]
|7g21|      [c0b4]      |    [87]
|a158|      [666b, 777c]|    []

我希望我的输出 df 看起来像这样

I would like my output df to look like this

visitorId   |trackingIds|emailIds
+-----------+------------+--------
|a158|      [666b,666b,777c]|      [12,'']
|7g21|      [c0b5,c0b4]     |      [45, 87]

尝试使用 groupByagg 运算符,但运气不佳.

Attempting to use groupBy and agg operators but not have much luck.

推荐答案

Spark >= 2.4

您可以将 flatten udf 替换为内置的 flatten 函数

You can replace flatten udf with built-in flatten function

import org.apache.spark.sql.functions.flatten

保持原样.

Spark >= 2.0,<2.4

这是可能的,但相当昂贵.使用您提供的数据:

It is possible but quite expensive. Using data you've provided:

case class Record(
    visitorId: String, trackingIds: Array[String], emailIds: Array[String])

val df = Seq(
  Record("a158", Array("666b"), Array("12")),
  Record("7g21", Array("c0b5"), Array("45")),
  Record("7g21", Array("c0b4"), Array("87")),
  Record("a158", Array("666b",  "777c"), Array.empty[String])).toDF

和一个辅助函数:

import org.apache.spark.sql.functions.udf

val flatten = udf((xs: Seq[Seq[String]]) => xs.flatten)

我们可以用占位符填空:

we can fill the blanks with placeholders:

import org.apache.spark.sql.functions.{array, lit, when}

val dfWithPlaceholders = df.withColumn(
  "emailIds", 
  when(size($"emailIds") === 0, array(lit(""))).otherwise($"emailIds"))

collect_listsflatten:

import org.apache.spark.sql.functions.{array, collect_list}

val emailIds = flatten(collect_list($"emailIds")).alias("emailIds")
val trackingIds = flatten(collect_list($"trackingIds")).alias("trackingIds")

df
  .groupBy($"visitorId")
  .agg(trackingIds, emailIds)

// +---------+------------------+--------+
// |visitorId|       trackingIds|emailIds|
// +---------+------------------+--------+
// |     a158|[666b, 666b, 777c]|  [12, ]|
// |     7g21|      [c0b5, c0b4]|[45, 87]|
// +---------+------------------+--------+

使用静态类型Dataset:

df.as[Record]
  .groupByKey(_.visitorId)
  .mapGroups { case (key, vs) => 
    vs.map(v => (v.trackingIds, v.emailIds)).toArray.unzip match {
      case (trackingIds, emailIds) => 
        Record(key, trackingIds.flatten, emailIds.flatten)
  }}

// +---------+------------------+--------+
// |visitorId|       trackingIds|emailIds|
// +---------+------------------+--------+
// |     a158|[666b, 666b, 777c]|  [12, ]|
// |     7g21|      [c0b5, c0b4]|[45, 87]|
// +---------+------------------+--------+

Spark 1.x

可以转成RDD和分组

import org.apache.spark.sql.Row

dfWithPlaceholders.rdd
  .map {
     case Row(id: String, 
       trcks: Seq[String @ unchecked],
       emails: Seq[String @ unchecked]) => (id, (trcks, emails))
  }
  .groupByKey
  .map {case (key, vs) => vs.toArray.unzip match {
    case (trackingIds, emailIds) => 
      Record(key, trackingIds.flatten, emailIds.flatten)
  }}
  .toDF

// +---------+------------------+--------+
// |visitorId|       trackingIds|emailIds|
// +---------+------------------+--------+
// |     7g21|      [c0b5, c0b4]|[45, 87]|
// |     a158|[666b, 666b, 777c]|  [12, ]|
// +---------+------------------+--------+

这篇关于如何在groupBy之后将值聚合到集合中?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆