Why is this simple Spark program not utilizing multiple cores?


Problem description

So, I'm running this simple program on a 16-core multicore system. I run it by issuing the following:

spark-submit --master local[*] pi.py

The code for the program is below.

#"""pi.py"""
from pyspark import SparkContext
import random

N = 12500000

def sample(p):
    x, y = random.random(), random.random()
    return 1 if x*x + y*y < 1 else 0

sc = SparkContext("local", "Test App")
count = sc.parallelize(xrange(0, N)).map(sample).reduce(lambda a, b: a + b)
print "Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)

When I use top to look at CPU consumption, only one core is being utilized. Why is that? Secondly, the Spark documentation says the default parallelism is contained in the property spark.default.parallelism. How can I read this property from within my Python program?
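As for the second question, here is a minimal sketch of reading that setting from PySpark, assuming the SparkContext sc from the program above. sc.getConf().get(...) returns the configured value (or None if the property was never set explicitly), while sc.defaultParallelism reports the effective value the context will actually use:

# Value of the property as configured (None if it was never set explicitly)
parallelism = sc.getConf().get("spark.default.parallelism")

# Effective default parallelism the context will actually use
print "spark.default.parallelism = %s, effective default = %d" % (parallelism, sc.defaultParallelism)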

Recommended answer

Probably because the call to sc.parallelize puts all the data into one single partition. You can specify the number of partitions as the second argument to parallelize:

part = 16
count = sc.parallelize(xrange(N), part).map(sample).reduce(lambda a, b: a + b)
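As a quick sanity check (a small sketch; the rdd name below is introduced just for illustration), you can ask an RDD how many partitions it actually ended up with:

rdd = sc.parallelize(xrange(N), part)
# Should print 16 when part = 16
print "number of partitions: %d" % rdd.getNumPartitions()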

Note that this would still generate the 12.5 million input numbers with one CPU in the driver and only then ship them out to the 16 partitions, where the map and reduce steps run.

A better approach is to do most of the work after the partitioning: for example, the following generates only a tiny array on the driver and then lets each remote task generate the actual random numbers and the subsequent pi approximation:

part = 16
count = (sc.parallelize([0] * part, part)
           .flatMap(lambda blah: [sample(p) for p in xrange(N / part)])
           .reduce(lambda a, b: a + b))

Finally (because the lazier we are, the better), Spark MLlib already comes with random data generation that is nicely parallelized; have a look here: http://spark.apache.org/docs/1.1.0/mllib-statistics.html#random-data-generation. So maybe the following is close to what you are trying to do (not tested => probably not working, but should hopefully be close):

from pyspark.mllib.random import RandomRDDs

count = (RandomRDDs.uniformRDD(sc, N, part)
         .zip(RandomRDDs.uniformRDD(sc, N, part))  # pair two uniform RDDs into (x, y) points
         .filter(lambda (x, y): x*x + y*y < 1)     # keep only points inside the unit circle
         .count())
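For completeness, the estimate itself is then computed exactly as in the original program, using the count from this pipeline:

print "Pi is roughly %f" % (4.0 * count / N)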
