Find maximum row per group in Spark DataFrame

Problem Description

I'm trying to use Spark DataFrames instead of RDDs since they appear to be higher-level than RDDs and tend to produce more readable code, but I would be more than happy to get suggestions for something more idiomatic for the task at hand.

In a 14-node Google Dataproc cluster, I have about 6 million names that are translated to IDs by two different systems: sa and sb. Each Row contains name, id_sa and id_sb. My goal is to produce a mapping from id_sa to id_sb such that, for each id_sa, the corresponding id_sb is the most frequent ID among all names attached to that id_sa.

Let's try to clarify with an example. If I have the following rows:

[Row(name='n1', id_sa='a1', id_sb='b1'),
 Row(name='n2', id_sa='a1', id_sb='b2'),
 Row(name='n3', id_sa='a1', id_sb='b2'),
 Row(name='n4', id_sa='a2', id_sb='b2')]

My goal is to produce a mapping from a1 to b2. Indeed, the names associated with a1 are n1, n2 and n3, which map to b1, b2 and b2 respectively, so b2 is the most frequent mapping among the names associated with a1. In the same way, a2 will be mapped to b2. It's OK to assume that there will always be a winner: no need to break ties.

I was hoping that I could use groupBy(df.id_sa) on my DataFrame, but I don't know what to do next. I was hoping for an aggregation that could produce, in the end, the following rows:

[Row(id_sa='a1', max_id_sb='b2'),
 Row(id_sa='a2', max_id_sb='b2')]

But maybe I'm trying to use the wrong tool and I should just go back to using RDDs.
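
For reference, a minimal sketch of how the example DataFrame could be built (this assumes a local SparkSession named spark, which is not part of the original question; the answer below refers to this DataFrame as df):

from pyspark.sql import Row, SparkSession

# Assumed setup: turn the example rows from the question into a DataFrame named df
spark = SparkSession.builder.master("local[*]").getOrCreate()

df = spark.createDataFrame([
    Row(name='n1', id_sa='a1', id_sb='b1'),
    Row(name='n2', id_sa='a1', id_sb='b2'),
    Row(name='n3', id_sa='a1', id_sb='b2'),
    Row(name='n4', id_sa='a2', id_sb='b2')])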

Recommended Answer

Using join (this will keep more than one row per group in case of ties):

import pyspark.sql.functions as F
from pyspark.sql.functions import count, col

# Count occurrences of each (id_sa, id_sb) pair
cnts = df.groupBy("id_sa", "id_sb").agg(count("*").alias("cnt")).alias("cnts")
# Find the highest count per id_sa
maxs = cnts.groupBy("id_sa").agg(F.max("cnt").alias("mx")).alias("maxs")

# Keep only the pairs whose count equals the per-group maximum
cnts.join(maxs, 
  (col("cnt") == col("mx")) & (col("cnts.id_sa") == col("maxs.id_sa"))
).select(col("cnts.id_sa"), col("cnts.id_sb"))
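
On the example data, appending .show() to the query above would be expected to print the desired mapping (row order may vary):

+-----+-----+
|id_sa|id_sb|
+-----+-----+
|   a1|   b2|
|   a2|   b2|
+-----+-----+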

Using window functions (will drop ties):

from pyspark.sql.functions import row_number  # rowNumber in Spark < 1.6
from pyspark.sql.window import Window

# Rank the (id_sa, id_sb) pairs within each id_sa by descending count
# (cnts is the per-pair count DataFrame computed above)
w = Window.partitionBy("id_sa").orderBy(col("cnt").desc())

(cnts
  .withColumn("rn", row_number().over(w))
  .where(col("rn") == 1)
  .select("id_sa", "id_sb"))

Using ordering over structs (structs compare field by field, so the maximum struct is the pair with the highest cnt, with any tie broken by id_sb):

from pyspark.sql.functions import struct

(cnts
  .groupBy("id_sa")
  # The maximum struct is the one with the highest cnt (ties broken by id_sb)
  .agg(F.max(struct(col("cnt"), col("id_sb"))).alias("max"))
  .select(col("id_sa"), col("max.id_sb")))
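
Since the stated goal is a mapping from id_sa to id_sb, the result of any of these queries can also be collected into a plain Python dict on the driver. A minimal sketch, assuming the struct-based result above is stored in a variable named result (a name introduced here) and is small enough to collect:

# Hypothetical follow-up: collect the (id_sa, id_sb) pairs into a dict
result = (cnts
  .groupBy("id_sa")
  .agg(F.max(struct(col("cnt"), col("id_sb"))).alias("max"))
  .select(col("id_sa"), col("max.id_sb")))

mapping = {row["id_sa"]: row["id_sb"] for row in result.collect()}
# On the example data this gives {'a1': 'b2', 'a2': 'b2'}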

See also: SPARK DataFrame: select the first row of each group
