选择从星火GroupedData对象随机物品 [英] Choosing random items from a Spark GroupedData Object

查看:322
本文介绍了选择从星火GroupedData对象随机物品的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是新来的Python中使用Spark和一直未能解决这个问题:运行后 GROUPBY pyspark.sql.dataframe .DataFrame

I'm new to using Spark in Python and have been unable to solve this problem: After running groupBy on a pyspark.sql.dataframe.DataFrame

df = sqlsc.read.json("data.json")
df.groupBy('teamId')

您如何可以选择 N 每个结果组(由teamId分组)随机抽样不更换?

how can you choose N random samples from each resulting group (grouped by teamId) without replacement?

基本上,我试图从各队选择 N 随机的用户,也许使用 GROUPBY 是错误的开始用?

I'm basically trying to choose N random users from each team, maybe using groupBy is wrong to start with?

推荐答案

嗯,这是一种错误。 GroupedData 是不是真的设计用于数据访问。它只是描述了分组标准,并提供​​汇总的方法。见我的回答在星火使用GROUPBY并取回一个数据帧了解更多详情。

Well, it is kind of wrong. GroupedData is not really designed for a data access. It just describes grouping criteria and provides aggregation methods. See my answer to Using groupBy in Spark and getting back to a DataFrame for more details.

这一理念的另一个问题是选择 N个随机样本。这是真的很难并行实现无数据的心理分组任务,这是不是发生的事情,当你呼叫 GROUPBY在数据框

Another problem with this idea is selecting N random samples. It is a task which is really hard to achieve in parallel without psychical grouping of data and it is not something that happens when you call groupBy on a DataFrame:

有至少两种方式来处理这个问题:

There are at least two ways to handle this:


  • 转换为RDD, GROUPBY 和执行本地采样

import random

n = 3

def sample(iter, n): 
    rs = random.Random()  # We should probably use os.urandom as a seed
    return rs.sample(list(iter), n)    

df = sqlContext.createDataFrame(
    [(x, y, random.random()) for x in (1, 2, 3) for y in "abcdefghi"], 
    ("teamId", "x1", "x2"))

grouped = df.map(lambda row: (row.teamId, row)).groupByKey()

sampled = sqlContext.createDataFrame(
    grouped.flatMap(lambda kv: sample(kv[1], n)))

sampled.show()

## +------+---+-------------------+
## |teamId| x1|                 x2|
## +------+---+-------------------+
## |     1|  g|   0.81921738561455|
## |     1|  f| 0.8563875814036598|
## |     1|  a| 0.9010425238735935|
## |     2|  c| 0.3864428179837973|
## |     2|  g|0.06233470405822805|
## |     2|  d|0.37620872770129155|
## |     3|  f| 0.7518901502732027|
## |     3|  e| 0.5142305439671874|
## |     3|  d| 0.6250620479303716|
## +------+---+-------------------+


  • 使用窗口功能

  • use window functions

    from pyspark.sql import Window
    from pyspark.sql.functions import col, rand, rowNumber
    
    w = Window.partitionBy(col("teamId")).orderBy(col("rnd_"))
    
    sampled = (df
        .withColumn("rnd_", rand())  # Add random numbers column
        .withColumn("rn_", rowNumber().over(w))  # Add rowNumber over windw
        .where(col("rn_") <= n)  # Take n observations
        .drop("rn_")  # drop helper columns
        .drop("rnd_"))
    
    sampled.show()
    
    ## +------+---+--------------------+
    ## |teamId| x1|                  x2|
    ## +------+---+--------------------+
    ## |     1|  f|  0.8563875814036598|
    ## |     1|  g|    0.81921738561455|
    ## |     1|  i|  0.8173912535268248|
    ## |     2|  h| 0.10862995810038856|
    ## |     2|  c|  0.3864428179837973|
    ## |     2|  a|  0.6695356657072442|
    ## |     3|  b|0.012329360826023095|
    ## |     3|  a|  0.6450777858109182|
    ## |     3|  e|  0.5142305439671874|
    ## +------+---+--------------------+
    


  • 但恐怕都将是相当昂贵的。如果各组的大小是平衡的,比较大的我会简单地使用 DataFrame.randomSplit

    but I am afraid both will be rather expensive. If size of the individual groups is balanced and relatively large I would simply use DataFrame.randomSplit.

    如果基团的数量相对较少,所以可以尝试其他

    If number of groups is relatively small it is possible to try something else:

    from pyspark.sql.functions import count, udf
    from pyspark.sql.types import BooleanType
    from operator import truediv
    
    counts = (df
        .groupBy(col("teamId"))
        .agg(count("*").alias("n"))
        .map(lambda r: (r.teamId, r.n))
        .collectAsMap()) 
    
    # This defines fraction of observations from a group which should
    # be taken to get n values 
    counts_bd = sc.broadcast({k: truediv(n, v) for (k, v) in counts.items()})
    
    to_take = udf(lambda k, rnd: rnd <= counts_bd.value.get(k), BooleanType())
    
    sampled = (df
        .withColumn("rnd_", rand())
        .where(to_take(col("teamId"), col("rnd_")))
        .drop("rnd_"))
    
    sampled.show()
    
    ## +------+---+--------------------+
    ## |teamId| x1|                  x2|
    ## +------+---+--------------------+
    ## |     1|  d| 0.14815204548854788|
    ## |     1|  f|  0.8563875814036598|
    ## |     1|  g|    0.81921738561455|
    ## |     2|  a|  0.6695356657072442|
    ## |     2|  d| 0.37620872770129155|
    ## |     2|  g| 0.06233470405822805|
    ## |     3|  b|0.012329360826023095|
    ## |     3|  h|  0.9022527556458557|
    ## +------+---+--------------------+
    

    这不会给你的观察确切数字,但应该是不够好大部分时间只要一个号码,每组观测足够大,可以得到适当的样本。您还可以使用 sampleByKey 在RDD以类似的方式。

    It won't give you exact number of observations but should be good enough most of the time as long as a number of observations per group is large enough to get proper samples. You can also use sampleByKey on a RDD in a similar way.

    这篇关于选择从星火GroupedData对象随机物品的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

    查看全文
    登录 关闭
    扫码关注1秒登录
    发送“验证码”获取 | 15天全站免登陆