Why is communication via shared memory so much slower than via queues?

Problem description

I am using Python 2.7.5 on a recent vintage Apple MacBook Pro which has four hardware and eight logical CPUs; i.e., the sysctl utility gives:

$ sysctl hw.physicalcpu
hw.physicalcpu: 4
$ sysctl hw.logicalcpu
hw.logicalcpu: 8

I need to perform some rather complicated processing on a large 1-D list or array, and then save the result as an intermediate output which will be used again at a later point in a subsequent calculation within my application. The structure of my problem lends itself rather naturally to parallelization, so I thought that I would try to use Python's multiprocessing module to subdivide the 1D array into several pieces (either 4 pieces or 8 pieces, I'm not yet sure which), perform the calculations in parallel, and then reassemble the resulting output into its final format afterwards. I am trying to decide whether to use multiprocessing.Queue() (message queues) or multiprocessing.Array() (shared memory) as my preferred mechanism for communicating the resulting calculations from the child processes back up to the main parent process, and I have been experimenting with a couple of "toy" models in order to make sure that I understand how the multiprocessing module actually works. I've come across a rather unexpected result, however: in creating two essentially equivalent solutions to the same problem, the version which uses shared memory for interprocess communication seems to require much more execution time (like 30X more!) than the version using message queues. Below, I've included two different versions of sample source code for a "toy" problem which generates a long sequence of random numbers using parallel processes, and communicates the agglomerated result back to a parent process in two different ways: first using message queues, and the second time using shared memory.

Here is the version that uses message queues:

import random
import multiprocessing
import datetime

def genRandom(count, id, q):

    print("Now starting process {0}".format(id))
    output = []
    # Generate a list of random numbers, of length "count"
    for i in xrange(count):
        output.append(random.random())
    # Write the output to a queue, to be read by the calling process 
    q.put(output)

if __name__ == "__main__":
    # Number of random numbers to be generated by each process
    size = 1000000
    # Number of processes to create -- the total size of all of the random
    # numbers generated will ultimately be (procs * size)
    procs = 4

    # Create a list of jobs and queues 
    jobs = []
    outqs = []
    for i in xrange(0, procs):
        q = multiprocessing.Queue()
        p = multiprocessing.Process(target=genRandom, args=(size, i, q))
        jobs.append(p)
        outqs.append(q)

    # Start time of the parallel processing and communications section
    tstart = datetime.datetime.now()    
    # Start the processes (i.e. calculate the random number lists)      
    for j in jobs:
        j.start()

    # Read out the data from the queues
    data = []
    for q in outqs:
        data.extend(q.get())

    # Ensure all of the processes have finished
    for j in jobs:
        j.join()
    # End time of the parallel processing and communications section
    tstop = datetime.datetime.now()
    tdelta = datetime.timedelta.total_seconds(tstop - tstart)

    msg = "{0} random numbers generated in {1} seconds"
    print(msg.format(len(data), tdelta))

When I run it, I get a result that typically looks about like this:

$ python multiproc_queue.py
Now starting process 0
Now starting process 1
Now starting process 2
Now starting process 3
4000000 random numbers generated in 0.514805 seconds

Now, here is the equivalent code segment, but refactored just slightly so that it uses shared memory instead of queues:

import random
import multiprocessing
import datetime

def genRandom(count, id, d):

    print("Now starting process {0}".format(id))
    # Generate a list of random numbers, of length "count", and write them
    # directly to a segment of an array in shared memory
    for i in xrange(count*id, count*(id+1)):
        d[i] = random.random()

if __name__ == "__main__":
    # Number of random numbers to be generated by each process
    size = 1000000
    # Number of processes to create -- the total size of all of the random
    # numbers generated will ultimately be (procs * size)
    procs = 4

    # Create a list of jobs and a block of shared memory
    jobs = []
    data = multiprocessing.Array('d', size*procs)
    for i in xrange(0, procs):
        p = multiprocessing.Process(target=genRandom, args=(size, i, data))
        jobs.append(p)

    # Start time of the parallel processing and communications section
    tstart = datetime.datetime.now()    
    # Start the processes (i.e. calculate the random number lists)      
    for j in jobs:
        j.start()

    # Ensure all of the processes have finished
    for j in jobs:
        j.join()
    # End time of the parallel processing and communications section
    tstop = datetime.datetime.now()
    tdelta = datetime.timedelta.total_seconds(tstop - tstart)

    msg = "{0} random numbers generated in {1} seconds"
    print(msg.format(len(data), tdelta))

When I run the shared memory version, however, the typical result looks more like this:

$ python multiproc_shmem.py 
Now starting process 0
Now starting process 1
Now starting process 2
Now starting process 3
4000000 random numbers generated in 15.839607 seconds

My question: why is there such a huge difference in execution speeds (roughly 0.5 seconds vs. 15 seconds, a factor of 30X!) between the two versions of my code? And in particular, how can I modify the shared memory version in order to get it to run faster?

Answer

This is because multiprocessing.Array uses a lock by default to prevent multiple processes from accessing it at once:

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)

...

If lock is True (the default) then a new lock object is created to synchronize access to the value. If lock is a Lock or RLock object then that will be used to synchronize access to the value. If lock is False then access to the returned object will not be automatically protected by a lock, so it will not necessarily be "process-safe".
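As a quick illustration of these three options (a sketch, not taken from the original question or answer), the array can be constructed in any of the following ways:

import multiprocessing

# lock=True (the default): a fresh Lock is created to guard every access
a1 = multiprocessing.Array('d', 10)
# pass an existing Lock/RLock object to be used for synchronization
a2 = multiprocessing.Array('d', 10, lock=multiprocessing.RLock())
# lock=False: a raw array with no automatic locking (not process-safe)
a3 = multiprocessing.Array('d', 10, lock=False)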

This means you're not really concurrently writing to the array - only one process can access it at a time. Since your example workers are doing almost nothing but array writes, constantly waiting on this lock badly hurts performance. If you use lock=False when you create the array, the performance is much better:

With lock=True:

Now starting process 0
Now starting process 1
Now starting process 2
Now starting process 3
4000000 random numbers generated in 4.811205 seconds

With lock=False:

Now starting process 0
Now starting process 3
Now starting process 1
Now starting process 2
4000000 random numbers generated in 0.192473 seconds
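
For reference, the only change needed in the shared-memory example above to get this faster behavior is the Array construction; everything else can stay as it is (a minimal sketch):

# Construct the shared array without the synchronizing lock; this is safe here
# because each worker writes only to its own, non-overlapping slice.
data = multiprocessing.Array('d', size*procs, lock=False)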

Note that using lock=False means you need to manually protect access to the Array whenever you're doing something that isn't process-safe. Your example is having processes write to unique parts, so it's ok. But if you were trying to read from it while doing that, or had different processes write to overlapping parts, you would need to manually acquire a lock.
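
A hedged sketch of what that manual locking could look like (the worker and variable names below are illustrative, not from the original answer): pass an explicit multiprocessing.Lock to the workers and hold it around any access that may overlap.

import multiprocessing

def worker(start, stop, shared, lock):
    # These writes may overlap with other processes, so guard them explicitly
    for i in xrange(start, stop):
        with lock:
            shared[i] += 1.0

if __name__ == "__main__":
    shared = multiprocessing.Array('d', 100, lock=False)
    lock = multiprocessing.Lock()
    jobs = [multiprocessing.Process(target=worker, args=(0, 100, shared, lock))
            for _ in xrange(2)]
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()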
