Spark - Strange behaviour with iterative algorithms


Problem description

I am trying to write an iterative algorithm with Spark. The algorithm contains one main loop in which different Spark commands are used for parallelism. If only one Spark command is used in each iteration, everything works fine. When more than one command is used, the behaviour of Spark gets very strange. The main problem is that a map command on an RDD with 2 items does not result in 2 function calls, but in many more.

It seems as if, in iteration x, Spark executes every command from iteration 1 to iteration x-1 again, and it does so not only in the last iteration of the loop, but in every single iteration.

I built a small example to reproduce the behaviour (with Java 1.8 and Spark 1.6.1).

First, the data structure that is used in the RDD:

import java.io.Serializable;

public class Data implements Serializable {
    private static final long serialVersionUID = -6367920689454127925L;
    private String id;
    private Integer value;

    public Data(final String id, final Integer value) {
        super();
        this.id = id;
        this.value = value;
    }

    public String getId() {
        return this.id;
    }

    public Integer getValue() {
        return this.value;
    }

    public void setValue(final Integer value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "Data [id=" + this.id + ", value=" + this.value + "]";
    }
}

For the min command we use a comparator:

import java.io.Serializable;

public class MyComparator implements java.util.Comparator<Data>, Serializable {

    private static final long serialVersionUID = 1383816444011380318L;

    private static final double EPSILON = 0.001;

    public MyComparator() {
    }

    @Override
    public int compare(final Data x, final Data y) {
        if (Math.abs(x.getValue() - y.getValue()) < EPSILON) {
            return 0;
        } else if (x.getValue() < y.getValue()) {
            return -1;
        } else {
            return 1;
        }
    }

}

And now the main program with the algorithm:

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class Job implements Serializable {

    private static final long serialVersionUID = -1828983500553835114L;

    // Spark Settings
    private static final String APPNAME = "DebugApp - Main";
    private static final String SPARKMASTER = "local[1]";
    private static final int MAX_ITERATIONS = 4;

    public Job() {
    }

    public static void main(final String[] args) {
        final Job job = new Job();
        job.run();
    }

    public void run() {
        final JavaSparkContext sparkContext = createSparkContext();
        final List<Data> dataSet = new ArrayList<Data>();
        dataSet.add(new Data("0", 0));
        dataSet.add(new Data("1", 0));

        JavaRDD<Data> dataParallel = sparkContext.parallelize(dataSet);

        // We use an accumulator to count the number of calls within the map command
        final Accumulator<Integer> accum = sparkContext.accumulator(0);

        final MyComparator comparator = new MyComparator();
        for (int iterations = 0; iterations < MAX_ITERATIONS; iterations++) {
            // If the item which should be updated is selected using the iteration counter everything works fine...
            // final String idToUpdate = new Integer(iterations % 2).toString();

            // ..., but if the element with the minimal value is selected the number of executions in the map command increases.
            final String idToUpdate = dataParallel.min(comparator).getId();
            dataParallel = dataParallel.map(data -> {
                accum.add(1); // Counting the number of function calls.
                return updateData(data, idToUpdate);
            });
        }

        final List<Data> resultData = dataParallel.collect();
        System.out.println("Accumulator: " + accum.value());
        for (Data data : resultData) {
            System.out.println(data.toString());
        }
    }

    private Data updateData(final Data data, final String id) {
        if (data.getId().equals(id)) {
            data.setValue(data.getValue() + 1);
        }
        return data;
    }

    private JavaSparkContext createSparkContext() {
        final SparkConf conf = new SparkConf().setAppName(APPNAME).setMaster(SPARKMASTER);
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.set("spark.kryo.registrator", "de.eprofessional.bidmanager2.engine.serialization.KryoRegistratorWrapper");
        return new JavaSparkContext(conf);

    }
}

I would expect that for each iteration we obtain 2 function calls, which is the case if the item to update is selected by using the iteration counter (see Accumulator Result 1). But if the element is selected by using the min command, we obtain different results (see Accumulator Result 2):

+----------------+----------------------+----------------------+
| MAX_ITERATIONS | Accumulator Result 1 | Accumulator Result 2 |
+----------------+----------------------+----------------------+
|              1 |                    2 |                    2 |
|              2 |                    4 |                    6 |
|              3 |                    6 |                   12 |
|              4 |                    8 |                   20 |
+----------------+----------------------+----------------------+

Does someone have an explanation for the additional calls in the map command?

Recommended answer

Operations on RDDs define what is called a "lineage". Each RDD has a reference to its parent (or parents, in the case of e.g. a join). This lineage is visited when the RDD is materialized. That forms the basis of resiliency in RDDs: Spark can re-create all operations on a dataset to come to a result by executing said lineage on a given partition of the data.

What's happening here is that we are chaining .map calls. If we unfold the loop, we would see something like:

iter1 -> rdd.map(f)
iter2 -> rdd.map(f).map(f) 
iter3 -> rdd.map(f).map(f).map(f)
...

We can see this by issuing rdd.toDebugString within the loop.
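
As an illustration, here is a minimal sketch of how the lineage could be inspected; it only adds a debug print to the loop from the question, and all variable names refer to that code:

for (int iterations = 0; iterations < MAX_ITERATIONS; iterations++) {
    final String idToUpdate = dataParallel.min(comparator).getId();
    dataParallel = dataParallel.map(data -> {
        accum.add(1); // Counting the number of function calls.
        return updateData(data, idToUpdate);
    });
    // Print the execution plan after each pass; it grows by one map stage per iteration,
    // matching the unfolded chain shown above.
    System.out.println(dataParallel.toDebugString());
}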

So, the bottom line is that each pass adds another lineage step on top of the previous stage. Moreover, min is an action, so every iteration materializes the whole chain built so far, and the final collect materializes it once more; with 2 items this amounts to 2*(0 + 1 + ... + (n-1)) + 2*n = n*(n+1) map calls for n iterations, which matches the 2, 6, 12, 20 of Accumulator Result 2. If we want to break that lineage, we should checkpoint the RDD at each iteration to "remember" the last intermediate result. cache has a similar effect, except that it does not guarantee that the evaluation stops (for example when there is no more memory to cache), so materializing the RDD may still evaluate the lineage further.
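
For illustration, here is a minimal sketch of how the loop could be adapted to truncate the lineage with checkpointing. The checkpoint directory is an assumed placeholder, and the count() call is only there to trigger an action so that the checkpoint is actually written:

// Somewhere before the loop, e.g. right after creating the context.
// The directory is a placeholder; on a real cluster use HDFS or another reliable store.
sparkContext.setCheckpointDir("/tmp/spark-checkpoints");

for (int iterations = 0; iterations < MAX_ITERATIONS; iterations++) {
    final String idToUpdate = dataParallel.min(comparator).getId();
    dataParallel = dataParallel.map(data -> {
        accum.add(1); // Counting the number of function calls.
        return updateData(data, idToUpdate);
    });
    // Optionally cache first so the data is not recomputed when the checkpoint is written.
    dataParallel.cache();
    // Mark the RDD for checkpointing and run an action so the lineage is cut here.
    dataParallel.checkpoint();
    dataParallel.count();
}

Note that the extra count() action changes when and how often the map function runs, so the accumulator values will differ from the original example; the sketch is only meant to show where the lineage is cut.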
