Nesting parallelizations in Spark? What's the right approach?

Question

Nested parallelizations?

Let's say I am trying to do the equivalent of "nested for loops" in Spark. As in a regular language, suppose I have a routine in the inner loop that estimates Pi the way the Pi Average Spark example does (see Estimating Pi):

int iLimit = 1000; long jLimit = 1000000; double counter = 0.0;   // jLimit = 10^6

for (int i = 0; i < iLimit; i++)
    for (int j = 0; j < jLimit; j++)
        counter += PiEstimator();

double estimateOfAllAverages = counter / iLimit;

Can I nest parallelize calls in Spark? I am trying, but have not worked out the kinks yet. I would be happy to post errors and code, but I think I am asking a more conceptual question about whether this is the right approach in Spark.

I can already parallelize a single Spark example / Pi estimate; now I want to do that 1000 times to see if it converges on Pi. (This relates to a larger problem we are trying to solve; if something closer to an MVCE is needed, I'd be happy to add it.)

BOTTOM LINE QUESTION: I just need someone to answer directly: is using nested parallelize calls the right approach? If not, please advise something specific, thanks! Here's pseudo-code for what I think the right approach would be:

// use an accumulator to keep track of each Pi estimate result

sparkContext.parallelize(arrayOf1000, slices).map { // outer "loop": 1000 estimates

    sparkContext.parallelize(arrayOf10^6, slices).map { // inner "loop": 10^6 samples each
        // do the 10^6 thing here and update the accumulator with each result
    }
}

// take the average of the accumulator to see if all 1000 Pi estimates converge on Pi

BACKGROUND: I had asked this question and got a general answer, but it did not lead to a solution; after some waffling I decided to post a new question with a different characterization. I also tried asking this on the Spark user mailing list, but no dice there either. Thanks in advance for any help.

Answer

In the Pi example, instead of the nested for loop you can get the same answer by running a single loop through the process i * j times, summing over all of the results, and then dividing by j at the end. If you have steps that you want to apply in the outer loop, do them within that single loop, but create different groups by assigning a specific key to each inner-loop group. Without knowing what kinds of things you want to do in the outer loop, it's hard to give an example here.

For the simple case of just averaging to improve convergence, it's relatively easy. Instead of doing the nested loop, just make an RDD with i * j elements and then apply the function to each element.

This might look like the following (with PySpark). Here f is whatever function you want to apply; remember that each element in the RDD will be passed to it, so define your f with an input parameter even if you don't use it inside the function.

from pyspark.mllib.random import RandomRDDs
from operator import add

x = RandomRDDs.uniformRDD(sc, i * j)    # RDD with i * j random elements
function_values = x.map(f)              # apply f to every element

sum_of_values = function_values.reduce(add)
averaged_value = sum_of_values / j      # if you are only averaging over the outer loop
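
For concreteness, here is one hypothetical choice of f (not part of the original answer): a single Monte Carlo "dart throw" that ignores its input, in the spirit of the Pi estimation example. The function name and the 4.0 / 0.0 scoring are illustrative assumptions.

import random

# Hypothetical f: one Monte Carlo "dart throw" per RDD element.
# The element value passed in is ignored, as noted above.
def f(_):
    px, py = random.random(), random.random()
    return 4.0 if px * px + py * py <= 1.0 else 0.0

With this particular f, the average over all i * j elements is the overall Pi estimate; the right divisor for your case depends on what your real f returns.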

If you want to perform actions in the outer loop, I'd assign an index (zipWithIndex) and then create a key using the index modulo j. Then each different key would be a single virtual inner-loop cycle, and you can use operators like aggregateByKey, foldByKey, or reduceByKey to perform actions only on those records. This will probably take a bit of a performance hit if the different keys are distributed to different partitions.
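
A minimal sketch of that keyed approach, assuming sc, i, j, and f are defined as above (the variable names here are illustrative):

from pyspark.mllib.random import RandomRDDs
from operator import add

x = RandomRDDs.uniformRDD(sc, i * j)

keyed_sums = (x.zipWithIndex()                          # (value, index) pairs
               .map(lambda vi: (vi[1] % j, f(vi[0])))   # key = index modulo j
               .reduceByKey(add))                       # one aggregate per virtual inner-loop group

# keyed_sums.collect() returns one (key, summed value) pair per group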

An alternative would be to repartition the RDD into j partitions and then use foreachPartition to apply a function to each partition.
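
A rough sketch of that repartition approach, assuming the same x, j, and f as above and using an accumulator to collect results, since foreachPartition itself returns nothing:

# accumulator to collect per-partition sums on the driver
partition_total = sc.accumulator(0.0)

def process_partition(iterator):
    # hypothetical per-partition work: sum f over this partition's elements
    partition_total.add(sum(f(v) for v in iterator))

x.repartition(j).foreachPartition(process_partition)

# partition_total.value now holds the combined sum across all j partitions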

A third option would be to run the inner loop j times in parallel, concatenate the results into one distributed file, and then do the outer loop operations after reading this into Spark.
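
One way that third option might look, assuming a hypothetical compute_inner_loop(run) helper that returns an RDD of numeric results for one inner-loop run (the helper name and output paths are made up for illustration):

# each run is itself a parallel Spark job; its results are written out as text
for run in range(j):
    inner_results = compute_inner_loop(run)
    inner_results.map(str).saveAsTextFile("pi_runs/run_%d" % run)

# later, read every run back as one RDD and do the outer-loop operations
all_results = sc.textFile("pi_runs/run_*").map(float)
overall_average = all_results.mean()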
