SPARK SQL replacement for MySQL GROUP_CONCAT aggregate function


Problem description

I have a table of two string-type columns (username, friend), and for each username I want to collect all of its friends on one row, concatenated as strings ('username1', 'friends1, friends2, friends3'). I know MySQL does this with GROUP_CONCAT; is there any way to do this with Spark SQL?

Thanks!

Recommended answer

Not exactly a concise or efficient solution, but you can use a UserDefinedAggregateFunction, introduced in Spark 1.5.0:

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{ArrayType, StringType, StructType}
import org.apache.spark.unsafe.types.UTF8String

object GroupConcat extends UserDefinedAggregateFunction {
    // Single string input column
    def inputSchema = new StructType().add("x", StringType)
    // Intermediate buffer: the strings seen so far
    def bufferSchema = new StructType().add("buff", ArrayType(StringType))
    def dataType = StringType
    def deterministic = true 

    def initialize(buffer: MutableAggregationBuffer) = {
      buffer.update(0, ArrayBuffer.empty[String])
    }

    // Append each non-null input value to the buffer
    def update(buffer: MutableAggregationBuffer, input: Row) = {
      if (!input.isNullAt(0)) 
        buffer.update(0, buffer.getSeq[String](0) :+ input.getString(0))
    }

    // Combine two partial buffers
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
      buffer1.update(0, buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0))
    }

    // Produce the final comma-separated string
    def evaluate(buffer: Row) = UTF8String.fromString(
      buffer.getSeq[String](0).mkString(","))
}

Example usage:

val df = sc.parallelize(Seq(
  ("username1", "friend1"),
  ("username1", "friend2"),
  ("username2", "friend1"),
  ("username2", "friend3")
)).toDF("username", "friend")

df.groupBy($"username").agg(GroupConcat($"friend").alias("friends")).show

## +---------+---------------+
## | username|        friends|
## +---------+---------------+
## |username1|friend1,friend2|
## |username2|friend1,friend3|
## +---------+---------------+

In practice it can be faster to drop to the RDD, groupByKey, mkString the values, and rebuild the DataFrame.
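A minimal sketch of that RDD-based approach, assuming the df defined above and a spark-shell style session (the variable names here are illustrative, not from the original answer):

// Sketch: concatenate friends per user on the RDD side, then rebuild a DataFrame.
// Requires sqlContext.implicits._ in scope for toDF (pre-imported in spark-shell).
val concatenated = df.rdd
  .map(row => (row.getString(0), row.getString(1)))  // (username, friend)
  .groupByKey()                                       // all friends per username
  .mapValues(_.mkString(","))                         // join them into one string
  .toDF("username", "friends")

concatenated.show()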

You can get a similar effect by combining the collect_list function (Spark >= 1.6.0) with concat_ws:

import org.apache.spark.sql.functions.{collect_list, concat_ws}

df.groupBy($"username")
  .agg(concat_ws(",", collect_list($"friend")).alias("friends"))
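If you prefer writing the query in SQL directly, the same aggregation can be sketched as below. The temporary table name is hypothetical, and on Spark 1.6 using collect_list in raw SQL may require a HiveContext:

// Sketch: the same aggregation through the SQL interface.
// "user_friends" is an illustrative temp table name for the df defined above.
df.registerTempTable("user_friends")

sqlContext.sql(
  "SELECT username, concat_ws(',', collect_list(friend)) AS friends " +
  "FROM user_friends GROUP BY username"
).show()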
