Spark SQL replacement for MySQL's GROUP_CONCAT aggregate function
Question
I have a table with two string-typed columns (username, friend), and for each username I want to collect all of its friends on one row, concatenated as a string. For example: ('username1', 'friends1, friends2, friends3').
I know MySQL does this with GROUP_CONCAT. Is there any way to do this with Spark SQL?
Recommended answer
Before you proceed: this operation is yet another groupByKey. While it has multiple legitimate applications, it is relatively expensive, so be sure to use it only when required.
It is not an especially concise or efficient solution, but you can use the UserDefinedAggregateFunction introduced in Spark 1.5.0:
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{ArrayType, StringType, StructType}
import org.apache.spark.unsafe.types.UTF8String

import scala.collection.mutable.ArrayBuffer

object GroupConcat extends UserDefinedAggregateFunction {
  def inputSchema = new StructType().add("x", StringType)
  def bufferSchema = new StructType().add("buff", ArrayType(StringType))
  def dataType = StringType
  def deterministic = true

  // Start each group with an empty list of strings
  def initialize(buffer: MutableAggregationBuffer) = {
    buffer.update(0, ArrayBuffer.empty[String])
  }

  // Append each non-null input value to the buffer
  def update(buffer: MutableAggregationBuffer, input: Row) = {
    if (!input.isNullAt(0))
      buffer.update(0, buffer.getSeq[String](0) :+ input.getString(0))
  }

  // Combine partial buffers computed on different partitions
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
    buffer1.update(0, buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0))
  }

  // Produce the final comma-separated string
  def evaluate(buffer: Row) = UTF8String.fromString(
    buffer.getSeq[String](0).mkString(","))
}
Example usage:
val df = sc.parallelize(Seq(
  ("username1", "friend1"),
  ("username1", "friend2"),
  ("username2", "friend1"),
  ("username2", "friend3")
)).toDF("username", "friend")

df.groupBy($"username").agg(GroupConcat($"friend").alias("friends")).show
## +---------+---------------+
## | username|        friends|
## +---------+---------------+
## |username1|friend1,friend2|
## |username2|friend1,friend3|
## +---------+---------------+
You can also create a Python wrapper as shown in Spark: How to map Python with Scala or Java User Defined Functions?
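As a rough sketch of what such a wrapper builds on, the UDAF can be registered under a SQL function name on the Scala side and then invoked from plain SQL, which is what a Python caller would ultimately execute. The names group_concat and friends_table below are illustrative assumptions, not part of the original answer:

// A minimal sketch, assuming the GroupConcat object defined above
// and a SQLContext in scope; the registered name is hypothetical.
sqlContext.udf.register("group_concat", GroupConcat)
df.registerTempTable("friends_table")
sqlContext.sql(
  "SELECT username, group_concat(friend) AS friends " +
  "FROM friends_table GROUP BY username")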
In practice it can be faster to extract the RDD, groupByKey, mkString, and rebuild the DataFrame.
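A minimal sketch of that round trip, assuming the df built above and import sqlContext.implicits._ in scope for toDF:

// Sketch only: drop to the RDD, group the friends per username,
// join them into one string, then rebuild a DataFrame.
val viaRdd = df.rdd
  .map(row => (row.getString(0), row.getString(1))) // (username, friend)
  .groupByKey()
  .mapValues(_.mkString(","))
  .toDF("username", "friends")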
You can get a similar effect by combining the collect_list function (Spark >= 1.6.0) with concat_ws:
import org.apache.spark.sql.functions.{collect_list, concat_ws}

df.groupBy($"username")
  .agg(concat_ws(",", collect_list($"friend")).alias("friends"))
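The same aggregation can also be written as a plain SQL statement. The temp table name friends_table is an assumption, and on Spark 1.6 collect_list in SQL may require a HiveContext:

// Hypothetical table name; registerTempTable is the Spark 1.x API
// (createOrReplaceTempView in Spark 2.0+).
df.registerTempTable("friends_table")
sqlContext.sql(
  "SELECT username, concat_ws(',', collect_list(friend)) AS friends " +
  "FROM friends_table GROUP BY username")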