Python: deferToThread XMLRPC Server - Twisted - Cherrypy?



This question is related to others I have asked on here, mainly regarding sorting huge sets of data in memory.

Basically this is what I want / have:

Twisted XMLRPC server running. This server keeps several (32) instances of Foo class in memory. Each Foo class contains a list bar (which will contain several million records). There is a service that retrieves data from a database, and passes it to the XMLRPC server. The data is basically a dictionary, with keys corresponding to each Foo instance, and values are a list of dictionaries, like so:

data = {'foo1':[{'k1':'v1', 'k2':'v2'}, {'k1':'v1', 'k2':'v2'}], 'foo2':...}

Each Foo instance is then passed the value corresponding to it's key, and the Foo.bar dictionaries are updated and sorted.

from twisted.web import xmlrpc
from twisted.internet import threads

class XMLRPCController(xmlrpc.XMLRPC):

    def __init__(self):
        ...
        self.foos = {'foo1': Foo(), 'foo2': Foo(), 'foo3': Foo()}
        ...

    def update(self, data):
        # iterate over key/value pairs, not just keys
        for k, v in data.items():
            threads.deferToThread(self.foos[k].processData, v)

    def getData(self, fookey):
        # return first 10 records of the specified Foo.bar
        return self.foos[fookey].bar[0:10]

class Foo(object):

    def __init__(self):
        self.bar = []  # was a bare `bar = []`, which never set the attribute

    def processData(self, new_bar_data):
        for record in new_bar_data:
            # do processing, and add record, then sort
            # BUNCH OF PROCESSING CODE
            self.bar.sort(reverse=True)

The problem is that when the update method is called on the XMLRPCController with a lot of records (say 100K+), it stops responding to my getData calls until all 32 Foo instances have completed their processData methods. I thought deferToThread would help, but I think I am misunderstanding where the problem is.

Any suggestions? I am open to using something else, such as CherryPy, if it supports this required behavior.


EDIT

@Troy: This is how the reactor is set up

reactor.listenTCP(port_no, server.Site(XMLRPCController()))
reactor.run()

As far as the GIL goes, would it be a viable option to change the sys.setcheckinterval() value to something smaller, so that the lock on the data is released more often and it can be read?

Solution

The easiest way to make the app responsive is to break the CPU-intensive processing into smaller chunks, while letting the Twisted reactor run in between, for example by calling reactor.callLater(0, process_next_chunk) to advance to the next chunk. You are effectively implementing cooperative multitasking yourself.
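A minimal, self-contained sketch of that chunking pattern, assuming the processing is just "append and re-sort" as in the question. The reactor is simulated here with a plain deque so the example runs on its own; in real Twisted code you would pass `lambda f: reactor.callLater(0, f)` as `call_later`:

```python
from collections import deque

class Foo(object):
    def __init__(self):
        self.bar = []

def process_in_chunks(foo, records, chunk_size, call_later):
    """Append `records` to foo.bar in slices of `chunk_size`, handing
    control back between slices via `call_later`."""
    it = iter(records)

    def process_next_chunk():
        exhausted = False
        for _ in range(chunk_size):
            try:
                foo.bar.append(next(it))
            except StopIteration:
                exhausted = True
                break
        foo.bar.sort(reverse=True)  # sort once per chunk, not once per record
        if not exhausted:
            call_later(process_next_chunk)

    call_later(process_next_chunk)

# Stand-in for the reactor: a FIFO queue of pending calls.
pending = deque()
foo = Foo()
process_in_chunks(foo, range(10), chunk_size=3, call_later=pending.append)
while pending:
    pending.popleft()()  # a real reactor would serve getData calls in between
```

Because each chunk returns to the event loop before scheduling the next one, getData calls can be answered between chunks; Twisted's twisted.internet.task.cooperate offers a ready-made version of this pattern.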

Another way would be to use separate processes to do the work; then you will also benefit from multiple cores. Take a look at Ampoule: https://launchpad.net/ampoule. It provides an API similar to deferToThread.
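Since Ampoule's own API is not shown here, a rough stand-in for the multi-process approach using only the standard library's concurrent.futures might look like the following; `process_data` and the sample payload are invented for illustration:

```python
from concurrent.futures import ProcessPoolExecutor

def process_data(records):
    # CPU-bound work runs in a worker process, so it never holds
    # the server process's GIL.
    return sorted(records, reverse=True)

def update_all(data):
    # Fan each Foo's payload out to the pool, then collect the results.
    with ProcessPoolExecutor() as pool:
        futures = {k: pool.submit(process_data, v) for k, v in data.items()}
        return {k: f.result() for k, f in futures.items()}

if __name__ == '__main__':
    sample = {'foo1': [3, 1, 2], 'foo2': [9, 7, 8]}
    update_all(sample)
```

In a Twisted server you could bridge the blocking `f.result()` back into the reactor with threads.deferToThread so the outcome arrives as a Deferred, which is roughly the convenience Ampoule packages up for you.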
