Choosing random items from a Spark GroupedData Object
Question
I'm new to using Spark in Python and have been unable to solve this problem: after running groupBy on a pyspark.sql.dataframe.DataFrame
df = sqlsc.read.json("data.json")
df.groupBy('teamId')
how can you choose N random samples from each resulting group (grouped by teamId) without replacement?
I'm basically trying to choose N random users from each team; maybe using groupBy is the wrong way to start?
Answer
Well, it is kind of wrong. GroupedData is not really designed for data access; it just describes the grouping criteria and provides aggregation methods. See my answer to Using groupBy in Spark and getting back to a DataFrame for more details.
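To see the difference (a minimal sketch, assuming the df from the question): aggregations work on the grouped object, but there is no method that returns the underlying rows:

from pyspark.sql.functions import count

grouped = df.groupBy("teamId")
grouped.agg(count("*").alias("n")).show()  # aggregation works fine
# grouped.collect()  # AttributeError - GroupedData exposes no row access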
Another problem with this idea is selecting N random samples. That is a task which is really hard to achieve in parallel without a physical grouping of the data, and that is not something that happens when you call groupBy on a DataFrame:
There are at least two ways to handle this:
- Convert to an RDD, groupBy, and perform local sampling:
import random

n = 3

def sample(iter, n):
    rs = random.Random()  # We should probably use os.urandom as a seed
    return rs.sample(list(iter), n)

df = sqlContext.createDataFrame(
    [(x, y, random.random()) for x in (1, 2, 3) for y in "abcdefghi"],
    ("teamId", "x1", "x2"))

grouped = df.map(lambda row: (row.teamId, row)).groupByKey()

sampled = sqlContext.createDataFrame(
    grouped.flatMap(lambda kv: sample(kv[1], n)))

sampled.show()
## +------+---+-------------------+
## |teamId| x1| x2|
## +------+---+-------------------+
## | 1| g| 0.81921738561455|
## | 1| f| 0.8563875814036598|
## | 1| a| 0.9010425238735935|
## | 2| c| 0.3864428179837973|
## | 2| g|0.06233470405822805|
## | 2| d|0.37620872770129155|
## | 3| f| 0.7518901502732027|
## | 3| e| 0.5142305439671874|
## | 3| d| 0.6250620479303716|
## +------+---+-------------------+
- Use window functions:
from pyspark.sql import Window
from pyspark.sql.functions import col, rand, rowNumber

w = Window.partitionBy(col("teamId")).orderBy(col("rnd_"))

sampled = (df
    .withColumn("rnd_", rand())              # add a column of random numbers
    .withColumn("rn_", rowNumber().over(w))  # add rowNumber over the window
    .where(col("rn_") <= n)                  # take n observations per group
    .drop("rn_")                             # drop helper columns
    .drop("rnd_"))

sampled.show()
## +------+---+--------------------+
## |teamId| x1| x2|
## +------+---+--------------------+
## | 1| f| 0.8563875814036598|
## | 1| g| 0.81921738561455|
## | 1| i| 0.8173912535268248|
## | 2| h| 0.10862995810038856|
## | 2| c| 0.3864428179837973|
## | 2| a| 0.6695356657072442|
## | 3| b|0.012329360826023095|
## | 3| a| 0.6450777858109182|
## | 3| e| 0.5142305439671874|
## +------+---+--------------------+
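(If you are on a newer Spark release where rowNumber has been renamed, the same trick works with row_number; a minimal sketch assuming Spark 1.6+:)

from pyspark.sql import Window
from pyspark.sql.functions import col, rand, row_number

w = Window.partitionBy(col("teamId")).orderBy(col("rnd_"))
sampled = (df
    .withColumn("rnd_", rand())
    .withColumn("rn_", row_number().over(w))
    .where(col("rn_") <= n)
    .drop("rn_")
    .drop("rnd_"))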
but I am afraid both will be rather expensive. If the size of the individual groups is balanced and relatively large, I would simply use DataFrame.randomSplit.
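For instance (a minimal sketch; the 0.3/0.7 weights and the seed are assumptions you would tune to your group sizes):

# Keep roughly 30% of the rows; with large, balanced groups this
# approximates a 30% sample per team
sample_df, rest = df.randomSplit([0.3, 0.7], seed=42)
sample_df.show()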
If the number of groups is relatively small, it is possible to try something else:
from pyspark.sql.functions import count, udf
from pyspark.sql.types import BooleanType
from operator import truediv

counts = (df
    .groupBy(col("teamId"))
    .agg(count("*").alias("n"))
    .map(lambda r: (r.teamId, r.n))
    .collectAsMap())

# This defines the fraction of observations from a group which should
# be taken to get n values
counts_bd = sc.broadcast({k: truediv(n, v) for (k, v) in counts.items()})

to_take = udf(lambda k, rnd: rnd <= counts_bd.value.get(k), BooleanType())

sampled = (df
    .withColumn("rnd_", rand())
    .where(to_take(col("teamId"), col("rnd_")))
    .drop("rnd_"))

sampled.show()
## +------+---+--------------------+
## |teamId| x1| x2|
## +------+---+--------------------+
## | 1| d| 0.14815204548854788|
## | 1| f| 0.8563875814036598|
## | 1| g| 0.81921738561455|
## | 2| a| 0.6695356657072442|
## | 2| d| 0.37620872770129155|
## | 2| g| 0.06233470405822805|
## | 3| b|0.012329360826023095|
## | 3| h| 0.9022527556458557|
## +------+---+--------------------+
It won't give you the exact number of observations, but it should be good enough most of the time, as long as the number of observations per group is large enough to get proper samples. You can also use sampleByKey on an RDD in a similar way.
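For completeness, a minimal sampleByKey sketch (the per-key fractions here are hypothetical; in practice you would derive them from the group counts as above):

fractions = {1: 0.3, 2: 0.3, 3: 0.3}  # sampling fraction per teamId, assumed values

sampled_rdd = (df
    .map(lambda row: (row.teamId, row))  # key each row by teamId
    .sampleByKey(False, fractions))      # sample without replacement

sampled_df = sqlContext.createDataFrame(sampled_rdd.values())
sampled_df.show()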