SPARK SQL replacement for MySQL GROUP_CONCAT aggregate function
Question
I have a table with two string-type columns (username, friend) and, for each username, I want to collect all of its friends on one row, concatenated as strings ('username1', 'friends1, friends2, friends3'). I know MySQL does this with GROUP_CONCAT; is there any way to do this with Spark SQL?
Thanks!
Answer
Not an exactly concise or efficient solution, but you can use a UserDefinedAggregateFunction, introduced in Spark 1.5.0:
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{ArrayType, StringType, StructType}

object GroupConcat extends UserDefinedAggregateFunction {
  // Single string input column
  def inputSchema = new StructType().add("x", StringType)
  // Intermediate buffer: an array of the strings seen so far
  def bufferSchema = new StructType().add("buff", ArrayType(StringType))
  // Final result type
  def dataType = StringType
  def deterministic = true

  def initialize(buffer: MutableAggregationBuffer) = {
    buffer.update(0, ArrayBuffer.empty[String])
  }

  // Append each non-null input value to the buffer
  def update(buffer: MutableAggregationBuffer, input: Row) = {
    if (!input.isNullAt(0))
      buffer.update(0, buffer.getSeq[String](0) :+ input.getString(0))
  }

  // Combine partial buffers coming from different partitions
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
    buffer1.update(0, buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0))
  }

  // Join the collected values into a single comma-separated string
  def evaluate(buffer: Row) = buffer.getSeq[String](0).mkString(",")
}
Example usage:
// Assumes the implicits are in scope, e.g. import sqlContext.implicits._
val df = sc.parallelize(Seq(
  ("username1", "friend1"),
  ("username1", "friend2"),
  ("username2", "friend1"),
  ("username2", "friend3")
)).toDF("username", "friend")

df.groupBy($"username").agg(GroupConcat($"friend").alias("friends")).show
## +---------+---------------+
## | username| friends|
## +---------+---------------+
## |username1|friend1,friend2|
## |username2|friend1,friend3|
## +---------+---------------+
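If you prefer the SQL side of Spark SQL, the same UDAF can also be registered and called by name. A minimal sketch, assuming the sqlContext and df from the example above (the names group_concat and users are arbitrary):

sqlContext.udf.register("group_concat", GroupConcat)
df.registerTempTable("users")  // createOrReplaceTempView in Spark 2.x

sqlContext.sql(
  "SELECT username, group_concat(friend) AS friends FROM users GROUP BY username"
).show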
In practice it can be faster to extract the underlying RDD, groupByKey, mkString, and rebuild the DataFrame, as sketched below.
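A minimal sketch of that round-trip, assuming the df and implicits from the example above:

val concatenated = df.rdd
  .map(row => (row.getString(0), row.getString(1))) // (username, friend) pairs
  .groupByKey()                                     // all friends per username
  .mapValues(_.mkString(","))                       // join into a single string
  .toDF("username", "friends")

concatenated.show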
You can get a similar effect by combining the collect_list function (Spark >= 1.6.0) with concat_ws:
import org.apache.spark.sql.functions.{collect_list, concat_ws}

df.groupBy($"username")
  .agg(concat_ws(",", collect_list($"friend")).alias("friends"))
  .show
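Here collect_list gathers each group's values into an array and concat_ws joins them with the given separator. Note that in Spark 1.6 collect_list requires a HiveContext; from Spark 2.0 onwards it is available natively.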