How does Spark keep track of the splits in randomSplit?


Question

This question, How does Sparks RDD.randomSplit actually split the RDD, explains how Spark's random split works, but I don't understand how Spark keeps track of which values went to one split so that those same values don't go to the second split.

If we look at the implementation of randomSplit:

def randomSplit(weights: Array[Double], seed: Long): Array[DataFrame] = {
  // It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its
  // constituent partitions each time a split is materialized which could result in
  // overlapping splits. To prevent this, we explicitly sort each input partition to make the
  // ordering deterministic.
  val sorted = Sort(logicalPlan.output.map(SortOrder(_, Ascending)), global = false, logicalPlan)
  val sum = weights.sum
  val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
  normalizedCumWeights.sliding(2).map { x =>
    new DataFrame(sqlContext, Sample(x(0), x(1), withReplacement = false, seed, sorted))
  }.toArray
}

we can see that it creates multiple DataFrames that share the same sqlContext, each with a different Sample.

How do these DataFrames communicate with each other so that a value that fell into the first one is not also included in the second one?

And is the data being fetched twice? (Assuming the sqlContext is selecting from a DB, is the select executed twice?)

Answer

It's exactly the same as sampling an RDD.

Assuming you have the weight array (0.6, 0.2, 0.2), Spark will generate one DataFrame for each of the ranges (0.0, 0.6), (0.6, 0.8), and (0.8, 1.0).
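The range construction above can be sketched in plain Scala (no Spark needed), mirroring the `scanLeft`/`sliding` lines from the implementation; `WeightRanges` is just an illustrative wrapper name:

```scala
object WeightRanges {
  def main(args: Array[String]): Unit = {
    val weights = Array(0.6, 0.2, 0.2)
    val sum = weights.sum
    // Cumulative boundaries: 0.0, 0.6, 0.8, 1.0 (up to floating-point noise)
    val cumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
    // Pair adjacent boundaries into (lower, upper) ranges, one per output split
    val ranges = cumWeights.sliding(2).map(x => (x(0), x(1))).toArray
    assert(ranges.length == weights.length)
    assert(ranges.head._1 == 0.0 && math.abs(ranges.last._2 - 1.0) < 1e-9)
    ranges.foreach { case (lo, hi) => println(f"[$lo%.1f, $hi%.1f)") }
  }
}
```

Because the boundaries are cumulative, the ranges tile [0.0, 1.0) without gaps or overlap, which is what makes the splits disjoint.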

When it's time to read a result DataFrame, Spark just goes over the parent DataFrame. For each item, it generates a random number; if that number falls in the specified range, it emits the item. All child DataFrames share the same random number generator (technically, different generators with the same seed), so the sequence of random numbers is deterministic.
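A minimal sketch of that mechanism in plain Scala, with a hypothetical `sample` function standing in for Spark's per-split sampler: every split replays the same seeded random sequence over the parent rows and keeps only the rows whose draw lands in its own range:

```scala
import scala.util.Random

object RangeSampling {
  // Each split re-scans the same rows with an identically seeded generator,
  // so row i gets the same random draw in every split.
  def sample(rows: Seq[String], lower: Double, upper: Double, seed: Long): Seq[String] = {
    val rng = new Random(seed)
    rows.filter { _ =>
      val x = rng.nextDouble() // in [0.0, 1.0)
      x >= lower && x < upper
    }
  }

  def main(args: Array[String]): Unit = {
    val rows = (1 to 20).map(i => s"row$i")
    val seed = 42L
    val a = sample(rows, 0.0, 0.6, seed)
    val b = sample(rows, 0.6, 0.8, seed)
    val c = sample(rows, 0.8, 1.0, seed)
    // Each row's draw falls in exactly one range, so the splits are
    // disjoint and together cover every row exactly once.
    assert(a.intersect(b).isEmpty && b.intersect(c).isEmpty && a.intersect(c).isEmpty)
    assert((a ++ b ++ c).size == rows.size)
    println(s"${a.size} + ${b.size} + ${c.size} = ${rows.size}")
  }
}
```

So no communication between the child DataFrames is needed: determinism of the shared seed (plus the per-partition sort in the real implementation) guarantees disjointness.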

For your last question, if you did not cache the parent DataFrame, then the data for the input DataFrame will be re-fetched each time an output DataFrame is computed.
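A minimal sketch of why caching matters, in plain Scala with a hypothetical `fetch()` standing in for the database scan behind the DataFrame (in Spark itself you would call `cache()` on the parent DataFrame before `randomSplit`):

```scala
object CacheDemo {
  var fetchCount = 0

  // Stand-in for an uncached source: re-read on every materialization.
  def fetch(): Seq[Int] = { fetchCount += 1; 1 to 10 }

  def main(args: Array[String]): Unit = {
    // Without caching: materializing each of the three splits
    // triggers its own scan of the source.
    fetchCount = 0
    (0 until 3).foreach(_ => fetch())
    assert(fetchCount == 3)

    // With caching: the source is scanned once and the result reused
    // by all three splits.
    fetchCount = 0
    val cached = fetch()
    (0 until 3).foreach(_ => cached)
    assert(fetchCount == 1)
    println("uncached scans: 3, cached scans: 1")
  }
}
```

This is only an illustration of the evaluation pattern; the real saving in Spark comes from `cache()` persisting the parent's partitions so the three `Sample` operators read memory instead of re-running the select.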

