Twisted network client with multiprocessing workers?

Problem description

So, I've got an application that uses Twisted + Stomper as a STOMP client which farms out work to a multiprocessing.Pool of workers.

This appears to work ok when I just use a python script to fire this up, which (simplified) looks something like this:

# stompclient.py
# (config_path, processes, host, port, the credentials and StompClientFactory
# are defined elsewhere in the application)

import logging.config
import multiprocessing

import twisted.python.log
from twisted.internet import reactor

logging.config.fileConfig(config_path)
logger = logging.getLogger(__name__)

# Add observer to make Twisted log via python
twisted.python.log.PythonLoggingObserver().start()

# initialize the process pool.  (child processes get forked off immediately)
pool = multiprocessing.Pool(processes=processes)

StompClientFactory.username = username
StompClientFactory.password = password
StompClientFactory.destination = destination
reactor.connectTCP(host, port, StompClientFactory())
reactor.run()

As this gets packaged for deployment, I thought I would take advantage of the twistd script and run this from a tac file.

Here's my very-similar-looking tac file:

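# stompclient.tac
# (config_path, processes, host, port, the credentials and StompClientFactory
# are defined elsewhere in the application)

import logging.config
import multiprocessing

import twisted.python.log
from twisted.application import internet, service

logging.config.fileConfig(config_path)
logger = logging.getLogger(__name__)

# Add observer to make Twisted log via python
twisted.python.log.PythonLoggingObserver().start()

# initialize the process pool.  (child processes get forked off immediately)
pool = multiprocessing.Pool(processes=processes)

StompClientFactory.username = username
StompClientFactory.password = password
StompClientFactory.destination = destination

application = service.Application('myapp')

# named client_service so it does not shadow the imported service module
client_service = internet.TCPClient(host, port, StompClientFactory())
client_service.setServiceParent(application)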

For the sake of illustration, I have collapsed or changed a few details; hopefully they were not the essence of the problem. For example, my app has a plugin system, the pool is initialized by a separate method, and then work is delegated to the pool using pool.apply_async() passing one of my plugin's process() methods.

So, if I run the script (stompclient.py), everything works as expected.

It also appears to work OK if I run twistd in non-daemon mode (-n):

twistd -noy stompclient.tac

However, it does not work when I run in daemon mode:

twistd -oy stompclient.tac

The application appears to start up OK, but when it attempts to fork off work, it just hangs. By "hangs", I mean that it appears that the child process is never asked to do anything and the parent (that called pool.apply_async()) just sits there waiting for the response to return.

I'm sure that I'm doing something stupid with Twisted + multiprocessing, but I'm really hoping that someone can explain to me the flaw in my approach.

Thanks in advance!

Solution

Since the difference between your working invocation and your non-working invocation is only the "-n" option, it seems most likely that the problem is caused by the daemonization process (which "-n" prevents from happening).

On POSIX, one of the steps involved in daemonization is forking and having the parent exit. Among other things, this has the consequence of having your code run in a different process than the one in which the .tac file was evaluated. This also re-arranges the child/parent relationship of processes which were started in the .tac file, as your pool of multiprocessing processes was.
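To make the fork step concrete, here is a minimal POSIX-only sketch. It is not twistd's actual daemonization code, and `fork_and_report` is just a name made up for illustration. It forks once and has the child report its own PID and its parent's PID. In real daemonization the parent would exit right after the fork; here it stays alive so the two can be compared.

```python
import os


def fork_and_report():
    """Fork once; return (pid_before_fork, child_pid, child_ppid)."""
    pid_before_fork = os.getpid()
    read_fd, write_fd = os.pipe()

    child = os.fork()
    if child == 0:
        # Child: this is where a daemon's code keeps running.
        try:
            os.close(read_fd)
            report = "%d %d" % (os.getpid(), os.getppid())
            os.write(write_fd, report.encode("ascii"))
        finally:
            os._exit(0)  # never fall back into the caller's code

    # Parent: a real daemonizing parent would exit here instead.
    os.close(write_fd)
    with os.fdopen(read_fd) as pipe:
        child_pid, child_ppid = (int(x) for x in pipe.read().split())
    os.waitpid(child, 0)
    return pid_before_fork, child_pid, child_ppid
```

Anything created before the fork (such as a pool built while the .tac file is evaluated) was set up in `pid_before_fork`, while the code that later uses it runs in `child_pid`.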

The multiprocessing pool's processes start off with a parent of the twistd process you start. However, when that process exits as part of daemonization, their parent becomes the system init process. This may cause some problems, although probably not the hanging problem you described. There are probably other similarly low-level implementation details which normally allow the multiprocessing module to work but which are disrupted by the daemonization process.
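One candidate for such a detail, offered as my reading of CPython's implementation rather than a confirmed diagnosis of your hang: a `multiprocessing.Pool` relies on helper threads in the process that created it to hand tasks to the workers, and threads are not copied by `fork()`. The sketch below (POSIX only; `submit_in_forked_child` is a hypothetical helper) creates a pool, forks, and tries to use the inherited pool object from the forked child, which stands in for the daemonized process.

```python
import multiprocessing
import os


def submit_in_forked_child(timeout=2.0):
    """Create a Pool, fork, then use the Pool from the forked child.

    Returns the outcome reported by the child: "completed", "timeout"
    or "error".
    """
    ctx = multiprocessing.get_context("fork")  # POSIX only
    pool = ctx.Pool(processes=1)
    read_fd, write_fd = os.pipe()

    child = os.fork()
    if child == 0:
        # Child: it inherited the Pool object, but not the helper threads
        # that the Pool started in the parent.
        outcome = b"error"
        try:
            os.close(read_fd)
            pool.apply_async(abs, (-1,)).get(timeout=timeout)
            outcome = b"completed"
        except multiprocessing.TimeoutError:
            outcome = b"timeout"
        finally:
            os.write(write_fd, outcome)
            os._exit(0)  # never fall back into the caller's code

    # Parent: collect the child's report, then shut the pool down.
    os.close(write_fd)
    with os.fdopen(read_fd) as pipe:
        outcome = pipe.read()
    os.waitpid(child, 0)
    pool.terminate()
    pool.join()
    return outcome
```

If the child reports "timeout", the task was queued but never reached a worker, which matches the symptom of `pool.apply_async()` sitting there waiting for a response.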

Fortunately, avoiding this strange interaction should be straightforward. Twisted's service APIs allow you to run code after daemonization has completed. If you use these APIs, then you can delay the initialization of the multiprocessing module's process pool until after daemonization and hopefully avoid the problem. Here's an example of what that might look like:

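import multiprocessing

from twisted.application.service import Service

class MultiprocessingService(Service):
    def startService(self):
        # Runs after twistd has daemonized, so the pool is created in the
        # process that will actually use it.
        Service.startService(self)
        self.pool = multiprocessing.Pool(processes=processes)

MultiprocessingService().setServiceParent(application)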
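Filled out into a whole .tac file, that might look something like the following configuration sketch. Several things here are assumptions that do not come from your code: the `myapp.stomp` import path, the placeholder `HOST`, `PORT` and `PROCESSES` values, the `pool_service` attribute used to hand the pool to the factory, and the `stopService` cleanup.

```python
# stompclient.tac (sketch; loaded by twistd, not run directly)
import multiprocessing

from twisted.application import internet, service

# Hypothetical import path; the question does not show where the factory lives.
from myapp.stomp import StompClientFactory

HOST, PORT, PROCESSES = "localhost", 61613, 4  # placeholder values


class MultiprocessingService(service.Service):
    """Owns the worker pool for the lifetime of the application."""

    pool = None

    def startService(self):
        # twistd calls this after daemonization, so the pool is created in
        # the process that will actually use it.
        service.Service.startService(self)
        self.pool = multiprocessing.Pool(processes=PROCESSES)

    def stopService(self):
        service.Service.stopService(self)
        if self.pool is not None:
            self.pool.close()  # accept no new work
            self.pool.join()   # blocks until the workers have exited
            self.pool = None


application = service.Application("myapp")

pool_service = MultiprocessingService()
pool_service.setServiceParent(application)

# Assumption: the factory reaches the pool through this attribute, which the
# original code does not define.
StompClientFactory.pool_service = pool_service

client_service = internet.TCPClient(HOST, PORT, StompClientFactory())
client_service.setServiceParent(application)
```

The pool service is added to the application before the TCP client so that the pool exists by the time the first message arrives. Code that previously used the module-level `pool` would look it up through `pool_service.pool` at call time, since that attribute is only set once the service has started.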

Now, separately, you may also run into problems relating to clean up of the multiprocessing module's child processes, or possibly issues with processes created with Twisted's process creation API, reactor.spawnProcess. This is because part of dealing with child processes correctly generally involves handling the SIGCHLD signal. Twisted and multiprocessing aren't going to be cooperating in this regard, though, so one of them is going to get notified of all children exiting and the other will never be notified. If you don't use Twisted's API for creating child processes at all, then this may be okay for you - but you might want to check to make sure any signal handler the multiprocessing module tries to install actually "wins" and doesn't get replaced by Twisted's own handler.
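A quick way to check which handler is currently installed is the standard library's `signal.getsignal`. The helper below is only a sketch (`describe_sigchld_handler` is a made-up name), and it makes no claim about what either library actually installs; it just reports what the process has at the moment it is called.

```python
import signal


def describe_sigchld_handler():
    """Return a short description of the current SIGCHLD handler."""
    handler = signal.getsignal(signal.SIGCHLD)
    if handler is None:
        # The handler was not installed from Python code.
        return "installed outside Python"
    if handler == signal.SIG_DFL:
        return "default (SIG_DFL)"
    if handler == signal.SIG_IGN:
        return "ignored (SIG_IGN)"
    # A Python callable: report where it was defined.
    module = getattr(handler, "__module__", "?")
    name = getattr(handler, "__qualname__", repr(handler))
    return "%s.%s" % (module, name)
```

Logging the result once the reactor is running and the pool has been created (for example from a function scheduled with `reactor.callWhenRunning`) should show whose handler ended up in place.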
