Spark iteration time increasing exponentially when using join

Problem description

I'm quite new to Spark and I'm trying to implement an iterative algorithm for clustering (expectation-maximization) with the centroid represented by a Markov model. So I need to do iterations and joins.

One problem that I experience is that each iteration's time grows exponentially.
After some experimenting I found that when doing iterations you need to persist an RDD that is going to be reused in the next iteration; otherwise, on every iteration Spark creates an execution plan that recalculates the RDD from the very start, thus increasing the calculation time.

import datetime

init = sc.parallelize(xrange(10000000), 3)
init.cache()

for i in range(6):
    print i
    start = datetime.datetime.now()

    init2 = init.map(lambda n: (n, n*3))        
    init = init2.map(lambda n: n[0])
#     init.cache()

    print init.count()    
    print str(datetime.datetime.now() - start)

Result:

0
10000000
0:00:04.283652
1
10000000
0:00:05.998830
2
10000000
0:00:08.771984
3
10000000
0:00:11.399581
4
10000000
0:00:14.206069
5
10000000
0:00:16.856993
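
A quick way to confirm that the lineage really keeps growing here (an aside, not part of the original experiment) is RDD.toDebugString(), which prints the execution plan:

print init.toDebugString()
# without caching, every iteration prepends two more map stages,
# so this plan keeps getting longer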

So adding cache() helps and the iteration time becomes constant.

init = sc.parallelize(xrange(10000000), 3)
init.cache()

for i in range(6):
    print i
    start = datetime.datetime.now()

    init2 = init.map(lambda n: (n, n*3))        
    init = init2.map(lambda n: n[0])
    init.cache()

    print init.count()    
    print str(datetime.datetime.now() - start)

Result:
0
10000000
0:00:04.966835
1
10000000
0:00:04.609885
2
10000000
0:00:04.324358
3
10000000
0:00:04.248709
4
10000000
0:00:04.218724
5
10000000
0:00:04.223368
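
Note that cache() keeps the results in memory but the lineage itself still accumulates; if that ever becomes a problem, checkpointing truncates the lineage entirely. A sketch (the checkpoint directory is a placeholder, and checkpoint() must be called before the action that materializes the RDD):

sc.setCheckpointDir("/tmp/spark-checkpoints")  # placeholder path

init = sc.parallelize(xrange(10000000), 3)
for i in range(6):
    init = init.map(lambda n: (n, n*3)).map(lambda n: n[0])
    init.cache()
    init.checkpoint()  # cut the lineage at this point
    init.count()       # materializes the cache and writes the checkpoint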

But when making a join inside the iteration the problem comes back. Here is some simple code demonstrating the problem. Even caching each RDD transformation doesn't solve it:

init = sc.parallelize(xrange(10000), 3)
init.cache()

for i in range(6):
    print i
    start = datetime.datetime.now()

    init2 = init.map(lambda n: (n, n*3))
    init2.cache()

    init3 = init.map(lambda n: (n, n*2))
    init3.cache()

    init4 = init2.join(init3)
    init4.count()
    init4.cache()

    init = init4.map(lambda n: n[0])
    init.cache()

    print init.count()    
    print str(datetime.datetime.now() - start)

And here is the output. As you can see, the iteration time grows exponentially :(

0
10000
0:00:00.674115
1
10000
0:00:00.833377
2
10000
0:00:01.525314
3
10000
0:00:04.194715
4
10000
0:00:08.139040
5
10000
0:00:17.852815

I would really appreciate any help :)

Answer

As far as I can tell there are two interleaved problems here - a growing number of partitions, and shuffling overhead during joins. Both can be handled easily, so let's go step by step.

First let's create a helper to collect the statistics:

import datetime

def get_stats(i, init, init2, init3, init4,
              start, end, desc, cache, part, hashp):
    # note: keys "init1" and "init2" hold the partition counts of the
    # variables init2 and init3; the table columns below follow this naming
    return {
        "i": i,
        "init": init.getNumPartitions(),
        "init1": init2.getNumPartitions(),
        "init2": init3.getNumPartitions(),
        "init4": init4.getNumPartitions(),
        "time": str(end - start),
        "timen": (end - start).seconds + (end - start).microseconds * 10 ** -6,
        "desc": desc,
        "cache": cache,
        "part": part,
        "hashp": hashp
    }

another helper to handle caching/partitioning:

def procRDD(rdd, cache=True, part=False, hashp=False, npart=16):
    rdd = rdd if not part else rdd.repartition(npart)   # fix the number of partitions
    rdd = rdd if not hashp else rdd.partitionBy(npart)  # hash-partition by key
    return rdd if not cache else rdd.cache()

and extract the pipeline logic:

def run(init, description, cache=True, part=False, hashp=False, 
    npart=16, n=6):
    times = []

    for i in range(n):
        start = datetime.datetime.now()

        init2 = procRDD(
                init.map(lambda n: (n, n*3)),
                cache, part, hashp, npart)
        init3 = procRDD(
                init.map(lambda n: (n, n*2)),
                cache, part, hashp, npart)


        # If part set to True limit number of the output partitions
        init4 = init2.join(init3, npart) if part else init2.join(init3) 
        init = init4.map(lambda n: n[0])

        if cache:
            init4.cache()
            init.cache()

        init.count() # Force computations to get time
        end = datetime.datetime.now() 

        times.append(get_stats(
            i, init, init2, init3, init4,
            start, end, description,
            cache, part, hashp
        ))

    return times

and create the initial data:

ncores = 8
init = sc.parallelize(xrange(10000), ncores * 2).cache()

The join operation by itself, if the numPartitions argument is not provided, adjusts the number of partitions of the output based on the number of partitions of the input RDDs. This means the number of partitions grows with each iteration. If the number of partitions gets too large, things get ugly. You can deal with this by providing the numPartitions argument to join, or by repartitioning the RDDs with each iteration.
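
Stripped of the benchmarking harness, the fix is one of these two lines (a minimal sketch; 16 is an arbitrary target):

init4 = init2.join(init3, 16)              # cap the join output at 16 partitions
init4 = init2.join(init3).repartition(16)  # or repartition after the fact

Running the harness with repartitioning enabled: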

timesCachePart = sqlContext.createDataFrame(
        run(init, "cache + partition", True, True, False, ncores * 2))
timesCachePart.select("i", "init1", "init2", "init4", "time", "desc").show()

+-+-----+-----+-----+--------------+-----------------+
|i|init1|init2|init4|          time|             desc|
+-+-----+-----+-----+--------------+-----------------+
|0|   16|   16|   16|0:00:01.145625|cache + partition|
|1|   16|   16|   16|0:00:01.090468|cache + partition|
|2|   16|   16|   16|0:00:01.059316|cache + partition|
|3|   16|   16|   16|0:00:01.029544|cache + partition|
|4|   16|   16|   16|0:00:01.033493|cache + partition|
|5|   16|   16|   16|0:00:01.007598|cache + partition|
+-+-----+-----+-----+--------------+-----------------+

As you can see, when we repartition, the execution time is more or less constant. The second problem is that the data above is partitioned randomly. To ensure join performance we would like to have the same keys on a single partition. To achieve that we can use a hash partitioner:

timesCacheHashPart = sqlContext.createDataFrame(
    run(init, "cache + hashpart", True, True, True, ncores * 2))
timesCacheHashPart.select("i", "init1", "init2", "init4", "time", "desc").show()

+-+-----+-----+-----+--------------+----------------+
|i|init1|init2|init4|          time|            desc|
+-+-----+-----+-----+--------------+----------------+
|0|   16|   16|   16|0:00:00.946379|cache + hashpart|
|1|   16|   16|   16|0:00:00.966519|cache + hashpart|
|2|   16|   16|   16|0:00:00.945501|cache + hashpart|
|3|   16|   16|   16|0:00:00.986777|cache + hashpart|
|4|   16|   16|   16|0:00:00.960989|cache + hashpart|
|5|   16|   16|   16|0:00:01.026648|cache + hashpart|
+-+-----+-----+-----+--------------+----------------+

The execution time is constant as before, and there is a small improvement over the basic partitioning.
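
Stripped down, what the hash partitioner buys is co-partitioned inputs: both sides of the join use the same partitioner and the same number of partitions, so matching keys already live in the same partition. A sketch:

left = init.map(lambda n: (n, n*3)).partitionBy(16).cache()
right = init.map(lambda n: (n, n*2)).partitionBy(16).cache()
# both sides now share the default hash partitioner with 16 partitions,
# so the join can match keys partition by partition
joined = left.join(right, 16)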

Now let's use cache only as a reference:

timesCacheOnly = sqlContext.createDataFrame(
    run(init, "cache-only", True, False, False, ncores * 2))
timesCacheOnly.select("i", "init1", "init2", "init4", "time", "desc").show()


+-+-----+-----+-----+--------------+----------+
|i|init1|init2|init4|          time|      desc|
+-+-----+-----+-----+--------------+----------+
|0|   16|   16|   32|0:00:00.992865|cache-only|
|1|   32|   32|   64|0:00:01.766940|cache-only|
|2|   64|   64|  128|0:00:03.675924|cache-only|
|3|  128|  128|  256|0:00:06.477492|cache-only|
|4|  256|  256|  512|0:00:11.929242|cache-only|
|5|  512|  512| 1024|0:00:23.284508|cache-only|
+-+-----+-----+-----+--------------+----------+

As you can see, the number of partitions (init2, init3, init4) for the cache-only version doubles with each iteration, and the execution time grows proportionally to the number of partitions.

Finally, we can check whether we can improve performance with a large number of partitions when using the hash partitioner:

timesCacheHashPart512 = sqlContext.createDataFrame(
    run(init, "cache + hashpart 512", True, True, True, 512))
timesCacheHashPart512.select(
    "i", "init1", "init2", "init4", "time", "desc").show()
+-+-----+-----+-----+--------------+--------------------+
|i|init1|init2|init4|          time|                desc|
+-+-----+-----+-----+--------------+--------------------+
|0|  512|  512|  512|0:00:14.492690|cache + hashpart 512|
|1|  512|  512|  512|0:00:20.215408|cache + hashpart 512|
|2|  512|  512|  512|0:00:20.408070|cache + hashpart 512|
|3|  512|  512|  512|0:00:20.390267|cache + hashpart 512|
|4|  512|  512|  512|0:00:20.362354|cache + hashpart 512|
|5|  512|  512|  512|0:00:19.878525|cache + hashpart 512|
+-+-----+-----+-----+--------------+--------------------+

The improvement is not that impressive, but if you have a small cluster and a lot of data it is still worth trying.

I guess the takeaway message here is that partitioning matters. There are contexts where it is handled for you (mllib, sql), but if you use low-level operations it is your responsibility.
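
A quick way to check whether a low-level pipeline preserved its partitioning is the RDD.partitioner attribute (a sketch; exact behavior may vary between Spark versions - a plain map drops the partitioner, while mapValues keeps it):

pairs = sc.parallelize(xrange(100)).map(lambda n: (n, n))
print pairs.partitioner                             # None - not partitioned yet
print pairs.partitionBy(8).partitioner is not None  # True
print pairs.partitionBy(8).map(lambda kv: kv).partitioner  # None - map drops it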
