Choosing random items from a Spark GroupedData Object
Question
I'm new to using Spark in Python and have been unable to solve this problem: after running groupBy on a pyspark.sql.dataframe.DataFrame
df = sqlsc.read.json("data.json")
df.groupBy('teamId')
how can you choose N random samples from each resulting group (grouped by teamId) without replacement?
I'm basically trying to choose N random users from each team; maybe using groupBy is the wrong starting point?
Answer
Well, it is kind of wrong. GroupedData is not really designed for data access. It just describes the grouping criteria and provides aggregation methods. See my answer to Using groupBy in Spark and getting back to a DataFrame for more details.
Another problem with this idea is selecting N random samples. That is really hard to achieve in parallel without a physical grouping of the data, and a physical grouping is not something that happens when you call groupBy on a DataFrame:
There are at least two ways to handle this:
Convert to an RDD, groupBy, and perform local sampling:
import random

n = 3

def sample(iter, n):
    rs = random.Random()  # We should probably use os.urandom as a seed
    rows = list(iter)
    # random.sample draws without replacement; min() guards against
    # groups with fewer than n rows, which would otherwise raise ValueError
    return rs.sample(rows, min(n, len(rows)))

df = sqlContext.createDataFrame(
    [(x, y, random.random()) for x in (1, 2, 3) for y in "abcdefghi"],
    ("teamId", "x1", "x2"))

grouped = df.rdd.map(lambda row: (row.teamId, row)).groupByKey()

sampled = sqlContext.createDataFrame(
    grouped.flatMap(lambda kv: sample(kv[1], n)))

sampled.show()
sampled.show()
## +------+---+-------------------+
## |teamId| x1| x2|
## +------+---+-------------------+
## | 1| g| 0.81921738561455|
## | 1| f| 0.8563875814036598|
## | 1| a| 0.9010425238735935|
## | 2| c| 0.3864428179837973|
## | 2| g|0.06233470405822805|
## | 2| d|0.37620872770129155|
## | 3| f| 0.7518901502732027|
## | 3| e| 0.5142305439671874|
## | 3| d| 0.6250620479303716|
## +------+---+-------------------+
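The local-sampling step itself is plain Python; a minimal Spark-free sketch (with made-up rows standing in for the groupByKey() result) shows how random.sample draws n items from each group without replacement:

```python
import random

n = 3

def sample(rows, n, seed=None):
    # Seeded here only to make the sketch reproducible
    rs = random.Random(seed)
    rows = list(rows)
    # random.sample draws without replacement; min() guards small groups
    return rs.sample(rows, min(n, len(rows)))

# Hypothetical grouped rows, standing in for the groupByKey() result
groups = {
    1: [("a", 0.1), ("b", 0.2), ("c", 0.3), ("d", 0.4)],
    2: [("e", 0.5), ("f", 0.6)],
}

sampled = {k: sample(v, n, seed=42) for k, v in groups.items()}
```

Note that team 2 has only two rows, so the guard returns both of them rather than failing.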
Use window functions:
from pyspark.sql import Window
from pyspark.sql.functions import col, rand, rowNumber  # renamed row_number in Spark 1.6+

w = Window.partitionBy(col("teamId")).orderBy(col("rnd_"))

sampled = (df
    .withColumn("rnd_", rand())  # Add a column of random numbers
    .withColumn("rn_", rowNumber().over(w))  # Add rowNumber over the window
    .where(col("rn_") <= n)  # Take n observations per group
    .drop("rn_")  # Drop the helper columns
    .drop("rnd_"))

sampled.show()
sampled.show()
## +------+---+--------------------+
## |teamId| x1| x2|
## +------+---+--------------------+
## | 1| f| 0.8563875814036598|
## | 1| g| 0.81921738561455|
## | 1| i| 0.8173912535268248|
## | 2| h| 0.10862995810038856|
## | 2| c| 0.3864428179837973|
## | 2| a| 0.6695356657072442|
## | 3| b|0.012329360826023095|
## | 3| a| 0.6450777858109182|
## | 3| e| 0.5142305439671874|
## +------+---+--------------------+
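The window trick boils down to "attach a random key, rank within each group, keep rank <= n". A Spark-free sketch of the same logic, using hypothetical rows that mirror the example DataFrame:

```python
import random
from itertools import groupby
from operator import itemgetter

n = 3
random.seed(0)  # reproducible sketch

# Hypothetical rows: (teamId, x1), mirroring the example DataFrame
rows = [(team, x1) for team in (1, 2, 3) for x1 in "abcdefghi"]

# withColumn("rnd_", rand()): attach a random key to every row
keyed = [(team, x1, random.random()) for team, x1 in rows]

# partitionBy(teamId).orderBy(rnd_): sort by group, then by the random key
keyed.sort(key=itemgetter(0, 2))

# rowNumber().over(w) <= n: keep the first n rows of each group
sampled = []
for team, grp in groupby(keyed, key=itemgetter(0)):
    sampled.extend(list(grp)[:n])
</```

Because the random keys are independent and uniform, taking the first n rows in random-key order is an unbiased sample without replacement.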
I am afraid both will be rather expensive, though. If the sizes of the individual groups are balanced and relatively large, I would simply use DataFrame.randomSplit.
If the number of groups is relatively small, it is possible to try something else:
from pyspark.sql.functions import col, count, rand, udf
from pyspark.sql.types import BooleanType
from operator import truediv

counts = (df
    .groupBy(col("teamId"))
    .agg(count("*").alias("n"))
    .rdd.map(lambda r: (r.teamId, r.n))
    .collectAsMap())

# This defines the fraction of observations from a group which should
# be taken to get n values in expectation
counts_bd = sc.broadcast({k: truediv(n, v) for (k, v) in counts.items()})

to_take = udf(lambda k, rnd: rnd <= counts_bd.value.get(k), BooleanType())

sampled = (df
    .withColumn("rnd_", rand())
    .where(to_take(col("teamId"), col("rnd_")))
    .drop("rnd_"))

sampled.show()
sampled.show()
## +------+---+--------------------+
## |teamId| x1| x2|
## +------+---+--------------------+
## | 1| d| 0.14815204548854788|
## | 1| f| 0.8563875814036598|
## | 1| g| 0.81921738561455|
## | 2| a| 0.6695356657072442|
## | 2| d| 0.37620872770129155|
## | 2| g| 0.06233470405822805|
## | 3| b|0.012329360826023095|
## | 3| h| 0.9022527556458557|
## +------+---+--------------------+
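The fraction map the udf broadcasts is simply n divided by the group size; a plain-Python sketch with made-up counts shows why filtering on rnd_ <= fraction keeps n rows per group in expectation:

```python
from operator import truediv

n = 3
# Hypothetical per-group row counts, standing in for collectAsMap()
counts = {1: 9, 2: 9, 3: 9}

# Keep a fraction n / count of each group
fractions = {k: truediv(n, v) for k, v in counts.items()}

# A row passes the filter when its uniform draw rnd_ <= fraction, so the
# expected number of survivors per group is count * fraction == n
expected = {k: counts[k] * fractions[k] for k in counts}
```

This is why the method yields n rows only on average: the actual number kept in each group is binomially distributed around n.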
In Spark 1.5+ you can replace the udf with a call to the sampleBy method:
df.sampleBy("teamId", counts_bd.value)
It won't give you the exact number of observations, but it should be good enough most of the time, as long as the number of observations per group is large enough to get proper samples. You can also use sampleByKey on an RDD in a similar way.