Spark SQL replacement for MySQL's GROUP_CONCAT aggregate function


Problem Description

I have a table with two string-typed columns (username, friend), and for each username I want to collect all of its friends in a single row, concatenated as a string. For example: ('username1', 'friends1, friends2, friends3').

I know MySQL does this with GROUP_CONCAT. Is there a way to do this with Spark SQL?

Recommended Answer

Before you proceed: this operation is yet another groupByKey. While it has multiple legitimate applications, it is relatively expensive, so be sure to use it only when required.

It is not an especially concise or efficient solution, but you can use the UserDefinedAggregateFunction introduced in Spark 1.5.0:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{ArrayType, StringType, StructType}

import scala.collection.mutable.ArrayBuffer

object GroupConcat extends UserDefinedAggregateFunction {
    // Single string input column
    def inputSchema = new StructType().add("x", StringType)
    // The buffer accumulates the strings seen so far
    def bufferSchema = new StructType().add("buff", ArrayType(StringType))
    // The final result is one concatenated string
    def dataType = StringType
    def deterministic = true

    def initialize(buffer: MutableAggregationBuffer) = {
      buffer.update(0, ArrayBuffer.empty[String])
    }

    def update(buffer: MutableAggregationBuffer, input: Row) = {
      // Skip null inputs instead of appending them
      if (!input.isNullAt(0))
        buffer.update(0, buffer.getSeq[String](0) :+ input.getString(0))
    }

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
      buffer1.update(0, buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0))
    }

    // Return the external String type expected for a StringType result
    def evaluate(buffer: Row) = buffer.getSeq[String](0).mkString(",")
}

Example usage:

val df = sc.parallelize(Seq(
  ("username1", "friend1"),
  ("username1", "friend2"),
  ("username2", "friend1"),
  ("username2", "friend3")
)).toDF("username", "friend")

df.groupBy($"username").agg(GroupConcat($"friend")).show

## +---------+---------------+
## | username|        friends|
## +---------+---------------+
## |username1|friend1,friend2|
## |username2|friend1,friend3|
## +---------+---------------+
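
If you want to call the aggregate from a SQL statement as well, the UDAF can be registered under a name. A minimal sketch, where the function name group_concat and the temporary table name users are only illustrative:

// Register the UDAF for use in SQL queries (available since Spark 1.5.0).
sqlContext.udf.register("group_concat", GroupConcat)

// "users" is an example temporary table name.
df.registerTempTable("users")
sqlContext.sql(
  "SELECT username, group_concat(friend) AS friends FROM users GROUP BY username"
).show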

You can also create a Python wrapper, as shown in Spark: How to map Python with Scala or Java User Defined Functions?

In practice it can be faster to extract the RDD, groupByKey, mkString, and rebuild the DataFrame, as sketched below.
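
A minimal sketch of that RDD round-trip, assuming the df from the example above and the SQL implicits (e.g. import sqlContext.implicits._) are in scope:

// Extract the underlying RDD, group by username, join the friends,
// and rebuild a DataFrame with the concatenated column.
val concatenated = df.rdd
  .map(row => (row.getString(0), row.getString(1)))   // (username, friend)
  .groupByKey()
  .mapValues(_.mkString(","))
  .toDF("username", "friends")

concatenated.show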

Since Spark >= 1.6.0 you can get a similar effect by combining the collect_list function with concat_ws:

import org.apache.spark.sql.functions.{collect_list, concat_ws}

df.groupBy($"username")
  .agg(concat_ws(",", collect_list($"friend")).alias("friends"))
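
The same aggregation can also be expressed in plain SQL. A small sketch, where the temporary table name users is only illustrative (on Spark 1.6 collect_list may require a HiveContext; it is built in on later versions):

// "users" is an example temporary table name.
df.registerTempTable("users")
sqlContext.sql(
  "SELECT username, concat_ws(',', collect_list(friend)) AS friends " +
  "FROM users GROUP BY username"
).show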
