Find maximum row per group in Spark DataFrame
Question
I'm trying to use Spark DataFrames instead of RDDs, since they appear to be higher-level than RDDs and tend to produce more readable code.
On a 14-node Google Dataproc cluster, I have about 6 million names that are translated to ids by two different systems: sa and sb. Each Row contains name, id_sa and id_sb. My goal is to produce a mapping from id_sa to id_sb such that, for each id_sa, the corresponding id_sb is the most frequent id among all names attached to that id_sa.
Let's try to clarify with an example. If I have the following rows:
[Row(name='n1', id_sa='a1', id_sb='b1'),
 Row(name='n2', id_sa='a1', id_sb='b2'),
 Row(name='n3', id_sa='a1', id_sb='b2'),
 Row(name='n4', id_sa='a2', id_sb='b2')]
My goal is to produce a mapping from a1 to b2. Indeed, the names associated with a1 are n1, n2 and n3, which map respectively to b1, b2 and b2, so b2 is the most frequent mapping among the names associated with a1. In the same way, a2 will be mapped to b2. It's OK to assume that there will always be a winner: no need to break ties.
I was hoping that I could use groupBy(df.id_sa) on my dataframe, but I don't know what to do next. I was hoping for an aggregation that could produce, in the end, the following rows:
[Row(id_sa='a1', max_id_sb='b2'),
 Row(id_sa='a2', max_id_sb='b2')]
But maybe I'm trying to use the wrong tool and I should just go back to using RDDs.
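To pin down the expected result, here is the same logic in plain Python (no Spark) — a minimal sketch on the four sample rows above, using collections.Counter; the variable names are illustrative, not part of any Spark solution:

```python
from collections import Counter

rows = [
    ("n1", "a1", "b1"),
    ("n2", "a1", "b2"),
    ("n3", "a1", "b2"),
    ("n4", "a2", "b2"),
]

# Group id_sb occurrences by id_sa.
groups = {}
for name, id_sa, id_sb in rows:
    groups.setdefault(id_sa, Counter())[id_sb] += 1

# For each id_sa, keep the most frequent id_sb.
mapping = {id_sa: cnt.most_common(1)[0][0] for id_sa, cnt in groups.items()}
# mapping == {'a1': 'b2', 'a2': 'b2'}
```

The question is how to express this count-then-argmax per group with DataFrame operations instead.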
Answer
Using join (in case of ties, it will result in more than one row per group):
import pyspark.sql.functions as F
from pyspark.sql.functions import count, col

# Count occurrences of each (id_sa, id_sb) pair.
cnts = df.groupBy("id_sa", "id_sb").agg(count("*").alias("cnt")).alias("cnts")

# Find the maximum count per id_sa.
maxs = cnts.groupBy("id_sa").agg(F.max("cnt").alias("mx")).alias("maxs")

# Keep the pairs whose count equals the per-group maximum.
cnts.join(maxs,
    (col("cnt") == col("mx")) & (col("cnts.id_sa") == col("maxs.id_sa"))
).select(col("cnts.id_sa"), col("cnts.id_sb"))
Using window functions (ties are dropped):
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# Rank (id_sa, id_sb) pairs within each id_sa by descending count.
w = Window().partitionBy("id_sa").orderBy(col("cnt").desc())

(cnts
    .withColumn("rn", row_number().over(w))
    .where(col("rn") == 1)  # keep only the top-ranked pair per group
    .select("id_sa", "id_sb"))
Using ordering over structs:
from pyspark.sql.functions import struct

# Structs compare field by field, so max(struct(cnt, id_sb))
# picks the pair with the highest cnt within each group.
(cnts
    .groupBy("id_sa")
    .agg(F.max(struct(col("cnt"), col("id_sb"))).alias("max"))
    .select(col("id_sa"), col("max.id_sb")))