Find maximum row per group in Spark DataFrame
Question
I'm trying to use Spark dataframes instead of RDDs, since they appear to be higher-level than RDDs and tend to produce more readable code, but I would be more than happy to get suggestions for something more idiomatic for the task at hand.
In a 14-node Google Dataproc cluster, I have about 6 million names that are translated to ids by two different systems: sa and sb. Each Row contains name, id_sa and id_sb. My goal is to produce a mapping from id_sa to id_sb such that, for each id_sa, the corresponding id_sb is the most frequent id among all the names attached to id_sa.
Let's try to clarify with an example. If I have the following rows:
[Row(name='n1', id_sa='a1', id_sb='b1'),
Row(name='n2', id_sa='a1', id_sb='b2'),
Row(name='n3', id_sa='a1', id_sb='b2'),
Row(name='n4', id_sa='a2', id_sb='b2')]
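For reference, a minimal sketch that builds this example as a DataFrame (the name df is what the question uses below; SparkSession is an assumption here, an older SQLContext would serve the same purpose):

from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.getOrCreate()

# The toy dataset above, bound to the name the question uses later.
df = spark.createDataFrame([
    Row(name='n1', id_sa='a1', id_sb='b1'),
    Row(name='n2', id_sa='a1', id_sb='b2'),
    Row(name='n3', id_sa='a1', id_sb='b2'),
    Row(name='n4', id_sa='a2', id_sb='b2'),
])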
My goal is to produce a mapping from a1 to b2. Indeed, the names associated with a1 are n1, n2 and n3, which map respectively to b1, b2 and b2, so b2 is the most frequent mapping among the names associated with a1. In the same way, a2 will be mapped to b2. It's OK to assume that there will always be a winner: no need to break ties.
I was hoping that I could use groupBy(df.id_sa) on my dataframe, but I don't know what to do next. I was hoping for an aggregation that could produce, in the end, the following rows:
[Row(id_sa=a1, max_id_sb=b2),
Row(id_sa=a2, max_id_sb=b2)]
But maybe I'm trying to use the wrong tool and I should just go back to using RDDs.
Answer
Using join (it will result in more than one row per group in case of ties):
from pyspark.sql import functions as F
from pyspark.sql.functions import count, col

# Count occurrences of each (id_sa, id_sb) pair.
cnts = df.groupBy("id_sa", "id_sb").agg(count("*").alias("cnt")).alias("cnts")

# Maximum count per id_sa.
maxs = cnts.groupBy("id_sa").agg(F.max("cnt").alias("mx")).alias("maxs")

# Keep only the pairs whose count equals their group's maximum.
cnts.join(maxs,
    (col("cnt") == col("mx")) & (col("cnts.id_sa") == col("maxs.id_sa"))
).select(col("cnts.id_sa"), col("cnts.id_sb"))
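On the example rows, cnts contains (a1, b1) → 1, (a1, b2) → 2 and (a2, b2) → 1, and the per-group maxima are 2 for a1 and 1 for a2, so the join keeps exactly (a1, b2) and (a2, b2). With a tie, every tying id_sb would survive for that id_sa.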
Using window functions (will drop ties):
from pyspark.sql.functions import col, row_number  # named rowNumber before Spark 1.6
from pyspark.sql.window import Window

# Rank each (id_sa, id_sb) pair within its id_sa group by descending count.
w = Window.partitionBy("id_sa").orderBy(col("cnt").desc())

(cnts
    .withColumn("rn", row_number().over(w))
    .where(col("rn") == 1)
    .select("id_sa", "id_sb"))
Using ordering over structs (a struct compares field by field, so the max of struct(cnt, id_sb) is the entry with the highest cnt):
from pyspark.sql import functions as F
from pyspark.sql.functions import col, struct

# max over (cnt, id_sb) structs compares cnt first, so the winner
# carries the most frequent id_sb along with it.
(cnts
    .groupBy("id_sa")
    .agg(F.max(struct(col("cnt"), col("id_sb"))).alias("max"))
    .select(col("id_sa"), col("max.id_sb")))