Iterate over list from leading and trailing with multiprocessing


Problem Description




I want to iterate over a list with two functions using multiprocessing: one iterates over main_list from the leading end and the other from the trailing end. Each time a function takes an element from the sample list (g), it should put that element in the main list, until one of them finds a duplicate in the list; then I want to terminate both processes and return the elements seen so far.

I expect the first process to return:

['a', 'b', 'c', 'd', 'e', 'f']

And the second to return:

['l', 'k', 'j', 'i', 'h', 'g']

This is my code, which raises an error:

from multiprocessing import Process, Manager

manager = Manager()
d = manager.list()

# Fn definitions and such
def a(main_path,g,l=[]):
  for i in g:
    l.append(i)
    print 'a'
    if i in main_path:
      return l
    main_path.append(i)

def b(main_path,g,l=[]):
  for i in g:
    l.append(i)
    print 'b'
    if i in main_path:
      return l
    main_path.append(i)

g=['a','b','c','d','e','f','g','h','i','j','k','l']
g2=g[::-1]

p1 = Process(target=a, args=(d,g))
p2 = Process(target=b, args=(d,g2))
p1.start()
p2.start()

And this is the Traceback:

a
Process Process-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/home/bluebird/Desktop/persiantext.py", line 17, in a
    if i in main_path:
  File "<string>", line 2, in __contains__
  File "/usr/lib/python2.7/multiprocessing/managers.py", line 755, in _callmethod
    self._connect()
  File "/usr/lib/python2.7/multiprocessing/managers.py", line 742, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/usr/lib/python2.7/multiprocessing/connection.py", line 169, in Client
b
    c = SocketClient(address)
  File "/usr/lib/python2.7/multiprocessing/connection.py", line 304, in SocketClient
    s.connect(address)
  File "/usr/lib/python2.7/socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 2] No such file or directory
Process Process-3:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/home/bluebird/Desktop/persiantext.py", line 27, in b
    if i in main_path:
  File "<string>", line 2, in __contains__
  File "/usr/lib/python2.7/multiprocessing/managers.py", line 755, in _callmethod
    self._connect()
  File "/usr/lib/python2.7/multiprocessing/managers.py", line 742, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/usr/lib/python2.7/multiprocessing/connection.py", line 169, in Client
    c = SocketClient(address)
  File "/usr/lib/python2.7/multiprocessing/connection.py", line 304, in SocketClient
    s.connect(address)
  File "/usr/lib/python2.7/socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 2] No such file or directory

Note that I have no idea how to terminate both processes after one of them finds a duplicated element!

Solution

There are all kinds of other problems in your code, but since I already explained them on your other question, I won't get into them here.

The new problem is that you're not joining your child processes. In your threaded version, this wasn't an issue just because your main thread accidentally had a "block forever" before the end. But here, you don't have that, so the main process reaches the end of the script while the background processes are still running.

When this happens, it's not entirely defined what your code will do.* But basically, you're destroying the manager object, which shuts down the manager server while the background processes are still using it, so they're going to raise exceptions the next time they try to access a managed object.

The solution is to add p1.join() and p2.join() to the end of your script.

But that really only gets you back to the same situation as your threaded code (except not blocking forever at the end). You've still got code that's completely serialized, and a big race condition, and so on.


If you're curious why this happens:

At the end of the script, all of your module's globals go out of scope.** Since those variables are the only reference you have to the manager and process objects, those objects get garbage-collected, and their destructors get called.

For a manager object, the destructor shuts down the server.

For a process object, I'm not entirely sure, but I think the destructor does nothing (rather than joining and/or interrupting it). Instead, there's an atexit function that runs after all of the destructors and joins any still-running processes.***

So, first the manager goes away, then the main process starts waiting for the children to finish; the next time each one tries to access a managed object, it fails and exits. Once all of them do that, the main process finishes waiting and exits.


* The multiprocessing changes in 3.2 and the shutdown changes in 3.4 make things a lot cleaner, so if we weren't talking about 2.7, there would be less "here's what usually happens but not always" and "here's what happens in one particular implementation on one particular platform".

** This isn't actually guaranteed by 2.7, and garbage-collecting all of the modules' globals doesn't always happen. But in this particular simple case, I'm pretty sure it will always work this way, at least in CPython, although I don't want to try to explain why.

*** That's definitely how it works with threads, at least on CPython 2.7 on Unix… again, this isn't at all documented in 2.x, so you can only tell by reading the source or experimenting on the platforms/implementations/versions that matter to you… And I don't want to track this through the source unless there's likely to be something puzzling or interesting to find.
