Spark example program runs very slow


Problem description


I tried to use Spark to work on a simple graph problem. I found an example program in the Spark source folder: transitive_closure.py, which computes the transitive closure of a graph with no more than 200 edges and vertices. But on my own laptop, it runs for more than 10 minutes and does not terminate. The command line I use is: spark-submit transitive_closure.py.

I wonder why Spark is so slow even when computing such a small transitive closure result. Is this a common case? Is there any configuration I am missing?

The program is shown below, and can be found in the examples folder of the Spark installation.

from __future__ import print_function

import sys
from random import Random

from pyspark import SparkContext

numEdges = 200
numVertices = 100
rand = Random(42)


def generateGraph():
    edges = set()
    while len(edges) < numEdges:
        src = rand.randrange(0, numEdges)
        dst = rand.randrange(0, numEdges)
        if src != dst:
            edges.add((src, dst))
    return edges


if __name__ == "__main__":
    """
    Usage: transitive_closure [partitions]
    """
    sc = SparkContext(appName="PythonTransitiveClosure")
    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    tc = sc.parallelize(generateGraph(), partitions).cache()

    # Linear transitive closure: each round grows paths by one edge,
    # by joining the graph's edges with the already-discovered paths.
    # e.g. join the path (y, z) from the TC with the edge (x, y) from
    # the graph to obtain the path (x, z).

    # Because join() joins on keys, the edges are stored in reversed order.
    edges = tc.map(lambda x_y: (x_y[1], x_y[0]))

    oldCount = 0
    nextCount = tc.count()
    while True:
        oldCount = nextCount
        # Perform the join, obtaining an RDD of (y, (z, x)) pairs,
        # then project the result to obtain the new (x, z) paths.
        new_edges = tc.join(edges).map(lambda __a_b: (__a_b[1][1], __a_b[1][0]))
        tc = tc.union(new_edges).distinct().cache()
        nextCount = tc.count()
        if nextCount == oldCount:
            break

    print("TC has %i edges" % tc.count())

    sc.stop()

Solution

There can be many reasons why this code doesn't perform particularly well on your machine, but most likely it is just another variant of the problem described in "Spark iteration time increasing exponentially when using join" (http://stackoverflow.com/q/31659404/1560062). The simplest way to check whether that is indeed the case is to provide the spark.default.parallelism parameter on submit:

bin/spark-submit --conf spark.default.parallelism=2 \
  examples/src/main/python/transitive_closure.py
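
As a minimal sketch (not part of the original answer), the same property can also be set programmatically when creating the SparkContext instead of being passed on the command line:

from pyspark import SparkConf, SparkContext

# spark.default.parallelism caps the default number of partitions used by
# shuffle operations such as join() and distinct() when none is given explicitly.
conf = SparkConf().set("spark.default.parallelism", "2")
sc = SparkContext(appName="PythonTransitiveClosure", conf=conf)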

If not limited otherwise, SparkContext.union, RDD.join and RDD.union set the number of partitions of the child RDD to the total number of partitions in the parents. Usually this is the desired behavior, but it can become extremely inefficient when applied iteratively.
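
As an illustrative sketch against the question's code (again, not part of the original answer), the same effect can be achieved inside the loop itself, since PySpark's join() and distinct() accept an explicit numPartitions argument; reusing the script's partitions value keeps the partition count constant across iterations:

while True:
    oldCount = nextCount
    # Pass numPartitions explicitly so the joined and deduplicated RDDs keep a
    # fixed number of partitions instead of growing with their parents each round.
    new_edges = tc.join(edges, numPartitions=partitions) \
                  .map(lambda __a_b: (__a_b[1][1], __a_b[1][0]))
    tc = tc.union(new_edges).distinct(numPartitions=partitions).cache()
    nextCount = tc.count()
    if nextCount == oldCount:
        break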
