Python多处理pool.map无法并行工作 [英] Python multiprocessing pool.map doesn't work parallel

查看:598
本文介绍了Python多处理pool.map无法并行工作的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我写了一个简单的并行python程序

I wrote a simple parallel python program

import multiprocessing as mp
import time

def test_function(i):
    print("function starts" + str(i))
    time.sleep(1)
    print("function ends" + str(i))

if __name__ == '__main__':
    pool = mp.Pool(mp.cpu_count())
    pool.map(test_function, [i for i in range(4)])
    pool.close()
    pool.join()

我希望在输出中看到的内容:

What I expect to see in the output:

function starts0
function starts2
function starts1
function starts3
function ends1
function ends3
function ends2
function ends0

我实际上看到的是

function starts1
function ends1
function starts3
function ends3
function starts2
function ends2
function starts0
function ends0

当我查看输出时就像pool.map运行一个函数并等待完成,然后运行另一个函数,但是当我计算整个程序的持续时间约为2秒时,除非正在并行运行

When I'm looking in the output It's look like pool.map running a function and waits till it's done and then run another, but when I calculate the duration of whole program its about 2 seconds and it's impossible unless the test_function is running parallel

此代码在MacOS和Linux上运行良好,但未显示 windows 10 上的预期输出. python版本是3.6.4

This code is working well in MacOS and Linux but It's not showing the expected output on windows 10. python version is 3.6.4

推荐答案

multiprocessing.Pool() 文档(从此以后,

The multiprocessing.Pool() documentation ( since ever, Py27 incl. ) is clear in intentionally blocking in processing the queue-of-calls as created by the iterator-generated set of the just -4- calls, produced sequentially from the above posted example.

multiprocessing -模块文档中有关其 Pool.map() 方法的说明如下:

The multiprocessing-module documentation says this about its Pool.map() method:

map(func, iterable[, chunksize])

map()内置函数的并行等效项(尽管它仅支持一个可迭代的参数). 它会阻塞直到结果准备就绪.

map(func, iterable[, chunksize])

A parallel equivalent of the map() built-in function (it supports only one iterable argument though). It blocks until the result is ready.

这应该是观察到的行为,而不同的实例化方法将产生不同的附加(与流程复制相关的)开销.

This should be the observed behaviour, whereas different instantiation methods would accrue different add-on ( process copying-related ) overhead costs.

无论如何,由于操作系统的原因,mp.cpu_count()不需要 是要执行的所有此类分派的.Pool() -instance worker任务将继续执行的CPU核心数(与用户/流程相关的限制策略)亲和力设置:

Anyway, the mp.cpu_count() need not be the number of CPU-cores any such dispatched .Pool()-instance workers' tasks will get on to get executed, because of the O/S ( user/process-related restriction policies ) settings of affinity:

您的代码将必须服从"那些CPU内核的子集,这些CPU内核的子集允许通过任何此类multiprocessing所请求的子进程使用,其数量不大于: len( os.sched_getaffinity( 0 ) )

Your code will have to "obey" the sub-set of those CPU-cores, that are permitted to be harnessed by any such multiprocessing-requested sub-process,
the number of which is not higher than: len( os.sched_getaffinity( 0 ) )

import multiprocessing as mp                                            # file: ~/p/StackOverflow_multiprocessing.py
import time, os, platform, inspect                                      # https://stackoverflow.com/questions/58738716/python-multiprocessing-pool-map-doesnt-work-parallel/58755642

def test_function( i = -1 ):
    pass;                                                                                                  thisframerecord = inspect.stack()[0] # 0 represents this line
    pass;                                                                                                callerframerecord = inspect.stack()[1] # 1 represents line at caller
    pass;                                                                 _INFO_ = inspect.getframeinfo(   thisframerecord[0] )
    pass;                                                               _CALLER_ = inspect.getframeinfo( callerframerecord[0] )
    print( "{0:_>30.10f} ::".format(              time.monotonic() ),
           "PID:{0:} with PPID:{1:} runs".format( os.getpid(), os.getppid() ),
           "{0:}( i = {2:} )-LINE[{1:_>4d}],".format(                     _INFO_.function,   _INFO_.lineno, i ),
           "invoked from {0:}()-LINE[{1:_>4d}]".format(                 _CALLER_.function, _CALLER_.lineno )
            )
    time.sleep( 10 )
    pass;                                                                                                  thisframerecord = inspect.stack()[0] # 0 represents this line
    pass;                                                                 _INFO_ = inspect.getframeinfo(   thisframerecord[0] )                 # 1 represents line at caller
    print( "{0:_>30.10f} ::".format(              time.monotonic() ),
           "PID:{0:} with PPID:{1:} ends".format( os.getpid(), os.getppid() ),
           "{0:}( i = {2:} )-LINE[{1:_>4d}],".format(                     _INFO_.function,   _INFO_.lineno, i )
            )

if __name__ == '__main__':
    print( "{0:} cores reported by {1:}".format( mp.cpu_count(), "mp.cpu_count()" ) )
    print( "{0:} cores permit'd by {1:}".format( os.sched_getaffinity(0), "os.sched_getaffinity(0)" ) )
    print( "O/S sub-process instantiation methods {0:} available".format( mp.get_all_start_methods() ) )
    print( "O/S will use this instantiation method {0:}".format( mp.get_start_method() ) )
    print( "{0:_>30.10f} :: will call .Pool()".format( time.monotonic() ) )
    #------mp.Pool()-----------------------------------------------------
    pool = mp.Pool( mp.cpu_count() )
    print( "{0:_>30.10f} :: pool.map() to be called".format( time.monotonic() ) )
    #---.map()--------------------------------------?
    #---.map(                                       ?
    pool.map( test_function, [i for i in range(4) ] )
    #---.map(                                       ?
    #---.map()--------------------------------------?
    print( "{0:_>30.10f} :: pool.map() call RETd".format( time.monotonic() ) )
    pool.close()
    #---.close()
    print( "{0:_>30.10f} :: pool.close()-d".format( time.monotonic() ) )
    pool.join()
    #---.join()
    print( "{0:_>30.10f} :: pool.join()-d".format( time.monotonic()          ) )
    print( "EXECUTED on {0:}".format(              platform.version()        ) )
    print( "USING: python-{0:}:".format(           platform.python_version() ) )


可能会在linux级操作系统上查看类似的内容:


might look about something like this on linux-class O/S:

(py3) Fri Nov 08 14:26:40 :~$ python ~/p/StackOverflow_multiprocessing.py
8 cores reported by mp.cpu_count()
{0, 1, 2, 3} cores permit'd by os.sched_getaffinity(0)
O/S sub-process instantiation methods ['fork', 'spawn', 'forkserver'] available
O/S will use this instantiation method fork
____________1284931.1678911699 :: will call .Pool()
____________1284931.2063829789 :: pool.map() to be called
____________1284931.2383207241 :: PID:15848 with PPID:15847 runs test_function( i = 0 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284931.2506985001 :: PID:15849 with PPID:15847 runs test_function( i = 1 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284931.2614207701 :: PID:15851 with PPID:15847 runs test_function( i = 2 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284931.2671745829 :: PID:15850 with PPID:15847 runs test_function( i = 3 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284941.2504994699 :: PID:15848 with PPID:15847 ends test_function( i = 0 )-LINE[__16],
____________1284941.2550825749 :: PID:15849 with PPID:15847 ends test_function( i = 1 )-LINE[__16],
____________1284941.2698363690 :: PID:15851 with PPID:15847 ends test_function( i = 2 )-LINE[__16],
____________1284941.2776791099 :: PID:15850 with PPID:15847 ends test_function( i = 3 )-LINE[__16],
____________1284941.2780045229 :: pool.map() call RETd
____________1284941.2780527000 :: pool.close()-d
____________1284941.3343055181 :: pool.join()-d

EXECUTED on #1 SMP oSname M.m.n-o.p (YYYY-MM-DD)
USING: python-3.5.6:


检查隐藏的详细信息-您的操作系统使用什么来调用test_function()- mapstar() (并非普遍确定的选择) 是本地SMP-linux-class O/S选择其默认子进程实例化方法的方法,该方法通过' fork 执行.


Check the hidden detail - what your O/S uses for invoking the test_function() - the mapstar() ( not being a sure choice universally ) was the local SMP-linux-class O/S's choice for its default sub-process instantiation method, performed via 'fork'.

这篇关于Python多处理pool.map无法并行工作的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆