Stratified sampling in Spark


Question

I have a data set that contains user and purchase data. Here is an example, where the first element is the userId, the second is the productId, and the third is a boolean flag:

(2147481832,23355149,1)
(2147481832,973010692,1)
(2147481832,2134870842,1)
(2147481832,541023347,1)
(2147481832,1682206630,1)
(2147481832,1138211459,1)
(2147481832,852202566,1)
(2147481832,201375938,1)
(2147481832,486538879,1)
(2147481832,919187908,1)
... 

I want to take 80% of each user's data and build one RDD, and take the remaining 20% and build another RDD; let's call them train and test. I would like to stay away from groupBy, since it can create memory problems given how large the data set is. What is the best way to do this?

I could do the following, but it will not give 80% of each user:

val percentData = data.map(x => ((math.random * 100).toInt, (x._1, x._2, x._3)))
val train = percentData.filter(x => x._1 < 80).values.repartition(10).cache()

Answer

One possible solution is in Holden's answer; here are some others:

Using RDDs:

You can use the sampleByKeyExact transformation from the PairRDDFunctions class.

sampleByKeyExact(withReplacement: Boolean, fractions: Map[K, Double], seed: Long) returns a subset of this RDD sampled by key (via stratified sampling), containing exactly math.ceil(numItems * samplingRate) elements for each stratum (a group of pairs with the same key).

Here is what I would do:

Consider the following list:

val seq = Seq(
                (2147481832,23355149,1),(2147481832,973010692,1),(2147481832,2134870842,1),(2147481832,541023347,1),
                (2147481832,1682206630,1),(2147481832,1138211459,1),(2147481832,852202566,1),(2147481832,201375938,1),
                (2147481832,486538879,1),(2147481832,919187908,1),(214748183,919187908,1),(214748183,91187908,1)
           )

I would create a pair RDD, mapping all the users as keys:

val data = sc.parallelize(seq).map(x => (x._1,(x._2,x._3)))

Then I'll set up the fractions for each key as follows, since sampleByKeyExact takes a map of fractions per key:

val fractions = data.map(_._1).distinct.map(x => (x,0.8)).collectAsMap

What I have done here is map over the keys to find the distinct ones, associate each with a fraction of 0.8, and collect the whole thing as a Map.

Now let's sample:

import org.apache.spark.rdd.PairRDDFunctions

val sampleData = data.sampleByKeyExact(withReplacement = false, fractions = fractions, seed = 2L)

You can check the counts on your data and on the sample:

scala> data.count
// [...]
// res10: Long = 12

scala> sampleData.count
// [...]
// res11: Long = 10
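
The question also asks for the remaining 20% as a test set. That step is not in the original answer, but a minimal sketch follows directly from it, assuming the data and sampleData values defined above: everything sampleByKeyExact did not pick can be recovered with RDD.subtract.

val train = sampleData
// subtract compares whole (key, value) pairs, so exact duplicate records
// would be removed together; the records in this example are unique.
val test = data.subtract(sampleData)

test.count // 12 - 10 = 2 for this example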

Using DataFrames:

Let's consider the same data (seq) from the previous section.

import spark.implicits._ // for toDF; already in scope in spark-shell

val df = seq.toDF("keyColumn", "value1", "value2")
df.show
// +----------+----------+------+
// | keyColumn|    value1|value2|
// +----------+----------+------+
// |2147481832|  23355149|     1|
// |2147481832| 973010692|     1|
// |2147481832|2134870842|     1|
// |2147481832| 541023347|     1|
// |2147481832|1682206630|     1|
// |2147481832|1138211459|     1|
// |2147481832| 852202566|     1|
// |2147481832| 201375938|     1|
// |2147481832| 486538879|     1|
// |2147481832| 919187908|     1|
// | 214748183| 919187908|     1|
// | 214748183|  91187908|     1|
// +----------+----------+------+

We will need the underlying RDD for that. We key each element of this RDD by its first column:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}

val data: RDD[(Int, Row)] = df.rdd.keyBy(_.getInt(0))
val fractions: Map[Int, Double] = data.map(_._1)
                                      .distinct
                                      .map(x => (x, 0.8))
                                      .collectAsMap

val sampleData: RDD[Row] = data.sampleByKeyExact(withReplacement = false, fractions, 2L)
                               .values

val sampleDataDF: DataFrame = spark.createDataFrame(sampleData, df.schema) // on Spark 1.6, use sqlContext.createDataFrame(...) instead

You can now check the counts on df and on the sample:

scala> df.count
// [...]
// res9: Long = 12

scala> sampleDataDF.count
// [...]
// res10: Long = 10
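
As on the RDD side, the original answer does not show the test split for DataFrames. A minimal sketch, assuming the sampleDataDF defined above, uses DataFrame.except:

// except is a set difference with DISTINCT semantics, so this is only a
// drop-in replacement when rows are unique, as they are in this example.
val testDataDF: DataFrame = df.except(sampleDataDF)

testDataDF.count // 12 - 10 = 2 for this example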

Since Spark 1.5.0 you can also use the DataFrameStatFunctions.sampleBy method:

df.stat.sampleBy("keyColumn", fractions, seed)
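
For completeness, here is a self-contained sketch of sampleBy; the dfFractions name is mine, not from the original answer. Note that unlike sampleByKeyExact, sampleBy does an independent Bernoulli draw per row, so the per-stratum counts are only approximately fraction * size:

// Build a 0.8 fraction for every distinct key straight from the DataFrame.
val dfFractions: Map[Int, Double] = df.select("keyColumn")
  .distinct
  .collect
  .map(row => row.getInt(0) -> 0.8)
  .toMap

// Approximate stratified sample, no round-trip through the RDD API needed.
val sampleDF = df.stat.sampleBy("keyColumn", dfFractions, seed = 7L)

sampleDF.count // close to 10, but not guaranteed to be exact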

