Spark iteration time increasing exponentially when using join

Problem description

I'm quite new to Spark and I'm trying to implement an iterative algorithm for clustering (expectation-maximization) with centroids represented by a Markov model, so I need to do iterations and joins.

One problem I ran into is that the time for each iteration grows exponentially.
After some experimenting I found that when doing iterations you need to persist any RDD that is going to be reused in the next iteration; otherwise, for every iteration Spark builds an execution plan that recomputes the RDD from the very start, which keeps increasing the computation time.

import datetime

init = sc.parallelize(xrange(10000000), 3)
init.cache()

for i in range(6):
    print i
    start = datetime.datetime.now()

    init2 = init.map(lambda n: (n, n*3))        
    init = init2.map(lambda n: n[0])
#     init.cache()

    print init.count()    
    print str(datetime.datetime.now() - start)

Results:

0
10000000
0:00:04.283652
1
10000000
0:00:05.998830
2
10000000
0:00:08.771984
3
10000000
0:00:11.399581
4
10000000
0:00:14.206069
5
10000000
0:00:16.856993

So adding cache() helps and the iteration time becomes constant.

init = sc.parallelize(xrange(10000000), 3)
init.cache()

for i in range(6):
    print i
    start = datetime.datetime.now()

    init2 = init.map(lambda n: (n, n*3))        
    init = init2.map(lambda n: n[0])
    init.cache()

    print init.count()    
    print str(datetime.datetime.now() - start)

0
10000000
0:00:04.966835
1
10000000
0:00:04.609885
2
10000000
0:00:04.324358
3
10000000
0:00:04.248709
4
10000000
0:00:04.218724
5
10000000
0:00:04.223368

But when a join is added inside the iteration the problem comes back. Here is some simple code demonstrating it; even caching every RDD transformation doesn't solve the problem:

init = sc.parallelize(xrange(10000), 3)
init.cache()

for i in range(6):
    print i
    start = datetime.datetime.now()

    init2 = init.map(lambda n: (n, n*3))
    init2.cache()

    init3 = init.map(lambda n: (n, n*2))
    init3.cache()

    init4 = init2.join(init3)
    init4.count()
    init4.cache()

    init = init4.map(lambda n: n[0])
    init.cache()

    print init.count()    
    print str(datetime.datetime.now() - start)

And here is the output. As you can see, the iteration time grows exponentially :(

0
10000
0:00:00.674115
1
10000
0:00:00.833377
2
10000
0:00:01.525314
3
10000
0:00:04.194715
4
10000
0:00:08.139040
5
10000
0:00:17.852815

I will really appreciate any help :)

Recommended answer

Summary:

Generally speaking, iterative algorithms, especially ones with a self-join or self-union, require control over the number of partitions generated in each iteration.

The problem described here is the result of not having that control: with each iteration the number of partitions grows through the self-join, which produces the exponential pattern. To address it you have to either control the number of partitions in each iteration (see below) or use a global setting such as spark.default.parallelism (see the answer provided by Travis). In general the first approach offers more control and doesn't affect other parts of the code.
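For reference, a minimal sketch of the global approach (not part of the original answer; the application name and the value 16 are placeholders, and the setting has to be in place before the SparkContext is created):

from pyspark import SparkConf, SparkContext

# spark.default.parallelism is used as the default number of partitions for
# shuffle operations such as join and groupByKey when numPartitions is not given.
conf = (SparkConf()
        .setAppName("iterative-join-demo")
        .set("spark.default.parallelism", "16"))
sc = SparkContext(conf=conf)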

Original answer:

As far as I can tell there are two interleaved problems here: a growing number of partitions and the shuffling overhead during joins. Both can be handled easily, so let's go step by step.

First, let's create a helper to collect the statistics:

import datetime

def get_stats(i, init, init2, init3, init4,
              start, end, desc, cache, part, hashp):
    return {
        "i": i,
        "init": init.getNumPartitions(),
        "init1": init2.getNumPartitions(),
        "init2": init3.getNumPartitions(),
        "init4": init4.getNumPartitions(),
        "time": str(end - start),
        "timen": (end - start).seconds + (end - start).microseconds * 10 **-6,
        "desc": desc,
        "cache": cache,
        "part": part,
        "hashp": hashp
    }

and another helper to handle caching/partitioning:

def procRDD(rdd, cache=True, part=False, hashp=False, npart=16):
    rdd = rdd if not part else rdd.repartition(npart)
    rdd = rdd if not hashp else rdd.partitionBy(npart)
    return rdd if not cache else rdd.cache()

and extract the pipeline logic:

def run(init, description, cache=True, part=False, hashp=False,
        npart=16, n=6):
    times = []

    for i in range(n):
        start = datetime.datetime.now()

        init2 = procRDD(
                init.map(lambda n: (n, n*3)),
                cache, part, hashp, npart)
        init3 = procRDD(
                init.map(lambda n: (n, n*2)),
                cache, part, hashp, npart)


        # If part set to True limit number of the output partitions
        init4 = init2.join(init3, npart) if part else init2.join(init3) 
        init = init4.map(lambda n: n[0])

        if cache:
            init4.cache()
            init.cache()

        init.count() # Force computations to get time
        end = datetime.datetime.now() 

        times.append(get_stats(
            i, init, init2, init3, init4,
            start, end, description,
            cache, part, hashp
        ))

    return times

and create the initial data:

ncores = 8
init = sc.parallelize(xrange(10000), ncores * 2).cache()

The join operation by itself, if the numPartitions argument is not provided, sets the number of partitions in the output based on the number of partitions of the input RDDs. That means the number of partitions grows with each iteration, and when it gets too large things get ugly. You can deal with this by providing the numPartitions argument to join, or by repartitioning the RDDs on each iteration.
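As a quick illustration of the per-join variant (not from the original answer; the RDDs below are throwaway examples, not the ones from the question):

left = sc.parallelize(xrange(1000), 8).map(lambda n: (n, n * 3))
right = sc.parallelize(xrange(1000), 8).map(lambda n: (n, n * 2))

# Without the argument the output partition count is derived from the inputs
# and can keep growing across iterations; with it the count stays fixed.
print left.join(right).getNumPartitions()
print left.join(right, 16).getNumPartitions()   # always 16

# Alternatively, repartition the result on each iteration:
fixed = left.join(right).repartition(16)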

timesCachePart = sqlContext.createDataFrame(
        run(init, "cache + partition", True, True, False, ncores * 2))
timesCachePart.select("i", "init1", "init2", "init4", "time", "desc").show()

+-+-----+-----+-----+--------------+-----------------+
|i|init1|init2|init4|          time|             desc|
+-+-----+-----+-----+--------------+-----------------+
|0|   16|   16|   16|0:00:01.145625|cache + partition|
|1|   16|   16|   16|0:00:01.090468|cache + partition|
|2|   16|   16|   16|0:00:01.059316|cache + partition|
|3|   16|   16|   16|0:00:01.029544|cache + partition|
|4|   16|   16|   16|0:00:01.033493|cache + partition|
|5|   16|   16|   16|0:00:01.007598|cache + partition|
+-+-----+-----+-----+--------------+-----------------+

As you can see, when we repartition, the execution time is more or less constant. The second problem is that the data above is partitioned randomly. To ensure good join performance we would like to have the same keys on a single partition, and to achieve that we can use a hash partitioner:
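As a side note, this is roughly what the hashp flag in procRDD does under the hood; a minimal sketch (not from the original answer, with arbitrary sizes and partition counts):

# Hash-partition both sides into the same number of partitions so that
# equal keys already sit in matching partitions when the join runs.
a = sc.parallelize(xrange(1000), 8).map(lambda n: (n, n * 3)).partitionBy(16).cache()
b = sc.parallelize(xrange(1000), 8).map(lambda n: (n, n * 2)).partitionBy(16).cache()

print a.join(b, 16).getNumPartitions()   # 16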

timesCacheHashPart = sqlContext.createDataFrame(
    run(init, "cache + hashpart", True, True, True, ncores * 2))
timesCacheHashPart.select("i", "init1", "init2", "init4", "time", "desc").show()

+-+-----+-----+-----+--------------+----------------+
|i|init1|init2|init4|          time|            desc|
+-+-----+-----+-----+--------------+----------------+
|0|   16|   16|   16|0:00:00.946379|cache + hashpart|
|1|   16|   16|   16|0:00:00.966519|cache + hashpart|
|2|   16|   16|   16|0:00:00.945501|cache + hashpart|
|3|   16|   16|   16|0:00:00.986777|cache + hashpart|
|4|   16|   16|   16|0:00:00.960989|cache + hashpart|
|5|   16|   16|   16|0:00:01.026648|cache + hashpart|
+-+-----+-----+-----+--------------+----------------+

Execution time is constant as before, and there is a small improvement over the basic partitioning.

Now let's use caching only, as a reference:

timesCacheOnly = sqlContext.createDataFrame(
    run(init, "cache-only", True, False, False, ncores * 2))
timesCacheOnly.select("i", "init1", "init2", "init4", "time", "desc").show()


+-+-----+-----+-----+--------------+----------+
|i|init1|init2|init4|          time|      desc|
+-+-----+-----+-----+--------------+----------+
|0|   16|   16|   32|0:00:00.992865|cache-only|
|1|   32|   32|   64|0:00:01.766940|cache-only|
|2|   64|   64|  128|0:00:03.675924|cache-only|
|3|  128|  128|  256|0:00:06.477492|cache-only|
|4|  256|  256|  512|0:00:11.929242|cache-only|
|5|  512|  512| 1024|0:00:23.284508|cache-only|
+-+-----+-----+-----+--------------+----------+

As you can see, the number of partitions (init2, init3, init4) for the cache-only version doubles with each iteration, and the execution time is proportional to the number of partitions.

Finally, we can check whether we can improve performance with a large number of partitions when using the hash partitioner:

timesCacheHashPart512 = sqlContext.createDataFrame(
    run(init, "cache + hashpart 512", True, True, True, 512))
timesCacheHashPart512.select(
    "i", "init1", "init2", "init4", "time", "desc").show()
+-+-----+-----+-----+--------------+--------------------+
|i|init1|init2|init4|          time|                desc|
+-+-----+-----+-----+--------------+--------------------+
|0|  512|  512|  512|0:00:14.492690|cache + hashpart 512|
|1|  512|  512|  512|0:00:20.215408|cache + hashpart 512|
|2|  512|  512|  512|0:00:20.408070|cache + hashpart 512|
|3|  512|  512|  512|0:00:20.390267|cache + hashpart 512|
|4|  512|  512|  512|0:00:20.362354|cache + hashpart 512|
|5|  512|  512|  512|0:00:19.878525|cache + hashpart 512|
+-+-----+-----+-----+--------------+--------------------+

The improvement is not that impressive, but if you have a small cluster and a lot of data it is still worth trying.

I guess the takeaway message here is that partitioning matters. There are contexts where it is handled for you (mllib, sql), but if you use low-level operations it is your responsibility.
