Spark SQL: using collect_set over array values?
Problem description
I have an aggregated DataFrame with a column created using collect_set. I now need to aggregate over this DataFrame again, and apply collect_set to the values of that column once more. The problem is that I need to apply collect_set over the values of the sets - and so far the only way I see how to do so is by exploding the aggregated DataFrame (sketched after the example below). Is there a better way?
Example:
Initial DataFrame:
country | continent | attributes
-------------------------------------
Canada | America | A
Belgium | Europe | Z
USA | America | A
Canada | America | B
France | Europe | Y
France | Europe | X
Aggregated DataFrame (the one I receive as input) - aggregation over country:
country | continent | attributes
-------------------------------------
Canada | America | A, B
Belgium | Europe | Z
USA | America | A
France | Europe | Y, X
My desired output - aggregation over continent:
continent | attributes
-------------------------------------
America | A, B
Europe | X, Y, Z
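For reference, a sketch of the explode-based workaround mentioned above (byCountry is a hypothetical name assumed here for the aggregated input DataFrame):

import org.apache.spark.sql.functions.{collect_set, explode}

// Explode each attribute set back into one row per attribute,
// then re-aggregate per continent.
byCountry
  .withColumn("attribute", explode($"attributes"))
  .groupBy("continent")
  .agg(collect_set($"attribute") as "attributes")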
Recommended answer
Since you can have only a handful of rows at this point, you can just collect the attributes as-is and flatten the result (Spark >= 2.4):
import org.apache.spark.sql.functions.{collect_set, flatten, array_distinct}
import spark.implicits._  // needed outside spark-shell for toDF and the $"..." syntax

val byState = Seq(
  ("Canada", "America", Seq("A", "B")),
  ("Belgium", "Europe", Seq("Z")),
  ("USA", "America", Seq("A")),
  ("France", "Europe", Seq("Y", "X"))
).toDF("country", "continent", "attributes")

byState
  .groupBy("continent")
  .agg(array_distinct(flatten(collect_set($"attributes"))) as "attributes")
  .show
+---------+----------+
|continent|attributes|
+---------+----------+
| Europe| [Y, X, Z]|
| America| [A, B]|
+---------+----------+
In the general case things are much harder to handle, and in many cases, if you expect large lists with many duplicates and many values per group, the optimal solution* is to just recompute the results from scratch, i.e.
input.groupBy($"continent").agg(collect_set($"attributes") as "attributes")
One possible alternative is to use an Aggregator:
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}
import scala.collection.mutable.{Set => MSet}

// Extracts a Seq[U] from each input row with f and merges all values
// into a single deduplicated Seq[U], buffering in a mutable Set.
class MergeSets[T, U](f: T => Seq[U])(implicit enc: Encoder[Seq[U]])
    extends Aggregator[T, MSet[U], Seq[U]] with Serializable {

  def zero = MSet.empty[U]

  def reduce(acc: MSet[U], x: T) = {
    for { v <- f(x) } acc.add(v)
    acc
  }

  def merge(acc1: MSet[U], acc2: MSet[U]) = {
    acc1 ++= acc2
  }

  def finish(acc: MSet[U]) = acc.toSeq

  def bufferEncoder: Encoder[MSet[U]] = Encoders.kryo[MSet[U]]
  def outputEncoder: Encoder[Seq[U]] = enc
}
and apply it as follows:
case class CountryAggregate(
  country: String, continent: String, attributes: Seq[String])

byState
  .as[CountryAggregate]
  .groupByKey(_.continent)
  .agg(new MergeSets[CountryAggregate, String](_.attributes).toColumn)
  .toDF("continent", "attributes")
  .show
+---------+----------+
|continent|attributes|
+---------+----------+
| Europe| [X, Y, Z]|
| America| [B, A]|
+---------+----------+
but that's clearly not a Java-friendly option.
See also How to aggregate values into collection after groupBy? (similar, but without the uniqueness constraint).
* That's because explode can be quite expensive, especially in older Spark versions, as can access to the external representation of SQL collections.