Queue remote calls to a Python Twisted perspective broker?
Question
The strength of Twisted (for python) is its asynchronous framework (I think). I've written an image processing server that takes requests via Perspective Broker. It works great as long as I feed it less than a couple hundred images at a time. However, sometimes it gets spiked with hundreds of images at virtually the same time. Because it tries to process them all concurrently the server crashes.
As a solution I'd like to queue up the remote_calls on the server so that it only processes ~100 images at a time. It seems like this might be something that Twisted already does, but I can't seem to find it. Any ideas on how to start implementing this? A point in the right direction? Thanks!
Answer
One ready-made option that might help with this is twisted.internet.defer.DeferredSemaphore. This is the asynchronous version of the normal (counting) semaphore you might already know if you've done much threaded programming.
A (counting) semaphore is a lot like a mutex (a lock). But where a mutex can only be acquired once until a corresponding release, a (counting) semaphore can be configured to allow an arbitrary (but specified) number of acquisitions to succeed before any corresponding releases are required.
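The distinction can be sketched with the standard library's threading primitives (plain Python, not Twisted):

```python
import threading

# A lock (mutex) admits one holder at a time; a counting semaphore
# initialized with N admits up to N holders before anyone must release.
lock = threading.Lock()
sem = threading.Semaphore(3)

assert lock.acquire(blocking=False) is True
assert lock.acquire(blocking=False) is False  # second acquire must wait

# Three acquisitions succeed before any release is required...
results = [sem.acquire(blocking=False) for _ in range(3)]
# ...and only the fourth has to wait.
fourth = sem.acquire(blocking=False)
print(results, fourth)  # → [True, True, True] False
```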
Here's an example of using DeferredSemaphore to run ten asynchronous operations, but to run at most three of them at once:
from twisted.internet.defer import DeferredSemaphore, gatherResults
from twisted.internet.task import deferLater
from twisted.internet import reactor

def async(n):
    print 'Starting job', n
    d = deferLater(reactor, n, lambda: None)
    def cbFinished(ignored):
        print 'Finishing job', n
    d.addCallback(cbFinished)
    return d

def main():
    sem = DeferredSemaphore(3)
    jobs = []
    for i in range(10):
        jobs.append(sem.run(async, i))
    d = gatherResults(jobs)
    d.addCallback(lambda ignored: reactor.stop())
    reactor.run()

if __name__ == '__main__':
    main()
DeferredSemaphore also has explicit acquire and release methods, but the run method is so convenient it's almost always what you want. It calls the acquire method, which returns a Deferred. To that first Deferred, it adds a callback which calls the function you passed in (along with any positional or keyword arguments). If that function returns a Deferred, then to that second Deferred a callback is added which calls the release method.
The synchronous case is handled as well, by immediately calling release. Errors are also handled, by allowing them to propagate but making sure the necessary release is done to leave the DeferredSemaphore in a consistent state. The result of the function passed to run (or the result of the Deferred it returns) becomes the result of the Deferred returned by run.
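That acquire/call/release sequence can be sketched with stdlib asyncio as a rough analogue (an illustration of the pattern only, not Twisted's actual implementation of run):

```python
import asyncio

async def run_with_semaphore(sem, f, *args, **kwargs):
    # Analogue of DeferredSemaphore.run: acquire a token, call the function,
    # and release the token once its result is ready -- even on error.
    await sem.acquire()
    try:
        result = f(*args, **kwargs)
        if asyncio.iscoroutine(result):  # like the function returning a Deferred
            result = await result
        return result
    finally:
        sem.release()  # always runs, keeping the semaphore consistent

async def main():
    sem = asyncio.Semaphore(3)  # at most three calls in flight at once
    return await asyncio.gather(
        *(run_with_semaphore(sem, lambda n=n: n * n) for n in range(5)))

async def boom():
    raise ValueError("job failed")

async def demo_error():
    sem = asyncio.Semaphore(1)
    try:
        await run_with_semaphore(sem, boom)
    except ValueError:
        pass  # the error propagated to the caller...
    return sem.locked()  # ...but the token was released, so this is False

print(asyncio.run(main()))  # → [0, 1, 4, 9, 16]
```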
Another possible approach might be based on DeferredQueue and cooperate. DeferredQueue is mostly like a normal queue, but its get method returns a Deferred. If there happen to be no items in the queue at the time of the call, the Deferred will not fire until an item is added.
Here's an example:
from random import randrange

from twisted.internet.defer import DeferredQueue
from twisted.internet.task import deferLater, cooperate
from twisted.internet import reactor

def async(n):
    print 'Starting job', n
    d = deferLater(reactor, n, lambda: None)
    def cbFinished(ignored):
        print 'Finishing job', n
    d.addCallback(cbFinished)
    return d

def assign(jobs):
    # Create new jobs to be processed
    jobs.put(randrange(10))
    reactor.callLater(randrange(10), assign, jobs)

def worker(jobs):
    while True:
        yield jobs.get().addCallback(async)

def main():
    jobs = DeferredQueue()
    for i in range(10):
        jobs.put(i)
    assign(jobs)
    for i in range(3):
        cooperate(worker(jobs))
    reactor.run()

if __name__ == '__main__':
    main()
Note that the async worker function is the same as the one from the first example. However, this time, there's also a worker function which is explicitly pulling jobs out of the DeferredQueue and processing them with async (by adding async as a callback to the Deferred returned by get). The worker generator is driven by cooperate, which iterates it once after each Deferred it yields fires. The main loop, then, starts three of these worker generators so that three jobs will be in progress at any given time.
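The same pattern can be sketched in stdlib asyncio terms (a rough analogue of DeferredQueue plus cooperate, not Twisted's API): N worker tasks all pull from one shared queue, so at most N jobs run at once.

```python
import asyncio

async def process(n, done):
    # Stand-in for real asynchronous work on one job.
    await asyncio.sleep(0)
    done.append(n)

async def worker(jobs, done):
    while True:
        n = await jobs.get()  # like DeferredQueue.get() returning a Deferred
        if n is None:         # sentinel: no more work for this worker
            return
        await process(n, done)

async def main():
    jobs = asyncio.Queue()
    done = []
    for i in range(10):
        jobs.put_nowait(i)
    for _ in range(3):        # one sentinel per worker, behind the real jobs
        jobs.put_nowait(None)
    # Three workers, like the three cooperate(worker(jobs)) calls above.
    await asyncio.gather(*(worker(jobs, done) for _ in range(3)))
    return done

print(asyncio.run(main()))
```

All ten jobs complete, with at most three in progress at any moment; the completion order depends on scheduling.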
This approach involves a bit more code than the DeferredSemaphore approach, but has some benefits which may be interesting. First, cooperate returns a CooperativeTask instance which has useful methods like pause, resume, and a couple of others. Also, all jobs assigned to the same cooperator will cooperate with each other in scheduling, so as not to overload the event loop (and this is what gives the API its name). On the DeferredQueue side, it's also possible to set limits on how many items are pending processing, so you can avoid completely overloading your server (for example, if your image processors get stuck and stop completing tasks). If the code calling put handles the queue overflow exception, you can use this as pressure to try to stop accepting new jobs (perhaps shunting them to another server, or alerting an administrator). Doing similar things with DeferredSemaphore is a bit trickier, since there's no way to limit how many jobs are waiting to be able to acquire the semaphore.
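The backpressure idea can be sketched with the standard library's bounded queue.Queue (an analogue only; Twisted's DeferredQueue raises its own overflow exception when constructed with a size limit):

```python
import queue

# A bounded queue: once the limit is reached, put fails instead of
# letting pending work grow without bound.
jobs = queue.Queue(maxsize=2)
jobs.put_nowait("a")
jobs.put_nowait("b")

try:
    jobs.put_nowait("c")      # queue full: reject rather than overload
    overflowed = False
except queue.Full:            # the stdlib counterpart of a queue-overflow error
    overflowed = True         # signal backpressure: shed or redirect the job

print(overflowed)  # → True
```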