Python: multiprocessing and Array of c_char_p
Question
I'm launching 3 processes and I want them to put a string into a shared array, at the index corresponding to the process (i).
Look at the code below, the output generated is:
['test 0', None, None]
['test 1', 'test 1', None]
['test 2', 'test 2', 'test 2']
Why does 'test 0' get overwritten by 'test 1', and 'test 1' by 'test 2'?
What I want is (order is not important) :
['test 0', None, None]
['test 0', 'test 1', None]
['test 0', 'test 1', 'test 2']
Code:
#!/usr/bin/env python
import multiprocessing
from multiprocessing import Value, Lock, Process, Array
import ctypes
from ctypes import c_int, c_char_p

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue, arr, lock):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.arr = arr
        self.lock = lock

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                self.task_queue.task_done()
                break
            answer = next_task(arr=self.arr, lock=self.lock)
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return

class Task(object):

    def __init__(self, i):
        self.i = i

    def __call__(self, arr=None, lock=None):
        with lock:
            arr[self.i] = "test %d" % self.i
            print arr[:]

    def __str__(self):
        return 'ARC'

    def run(self):
        print 'IN'

if __name__ == '__main__':
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
    arr = Array(ctypes.c_char_p, 3)
    lock = multiprocessing.Lock()
    num_consumers = multiprocessing.cpu_count() * 2
    consumers = [Consumer(tasks, results, arr, lock) for i in xrange(num_consumers)]
    for w in consumers:
        w.start()
    for i in xrange(3):
        tasks.put(Task(i))
    for i in xrange(num_consumers):
        tasks.put(None)
I'm running Python 2.7.3 (Ubuntu)
Answer
This problem seems similar to this one. There, J.F. Sebastian speculated that the assignment to arr[i] points arr[i] to a memory address that was only meaningful to the subprocess making the assignment. The other subprocesses retrieve garbage when looking at that address.
There are at least two ways to avoid this problem. One is to use a multiprocessing.Manager list:
import multiprocessing as mp

class Consumer(mp.Process):

    def __init__(self, task_queue, result_queue, lock, lst):
        mp.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.lock = lock
        self.lst = lst

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                self.task_queue.task_done()
                break
            answer = next_task(lock=self.lock, lst=self.lst)
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return

class Task(object):

    def __init__(self, i):
        self.i = i

    def __call__(self, lock, lst):
        with lock:
            lst[self.i] = "test {}".format(self.i)
            print([lst[i] for i in range(3)])

if __name__ == '__main__':
    tasks = mp.JoinableQueue()
    results = mp.Queue()
    manager = mp.Manager()
    lst = manager.list(['']*3)
    lock = mp.Lock()
    num_consumers = mp.cpu_count() * 2
    consumers = [Consumer(tasks, results, lock, lst) for i in xrange(num_consumers)]
    for w in consumers:
        w.start()
    for i in xrange(3):
        tasks.put(Task(i))
    for i in xrange(num_consumers):
        tasks.put(None)
    tasks.join()
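The Manager-list behavior can also be seen in isolation. This is a minimal sketch (Python 3 syntax, with hypothetical helper names) in which each process writes only its own slot; the proxy pickles each assigned value over to the manager's server process, so no raw pointers are shared:

```python
import multiprocessing as mp

def set_item(lst, i):
    # The proxy forwards this assignment to the manager process,
    # which holds the real list; the string is pickled, not shared by address.
    lst[i] = "test %d" % i

def fill_shared_list(n=3):
    with mp.Manager() as manager:
        lst = manager.list([None] * n)
        procs = [mp.Process(target=set_item, args=(lst, i)) for i in range(n)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        # Copy the contents out before the manager shuts down.
        return list(lst)

if __name__ == '__main__':
    print(fill_shared_list())  # ['test 0', 'test 1', 'test 2']
```

Since every process writes a distinct index and we join before reading, the result is deterministic even without a lock.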
Another is to use shared arrays with a fixed size, e.g. mp.Array('c', 10):
import multiprocessing as mp

class Consumer(mp.Process):

    def __init__(self, task_queue, result_queue, arr, lock):
        mp.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.arr = arr
        self.lock = lock

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                self.task_queue.task_done()
                break
            answer = next_task(arr=self.arr, lock=self.lock)
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return

class Task(object):

    def __init__(self, i):
        self.i = i

    def __call__(self, arr, lock):
        with lock:
            arr[self.i].value = "test {}".format(self.i)
            print([a.value for a in arr])

if __name__ == '__main__':
    tasks = mp.JoinableQueue()
    results = mp.Queue()
    arr = [mp.Array('c', 10) for i in range(3)]
    lock = mp.Lock()
    num_consumers = mp.cpu_count() * 2
    consumers = [Consumer(tasks, results, arr, lock) for i in xrange(num_consumers)]
    for w in consumers:
        w.start()
    for i in xrange(3):
        tasks.put(Task(i))
    for i in xrange(num_consumers):
        tasks.put(None)
    tasks.join()
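A quick sketch of the fixed-buffer behavior, independent of the worker classes above (note that under Python 3 the 'c' typecode stores bytes, so .value takes a bytes object rather than a str):

```python
import multiprocessing as mp

# Each mp.Array('c', 10) is a fixed 10-byte buffer living in shared memory.
# Assigning .value copies the bytes into the buffer; no pointer is stored.
arr = [mp.Array('c', 10) for _ in range(3)]
arr[0].value = b"test 0"      # fits: the bytes are copied in place
print(arr[0].value)           # b'test 0'

raised = False
try:
    arr[1].value = b"x" * 11  # does not fit in the 10-byte buffer
except ValueError:
    raised = True
print("oversized assignment rejected:", raised)  # True
```

Because the buffer size is fixed at creation time, every process maps the same bytes at the same offset, at the cost of a hard upper bound on string length.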
I speculate that the reason this works when mp.Array(ctypes.c_char_p, 3) does not is that mp.Array('c', 10) has a fixed size, so the memory address never changes, while mp.Array(ctypes.c_char_p, 3) has a variable size, so the memory address might change when arr[i] is assigned a bigger string.
Perhaps this is what the docs warn about when they state:
Although it is possible to store a pointer in shared memory remember that this will refer to a location in the address space of a specific process. However, the pointer is quite likely to be invalid in the context of a second process and trying to dereference the pointer from the second process may cause a crash.
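That warning can be illustrated with plain ctypes and no subprocess at all. This sketch only shows the layout difference: a c_char_p slot is a pointer-sized cell whose target lives in the current process's private address space, while a char buffer holds its bytes inline:

```python
import ctypes

# Three c_char_p slots are just three pointer-sized cells; the string
# bytes live elsewhere in this process's private address space.
ptr_arr = (ctypes.c_char_p * 3)()
ptr_arr[0] = b"test 0"
print(ctypes.sizeof(ptr_arr) == 3 * ctypes.sizeof(ctypes.c_void_p))  # True

# A char buffer stores its bytes inline, so sharing the buffer shares the data.
buf = ctypes.create_string_buffer(b"test 0", 10)
print(ctypes.sizeof(buf))  # 10

# Both read back fine here, but only the inline buffer would survive
# being placed in memory shared with another process.
print(ptr_arr[0], buf.value)
```

Putting ptr_arr in shared memory would share only the addresses, which is exactly the dereference hazard the docs describe.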