Find maximum row per group in Spark DataFrame


Problem description

I'm trying to use Spark DataFrames instead of RDDs, since they appear to be more high-level than RDDs and tend to produce more readable code.

In a 14-node Google Dataproc cluster, I have about 6 million names that are translated to IDs by two different systems: sa and sb. Each Row contains name, id_sa and id_sb. My goal is to produce a mapping from id_sa to id_sb such that, for each id_sa, the corresponding id_sb is the most frequent id_sb among all names attached to that id_sa.

Let's clarify with an example. If I have the following rows:

[Row(name='n1', id_sa='a1', id_sb='b1'),
 Row(name='n2', id_sa='a1', id_sb='b2'),
 Row(name='n3', id_sa='a1', id_sb='b2'),
 Row(name='n4', id_sa='a2', id_sb='b2')]

My goal is to produce a mapping from a1 to b2. Indeed, the names associated with a1 are n1, n2 and n3, which map respectively to b1, b2 and b2, so b2 is the most frequent mapping among the names associated with a1. In the same way, a2 will be mapped to b2. It's fine to assume there will always be a winner: there is no need to break ties.

I was hoping I could use groupBy(df.id_sa) on my DataFrame, but I don't know what to do next. I was hoping for an aggregation that could, in the end, produce the following rows:

[Row(id_sa=a1, max_id_sb=b2),
 Row(id_sa=a2, max_id_sb=b2)]

But maybe I'm trying to use the wrong tool and should just go back to using RDDs.

Answer
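
The snippets below assume a DataFrame named df holding the question's rows. A minimal sketch of that setup (the SparkSession configuration here is an assumption, not part of the original answer):

from pyspark.sql import SparkSession

# Assumed local setup; any existing SparkSession works just as well.
spark = SparkSession.builder.appName("max-row-per-group").getOrCreate()

df = spark.createDataFrame(
    [("n1", "a1", "b1"),
     ("n2", "a1", "b2"),
     ("n3", "a1", "b2"),
     ("n4", "a2", "b2")],
    ["name", "id_sa", "id_sb"])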

Using join (it can result in more than one row per group in case of ties):

import pyspark.sql.functions as F
from pyspark.sql.functions import count, col

# Count how many times each (id_sa, id_sb) pair occurs.
cnts = df.groupBy("id_sa", "id_sb").agg(count("*").alias("cnt")).alias("cnts")

# For each id_sa, find the highest pair count.
maxs = cnts.groupBy("id_sa").agg(F.max("cnt").alias("mx")).alias("maxs")

# Keep only the pairs whose count equals the per-group maximum.
cnts.join(maxs,
  (col("cnt") == col("mx")) & (col("cnts.id_sa") == col("maxs.id_sa"))
).select(col("cnts.id_sa"), col("cnts.id_sb"))
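
For the question's goal of a mapping from id_sa to id_sb, the joined result can be pulled back to the driver as a plain Python dict. A small sketch under the assumptions above (the variable name best is mine):

best = cnts.join(maxs,
  (col("cnt") == col("mx")) & (col("cnts.id_sa") == col("maxs.id_sa"))
).select(col("cnts.id_sa"), col("cnts.id_sb"))

# On the example data this gives {'a1': 'b2', 'a2': 'b2'};
# note that ties would overwrite one another arbitrarily in the dict.
mapping = {row["id_sa"]: row["id_sb"] for row in best.collect()}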

Using window functions (will drop ties):

from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# Rank the (id_sa, id_sb) pairs within each id_sa by descending count.
w = Window().partitionBy("id_sa").orderBy(col("cnt").desc())

(cnts
  .withColumn("rn", row_number().over(w))
  .where(col("rn") == 1)   # keep only the most frequent pair per id_sa
  .select("id_sa", "id_sb"))
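
Because row_number assigns ranks arbitrarily among equal counts, ties are resolved non-deterministically. If a stable result matters, a secondary sort key can be added to the window; a small variation (choosing id_sb as the tie-breaker is my assumption, any deterministic column works):

w_det = Window.partitionBy("id_sa").orderBy(col("cnt").desc(), col("id_sb"))

(cnts
  .withColumn("rn", row_number().over(w_det))
  .where(col("rn") == 1)
  .select("id_sa", "id_sb"))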

Using struct ordering:

from pyspark.sql.functions import struct

# max over a struct compares field by field: highest cnt first,
# then id_sb as a tie-breaker.
(cnts
  .groupBy("id_sa")
  .agg(F.max(struct(col("cnt"), col("id_sb"))).alias("max"))
  .select(col("id_sa"), col("max.id_sb")))
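
Since struct comparison orders by cnt first, this approach needs neither a join nor a window, just two aggregations. The same idea can be chained directly off the original df in one pipeline; a sketch, assuming df holds the question's rows:

(df
  .groupBy("id_sa", "id_sb")
  .agg(count("*").alias("cnt"))
  .groupBy("id_sa")
  .agg(F.max(struct(col("cnt"), col("id_sb"))).alias("max"))
  .select(col("id_sa"), col("max.id_sb")))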

See also: How to select the first row of each group?
