Combine Pool.map with shared memory Array in Python multiprocessing

Question

I have a very large (read only) array of data that I want to be processed by multiple processes in parallel.

I like the Pool.map function and would like to use it to calculate functions on that data in parallel.

I saw that one can use the Value or Array class to use shared-memory data between processes. But when I try this, I get a RuntimeError: 'SynchronizedString objects should only be shared between processes through inheritance' when using the Pool.map function:

Here is a simplified example of what I am trying to do:

from sys import stdin
from multiprocessing import Pool, Array

def count_it( arr, key ):
  count = 0
  for c in arr:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  # want to share it using shared memory
  toShare = Array('c', testData)

  # this works
  print count_it( toShare, "a" )

  pool = Pool()

  # RuntimeError here
  print pool.map( count_it, [(toShare,key) for key in ["a", "b", "s", "d"]] )

Can anyone tell me what I am doing wrong here?

So what I would like to do is pass information about a newly created shared-memory array to the processes after they have been created in the process pool.

Answer

Trying again as I just saw the bounty ;)

Basically I think the error message means what it says - multiprocessing shared-memory Arrays can't be passed as arguments (by pickling). It doesn't make sense to serialise the data - the point is that the data is shared memory. So you have to make the shared array global. I think it's neater to put it as an attribute of a module, as in my first answer, but just leaving it as a global variable in your example also works well. Taking on board your point about not wanting to set the data before the fork, here is a modified example. If you wanted to have more than one possible shared array (and that's why you wanted to pass toShare as an argument), you could similarly make a global list of shared arrays and just pass the index to count_it (which would become for c in toShare[i]:) - a sketch of that variant follows the example below.

from multiprocessing import Pool, Array

def count_it( key ):
  count = 0
  for c in toShare:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  # allocate shared array - want lock=False in this case since we 
  # aren't writing to it and want to allow multiple processes to access
  # at the same time - I think with lock=True there would be little or 
  # no speedup
  maxLength = 50
  toShare = Array('c', maxLength, lock=False)

  # fork
  pool = Pool()

  # can set data after fork
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  if len(testData) > maxLength:
      raise ValueError, "Shared array too small to hold data"
  toShare[:len(testData)] = testData

  print pool.map( count_it, ["a", "b", "s", "d"] )
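
For the multiple-shared-arrays variant mentioned above, a minimal sketch might look like the following. The sharedArrays list, the (index, key) work items, and the sample strings are my own illustration rather than part of the original answer, and like the example above it relies on fork:

from multiprocessing import Pool, Array

# global list of shared arrays - the forked workers inherit it
sharedArrays = []

def count_it( args ):
  i, key = args   # index into sharedArrays plus the character to count
  count = 0
  for c in sharedArrays[i]:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  maxLength = 50
  # allocate the shared arrays before the fork so the children inherit them
  sharedArrays.append(Array('c', maxLength, lock=False))
  sharedArrays.append(Array('c', maxLength, lock=False))

  # fork
  pool = Pool()

  # as above, the data can still be set after the fork
  sharedArrays[0][:15] = "abcabcs bsdfsdf"
  sharedArrays[1][:13] = "gdfg dffdgdfg"

  print pool.map( count_it, [(0, "a"), (0, "b"), (1, "d"), (1, "g")] )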

The above doesn't work on Windows because it doesn't use fork. However, the following does work on Windows, still using Pool, so I think this is the closest to what you want:

from multiprocessing import Pool, Array
import mymodule   # any separate, importable module; it just holds the shared array as an attribute

def count_it( key ):
  count = 0
  for c in mymodule.toShare:
    if c == key:
      count += 1
  return count

def initProcess(share):
  mymodule.toShare = share

if __name__ == '__main__':
  # allocate shared array - want lock=False in this case since we 
  # aren't writing to it and want to allow multiple processes to access
  # at the same time - I think with lock=True there would be little or 
  # no speedup
  maxLength = 50
  toShare = Array('c', maxLength, lock=False)

  # fork
  pool = Pool(initializer=initProcess,initargs=(toShare,))

  # can set data after fork
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  if len(testData) > maxLength:
      raise ValueError, "Shared array too small to hold data"
  toShare[:len(testData)] = testData

  print pool.map( count_it, ["a", "b", "s", "d"] )

Not sure why map won't pickle the array but Process and the Pool initializer will - I think perhaps it has to be transferred at the point of subprocess initialization on Windows. Note that the data is still set after the fork, though.
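
As a quick illustration of that difference, the same kind of shared Array can be handed directly to a Process constructor as an argument. This is only a minimal sketch; the worker name and data below are illustrative, not from the original answer:

from multiprocessing import Process, Array

def count_in_child( arr, key ):
  # runs in the child process; the shared array arrived via the Process args
  count = 0
  for c in arr:
    if c == key:
      count += 1
  print key, count

if __name__ == '__main__':
  toShare = Array('c', "abcabcs bsdfsdf", lock=False)

  # passing the shared array as an argument is accepted here, unlike with Pool.map
  p = Process(target=count_in_child, args=(toShare, "a"))
  p.start()
  p.join()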
