In Apache Spark, can I easily repeat/nest a SparkContext.parallelize?


Problem description

I am trying to model a genetics problem we are trying to solve, building up to it in steps. I can successfully run the PiAverage example from the Spark examples. That example "throws darts" at a circle (10^6 of them in our case) and counts the number that "land in the circle" to estimate Pi.

Let's say I want to repeat that process 1000 times (in parallel) and average all those estimates. I am trying to see the best approach; it seems like there would have to be two calls to parallelize? Nested calls? Is there a way to chain map or reduce calls together? I can't see one.

I want to know the wisdom of something like the idea below. I thought of tracking the resulting estimates using an accumulator. jsc is my SparkContext, full code of single run is at end of question, thanks for any input!

Accumulator<Double> accum = jsc.accumulator(0.0);

// make a list 1000 long to pass to parallelize (no for loops in Spark, right?)
List<Integer> numberOfEstimates = new ArrayList<Integer>(HOW_MANY_ESTIMATES);

// pass this "dummy list" to parallelize, which then 
// calls a pieceOfPI method to produce each individual estimate  
// accumulating the estimates. PieceOfPI would contain a 
// parallelize call too with the individual test in the code at the end
jsc.parallelize(numberOfEstimates).foreach(accum.add(pieceOfPI(jsc, numList, slices, HOW_MANY_ESTIMATES)));

// get the value of the total of PI estimates and print their average
double totalPi = accum.value();

// output the average of averages
System.out.println("The average of " + HOW_MANY_ESTIMATES + " estimates of Pi is " + totalPi / HOW_MANY_ESTIMATES);

It doesn't seem like the matrix or other answers I see on SO answer this specific question. I have done several searches, but I am not seeing how to do this without "parallelizing the parallelization." Is that a bad idea?

(and yes I realize mathematically I could just do more estimates and effectively get the same results :) Trying to build a structure my boss wants, thanks again!

I have put my entire single-test program here if that helps, sans an accumulator I was testing out. The core of this would become PieceOfPI():

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.Accumulable;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.SparkConf;

public class PiAverage implements Serializable {

public static void main(String[] args) {

    PiAverage pa = new PiAverage();
    pa.go();

}

public void go() {

    // should make a parameter like all these finals should be
    // int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2;
    final int SLICES = 16;

    // how many "darts" are thrown at the circle to get one single Pi estimate
    final int HOW_MANY_DARTS = 1000000;

    // how many "dartboards" to collect to average the Pi estimate, which we hope converges on the real Pi
    final int HOW_MANY_ESTIMATES = 1000;

    SparkConf sparkConf = new SparkConf().setAppName("PiAverage")
        .setMaster("local[4]");

    JavaSparkContext jsc = new JavaSparkContext(sparkConf);

    // setup "dummy" ArrayList of size HOW_MANY_DARTS -- how many darts to throw
    List<Integer> throwsList = new ArrayList<Integer>(HOW_MANY_DARTS);
    for (int i = 0; i < HOW_MANY_DARTS; i++) {
        throwsList.add(i);
    }

    // setup "dummy" ArrayList of size HOW_MANY_ESTIMATES
    List<Integer> numberOfEstimates = new ArrayList<Integer>(HOW_MANY_ESTIMATES);
    for (int i = 0; i < HOW_MANY_ESTIMATES; i++) {
        numberOfEstimates.add(i);
    }

    JavaRDD<Integer> dataSet = jsc.parallelize(throwsList, SLICES);

    long totalPi = dataSet.filter(new Function<Integer, Boolean>() {
        public Boolean call(Integer i) {
            double x = Math.random();
            double y = Math.random();
            return x * x + y * y < 1;
        }
    }).count();

    System.out.println(
            "The average of " + HOW_MANY_DARTS + " estimates of Pi is " + 4 * totalPi / (double)HOW_MANY_DARTS);

    jsc.stop();
    jsc.close();
}
}

Solution

Let me start with your "background question". Transformation operations like map, join, groupBy, etc. fall into two categories: those that require a shuffle of data as input from all the partitions, and those that don't. Operations like groupBy and join require a shuffle, because you need to bring together all records with the same keys from all of the RDD's partitions (think of how SQL JOIN and GROUP BY operations work). On the other hand, map, flatMap, filter, etc. don't require shuffling, because each operation works fine on the input from the previous step's partition. They work on single records at a time, not on groups of records with matching keys, so no shuffling is necessary.

This background is necessary to understand that an "extra map" does not add significant overhead. A sequence of operations like map, flatMap, etc. is "squashed" together into a "stage" (which is what you see when you look at the details for a job in the Spark web console), so that only one RDD is materialized: the one at the end of the stage.
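To make that concrete, here is a minimal sketch of my own (not from the original post), written against the same Spark 1.x Java API the question uses and assuming an existing JavaSparkContext named jsc; the extra imports it needs beyond the question's are org.apache.spark.api.java.JavaPairRDD, org.apache.spark.api.java.function.PairFunction, scala.Tuple2, java.util.Arrays, and java.util.Map. The map and filter calls are narrow operations that get pipelined into one stage, while the reduceByKey forces a shuffle because matching keys must be brought together from every partition:

// narrow transformations: each one works on a single record at a time,
// so Spark pipelines them into one stage with no shuffle
JavaRDD<Integer> scaled = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2)
    .map(new Function<Integer, Integer>() {
        public Integer call(Integer i) { return i * 10; }
    })
    .filter(new Function<Integer, Boolean>() {
        public Boolean call(Integer i) { return i >= 30; }
    });

// reduceByKey must gather every record with the same key from all
// partitions, so it ends the stage and triggers a shuffle
Map<Integer, Integer> countsByKey = scaled
    .mapToPair(new PairFunction<Integer, Integer, Integer>() {
        public Tuple2<Integer, Integer> call(Integer i) {
            return new Tuple2<Integer, Integer>(i % 20, 1);
        }
    })
    .reduceByKey(new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer a, Integer b) { return a + b; }
    })
    .collectAsMap();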

On to your first question. I wouldn't use an accumulator for this. They are intended for "side-band" data, like counting how many bad lines you parsed. In this example, you might use accumulators to count how many (x,y) pairs were inside the radius of 1 vs. outside, as an example.
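As a small sketch of my own along those lines (reusing the question's jsc and dataSet), the accumulator below only carries the side statistic, while count() remains the real result. Keep in mind that accumulator updates made inside a transformation such as filter can be applied more than once if a stage is retried, so actions are the safer place for them:

// side-band statistic: count darts that land outside the unit circle,
// while the job's real answer is still the count of the ones inside
final Accumulator<Integer> outside = jsc.accumulator(0);

long inside = dataSet.filter(new Function<Integer, Boolean>() {
    public Boolean call(Integer i) {
        double x = Math.random();
        double y = Math.random();
        boolean hit = x * x + y * y < 1;
        if (!hit) {
            outside.add(1);   // side-band data only, not the main result
        }
        return hit;
    }
}).count();

System.out.println("Darts outside the circle: " + outside.value());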

The JavaPiSpark example in the Spark distribution is about as good as it gets. You should study why it works: it's the right dataflow model for Big Data systems. You could also use "aggregators". In the Javadocs, click the "index" and look at the agg, aggregate, and aggregateByKey functions. However, they are no easier to understand and they are not necessary here. They do provide more flexibility than map followed by reduce, so they are worth knowing about.
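As a hedged sketch of that flat dataflow shape (my own illustration, not the JavaPiSpark code itself, reusing the question's jsc and SLICES plus the extra imports from the first sketch; the constants are mine and deliberately smaller than the question's), here is one way to get many Pi estimates from a single parallelize, with no nesting and no accumulator: tag every throw with the estimate it belongs to, reduce by that key, and average the handful of per-estimate results on the driver.

// illustrative constants, smaller than the question's 1000 estimates of
// 10^6 darts so that this driver-side list stays modest in size
final int NUM_ESTIMATES = 100;
final int DARTS_PER_ESTIMATE = 10000;

List<Integer> throwIds = new ArrayList<Integer>(NUM_ESTIMATES * DARTS_PER_ESTIMATE);
for (int i = 0; i < NUM_ESTIMATES * DARTS_PER_ESTIMATE; i++) {
    throwIds.add(i);
}

// one flat RDD of every throw; the key says which estimate a throw belongs to
JavaPairRDD<Integer, Integer> hitsPerEstimate = jsc.parallelize(throwIds, SLICES)
    .mapToPair(new PairFunction<Integer, Integer, Integer>() {
        public Tuple2<Integer, Integer> call(Integer i) {
            double x = Math.random();
            double y = Math.random();
            int hit = (x * x + y * y < 1) ? 1 : 0;
            return new Tuple2<Integer, Integer>(i % NUM_ESTIMATES, hit);
        }
    })
    .reduceByKey(new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer a, Integer b) { return a + b; }
    });

// turn each per-estimate hit count into a Pi estimate, then average on the driver
double sum = 0.0;
for (Tuple2<Integer, Integer> t : hitsPerEstimate.collect()) {
    sum += 4.0 * t._2() / DARTS_PER_ESTIMATE;
}
System.out.println("Average of " + NUM_ESTIMATES + " Pi estimates: " + sum / NUM_ESTIMATES);

This only shows the shape; for the question's real sizes you would generate the throws on the cluster (for example by flatMapping over the estimate ids) rather than building a huge list on the driver.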

The problem with your code is that you are effectively trying to tell Spark what to do, rather than expressing your intent and letting Spark optimize how it does it for you.

Finally, I suggest you buy and study O'Reilly's "Learning Spark". It does a good job explaining the internal details, like staging, and it shows lots of example code you can use, too.
