Apache Spark timing forEach operation on JavaRDD


Question

Question: is this a valid way to test how long it takes to build an RDD?

I am doing two things here. The basic approach is that we have M instances of what we call a DropEvaluation, and N DropResults. Each of the N DropResults needs to be compared to each of the M DropEvaluations; every N must be seen by every M, to give us M results in the end.

If I don't use the .count() once the RDD is built, the driver continues on to the next line of code, and it looks as though an RDD that takes 30 minutes to build took almost no time.

I just want to make sure I'm not missing something, such as the .count() itself taking a long time. To time the .count() itself, would I have to modify Spark's source code?

M = 1000 or 2000. N = 10^7.

It's effectively a cartesian problem. The accumulator approach was chosen because we need to write to each M in place; it would also be ugly to build the full cartesian RDD (a sketch of that rejected alternative follows below).

We build a List of M Accumulators (you can't do a List Accumulator in Java, right?). Then we loop through each of the N elements in an RDD with a foreach. A sketch of how such a list might be set up is shown below.
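For concreteness, a minimal sketch of building that list with Spark 1.x's AccumulableParam, which is the API the question's custom .add() implements. The TholdDropTuple constructor and its addResult/merge methods are assumptions invented for this sketch, and dropEvaluations stands in for however the M instances are held; the question does not show the real custom param:

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.Accumulable;
import org.apache.spark.AccumulableParam;

// Sketch of a custom AccumulableParam; addResult/merge are hypothetical
// reduction methods standing in for whatever the real TholdDropTuple does.
class DropEvalParam implements AccumulableParam<TholdDropTuple, TholdDropResult> {
    @Override
    public TholdDropTuple addAccumulator(TholdDropTuple acc, TholdDropResult r) {
        return acc.addResult(r);   // fold one N result into this M's running total
    }
    @Override
    public TholdDropTuple addInPlace(TholdDropTuple a, TholdDropTuple b) {
        return a.merge(b);         // combine partial totals from different partitions
    }
    @Override
    public TholdDropTuple zero(TholdDropTuple initial) {
        return initial;            // the identity value the accumulation starts from
    }
}

// Spark has no built-in "List Accumulator", so the driver keeps a plain
// java.util.List holding one Accumulable per DropEvaluation (one per M).
List<Accumulable<TholdDropTuple, TholdDropResult>> accumList = new ArrayList<>();
for (DropEvaluation eval : dropEvaluations) {   // dropEvaluations: the M instances
    accumList.add(context.accumulable(new TholdDropTuple(eval), new DropEvalParam()));
}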

To clarify the question: the total time is measured correctly; what I am asking is whether the .count() on the RDD forces Spark to wait until the RDD is finished before the count can run, and whether the .count() time itself is significant.

这是我们的代码:

// assume standin exists and does its thing correctly

// this controls the final size of RDD, as we are not parallelizing something with an existing length
List<Integer> rangeN = IntStream.rangeClosed(simsLeft - blockSize + 1, simsLeft).boxed().collect(Collectors.toList());

// setup bogus array of size N for parallelize dataSetN to lead to dropResultsN       
JavaRDD<Integer> dataSetN = context.parallelize(rangeN);

// setup timing to create N
long NCreationStartTime = System.nanoTime();

// this maps each integer element of RDD dataSetN to a "geneDropped" chromosome simulation, we need N of these:
JavaRDD<TholdDropResult> dropResultsN = dataSetN.map(s -> standin.call(s)).persist(StorageLevel.MEMORY_ONLY());

// **** this line makes the driver wait until the RDD is done, right?
long dummyLength = dropResultsN.count();


long NCreationNanoSeconds = System.nanoTime() - NCreationStartTime;
double NCreationSeconds = (double)NCreationNanoSeconds / 1000000000.0;
double NCreationMinutes = NCreationSeconds / 60.0;

logger.error("{} test sims remaining", simsLeft);

// now get the time for just the dropComparison (part of accumulable's add)
long startDropCompareTime = System.nanoTime();

// here we iterate through each accumulator in the list and compare all N elements of dropResultsN RDD to each M in turn, our .add() is a custom AccumulableParam
for (Accumulable<TholdDropTuple, TholdDropResult> dropEvalAccum : accumList) {
    dropResultsN.foreach(new VoidFunction<TholdDropResult>() {
        @Override
        public void call(TholdDropResult dropResultFromN) throws Exception {
            dropEvalAccum.add(dropResultFromN);
        }
    });
}

// all the dropComparisons for all N to all M for this blocksize are done, check the time...
long dropCompareNanoSeconds = System.nanoTime() - startDropCompareTime;
double dropCompareSeconds = (double)dropCompareNanoSeconds / 1000000000.0;
double dropCompareMinutes = dropCompareSeconds / 60.0;

// write lines to indicate timing section
// log and write to file the time for the N-creation

...

} // end for that goes through dropAccumList


Answer

Spark programs are lazy: nothing runs until you call an action, such as count, on the RDD. You can find a list of the common actions in Spark's documentation.
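A minimal sketch of that laziness, reusing the question's context; heavySimulation stands in for any expensive per-element work and is illustrative, not from the question:

import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;

// Transformations such as map() only record the lineage; nothing executes yet,
// so this line returns immediately no matter how expensive heavySimulation is.
JavaRDD<Integer> mapped = context.parallelize(Arrays.asList(1, 2, 3, 4))
                                 .map(x -> heavySimulation(x));

// count() is an action: only now does Spark schedule a job and run the map.
long n = mapped.count();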

// **** this line makes the driver wait until the RDD is done, right?
long dummyLength = dropResultsN.count();

Yes, in this case count forces dropResultsN to be computed, so it will take a long time. If you do a second count, it will return very quickly, since the RDD has already been computed and cached.
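So the timing pattern in the question is sound. A sketch of how one might confirm the cache behavior, reusing the question's dropResultsN; the second count is an optional check, not in the original code:

// First action: triggers the full computation of the persisted RDD,
// so this measures the real construction cost (the ~30 minutes).
long t0 = System.nanoTime();
long firstCount = dropResultsN.count();
double firstSeconds = (System.nanoTime() - t0) / 1e9;

// Second action: served from the MEMORY_ONLY cache (if the RDD fit in memory),
// so this should return almost immediately.
long t1 = System.nanoTime();
long secondCount = dropResultsN.count();
double secondSeconds = (System.nanoTime() - t1) / 1e9;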

