Choosing random items from a Spark GroupedData Object

Problem description

I'm new to using Spark in Python and have been unable to solve this problem: After running groupBy on a pyspark.sql.dataframe.DataFrame

df = sqlsc.read.json("data.json")
df.groupBy('teamId')

how can you choose N random samples from each resulting group (grouped by teamId) without replacement?

I'm basically trying to choose N random users from each team; maybe starting with groupBy is the wrong approach?

Recommended answer

Well, it is kind of wrong. GroupedData is not really designed for data access. It just describes the grouping criteria and provides aggregation methods. See my answer to Using groupBy in Spark and getting back to a DataFrame for more details.
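
For illustration, the only thing a GroupedData object can give you back is a per-group aggregate, for example the number of rows per team; there is no method on it that returns the individual rows of a group (a minimal sketch, assuming df is loaded as in the question):

from pyspark.sql.functions import count

# GroupedData only supports aggregations such as count/agg;
# the rows that make up each group are not accessible through it.
df.groupBy("teamId").agg(count("*").alias("n")).show()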

Another problem with this idea is selecting N random samples. That is a task which is really hard to achieve in parallel without physically grouping the data, and physical grouping is not something that happens when you call groupBy on a DataFrame.

There are at least two ways to handle this:

  • convert to an RDD, groupBy and perform local sampling

import random

n = 3

def sample(iter, n):
    # Draw n rows without replacement from one group;
    # random.sample raises ValueError if the group has fewer than n rows.
    rs = random.Random()  # We should probably use os.urandom as a seed
    return rs.sample(list(iter), n)

df = sqlContext.createDataFrame(
    [(x, y, random.random()) for x in (1, 2, 3) for y in "abcdefghi"], 
    ("teamId", "x1", "x2"))

grouped = df.rdd.map(lambda row: (row.teamId, row)).groupByKey()

sampled = sqlContext.createDataFrame(
    grouped.flatMap(lambda kv: sample(kv[1], n)))

sampled.show()

## +------+---+-------------------+
## |teamId| x1|                 x2|
## +------+---+-------------------+
## |     1|  g|   0.81921738561455|
## |     1|  f| 0.8563875814036598|
## |     1|  a| 0.9010425238735935|
## |     2|  c| 0.3864428179837973|
## |     2|  g|0.06233470405822805|
## |     2|  d|0.37620872770129155|
## |     3|  f| 0.7518901502732027|
## |     3|  e| 0.5142305439671874|
## |     3|  d| 0.6250620479303716|
## +------+---+-------------------+

  • use window functions

    from pyspark.sql import Window
    from pyspark.sql.functions import col, rand, rowNumber
    
    w = Window.partitionBy(col("teamId")).orderBy(col("rnd_"))
    
    sampled = (df
        .withColumn("rnd_", rand())  # Add random numbers column
        .withColumn("rn_", rowNumber().over(w))  # Add rowNumber over windw
        .where(col("rn_") <= n)  # Take n observations
        .drop("rn_")  # drop helper columns
        .drop("rnd_"))
    
    sampled.show()
    
    ## +------+---+--------------------+
    ## |teamId| x1|                  x2|
    ## +------+---+--------------------+
    ## |     1|  f|  0.8563875814036598|
    ## |     1|  g|    0.81921738561455|
    ## |     1|  i|  0.8173912535268248|
    ## |     2|  h| 0.10862995810038856|
    ## |     2|  c|  0.3864428179837973|
    ## |     2|  a|  0.6695356657072442|
    ## |     3|  b|0.012329360826023095|
    ## |     3|  a|  0.6450777858109182|
    ## |     3|  e|  0.5142305439671874|
    ## +------+---+--------------------+
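
    A side note: in Spark 1.6 and later rowNumber was renamed row_number, so on newer versions the same window-based approach would be written roughly as follows (a sketch assuming the same df and n as above):

    from pyspark.sql import Window
    from pyspark.sql.functions import col, rand, row_number

    w = Window.partitionBy(col("teamId")).orderBy(col("rnd_"))

    sampled = (df
        .withColumn("rnd_", rand())               # random sort key per row
        .withColumn("rn_", row_number().over(w))  # rank rows within each team
        .where(col("rn_") <= n)                   # keep the first n rows per team
        .drop("rn_")
        .drop("rnd_"))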
    

    But I am afraid both will be rather expensive. If the sizes of the individual groups are balanced and relatively large, I would simply use DataFrame.randomSplit.
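
    For reference, a minimal sketch of randomSplit (the weights and seed are illustrative). Note that it splits the whole DataFrame by fractions rather than taking a fixed n per group, which is why it only works well when the groups are roughly balanced:

    # Keep roughly 30% of the rows; with balanced groups each team
    # contributes approximately the same share of the sample.
    sample_df, rest_df = df.randomSplit([0.3, 0.7], seed=42)
    sample_df.show()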

    If the number of groups is relatively small, it is possible to try something else:

    from pyspark.sql.functions import count, udf
    from pyspark.sql.types import BooleanType
    from operator import truediv
    
    counts = (df
        .groupBy(col("teamId"))
        .agg(count("*").alias("n"))
        .rdd.map(lambda r: (r.teamId, r.n))
        .collectAsMap()) 
    
    # This defines fraction of observations from a group which should
    # be taken to get n values 
    counts_bd = sc.broadcast({k: truediv(n, v) for (k, v) in counts.items()})
    
    to_take = udf(lambda k, rnd: rnd <= counts_bd.value.get(k), BooleanType())
    
    sampled = (df
        .withColumn("rnd_", rand())
        .where(to_take(col("teamId"), col("rnd_")))
        .drop("rnd_"))
    
    sampled.show()
    
    ## +------+---+--------------------+
    ## |teamId| x1|                  x2|
    ## +------+---+--------------------+
    ## |     1|  d| 0.14815204548854788|
    ## |     1|  f|  0.8563875814036598|
    ## |     1|  g|    0.81921738561455|
    ## |     2|  a|  0.6695356657072442|
    ## |     2|  d| 0.37620872770129155|
    ## |     2|  g| 0.06233470405822805|
    ## |     3|  b|0.012329360826023095|
    ## |     3|  h|  0.9022527556458557|
    ## +------+---+--------------------+
    

    In Spark 1.5+ you can replace the udf with a call to the sampleBy method:

    df.sampleBy("teamId", counts_bd.value)
    

    It won't give you an exact number of observations, but it should be good enough most of the time, as long as the number of observations per group is large enough to get proper samples. You can also use sampleByKey on an RDD in a similar way.
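
    For completeness, a minimal sketch of the sampleByKey variant on the underlying RDD, reusing the per-team fractions computed above (the keyBy step and the seed are illustrative):

    sampled_rdd = (df.rdd
        .keyBy(lambda row: row.teamId)            # key each Row by its teamId
        .sampleByKey(False, counts_bd.value, 42)  # stratified sample without replacement
        .values())                                # back to plain Rows

    sampled = sqlContext.createDataFrame(sampled_rdd)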
