队列为Python扭曲的角度经纪人远程调用? [英] Queue remote calls to a Python Twisted perspective broker?

查看:151
本文介绍了队列为Python扭曲的角度经纪人远程调用?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

扭曲的强度(对于Python)是它的异步架构(我认为)。我已经写了需要通过透视经纪人请求的图像处理服务器。它只要我给它不到几百​​张图像时的伟大工程。然而,有时它被数以百计的图像几乎在同一时间飙升。因为它试图处理它们同时在服务器崩溃。

The strength of Twisted (for python) is its asynchronous framework (I think). I've written an image processing server that takes requests via Perspective Broker. It works great as long as I feed it less than a couple hundred images at a time. However, sometimes it gets spiked with hundreds of images at virtually the same time. Because it tries to process them all concurrently the server crashes.

,因为我想在同一时间排队的remote_calls在服务器上,以便它只能处理约100图像的解决方案。看起来这可能是一些扭曲已经这样做,但我似乎无法找到它。关于如何启动实施任何想法?在正确的方向的一个点?谢谢!

As a solution I'd like to queue up the remote_calls on the server so that it only processes ~100 images at a time. It seems like this might be something that Twisted already does, but I can't seem to find it. Any ideas on how to start implementing this? A point in the right direction? Thanks!

推荐答案

一个现成的选项,可能与这是 twisted.internet.defer.DeferredSemaphore 帮助。这是正常的(计数)的异步版本信号量,你可能已经知道,如果你做了多少线程编程。

One ready-made option that might help with this is twisted.internet.defer.DeferredSemaphore. This is the asynchronous version of the normal (counting) semaphore you might already know if you've done much threaded programming.

A(计数)信号是很多像一个互斥体(锁)。但是,在一个互斥只能使用一次获得直到一个相应的释放,一(计数)信号可以被配置为允许收购的任意(但是指定)号码成功是必需的任何相应的释放前

A (counting) semaphore is a lot like a mutex (a lock). But where a mutex can only be acquired once until a corresponding release, a (counting) semaphore can be configured to allow an arbitrary (but specified) number of acquisitions to succeed before any corresponding releases are required.

下面的 DeferredSemaphore 使用的一个例子跑十异步操作,但同时运行最多三个​​:

Here's an example of using DeferredSemaphore to run ten asynchronous operations, but to run at most three of them at once:

from twisted.internet.defer import DeferredSemaphore, gatherResults
from twisted.internet.task import deferLater
from twisted.internet import reactor


def async(n):
    print 'Starting job', n
    d = deferLater(reactor, n, lambda: None)
    def cbFinished(ignored):
        print 'Finishing job', n
    d.addCallback(cbFinished)
    return d


def main():
    sem = DeferredSemaphore(3)

    jobs = []
    for i in range(10):
        jobs.append(sem.run(async, i))

    d = gatherResults(jobs)
    d.addCallback(lambda ignored: reactor.stop())
    reactor.run()


if __name__ == '__main__':
    main()

DeferredSemaphore 也有明确的获得发布方法,但运行方法是如此方便,它几乎总是你想要什么。它调用获取方法,该方法返回一个递延。到第一个递延,它增加了它调用您传递的功能(与任何位置或关键字参数一起)的回调。如果该函数返回一个递延,然后到第二个递延的回调被添加了调用发布方法。

DeferredSemaphore also has explicit acquire and release methods, but the run method is so convenient it's almost always what you want. It calls the acquire method, which returns a Deferred. To that first Deferred, it adds a callback which calls the function you passed in (along with any positional or keyword arguments). If that function returns a Deferred, then to that second Deferred a callback is added which calls the release method.

同步的情况下被处理为好,通过立即调用发布。错误也被处理,让他们传播,但在确保必要的发布做是为了离开 DeferredSemaphore 一致的状态。传递给运行函数的结果(或者结果的递延返回)成为的结果递延运行返回

The synchronous case is handled as well, by immediately calling release. Errors are also handled, by allowing them to propagate but making sure the necessary release is done to leave the DeferredSemaphore in a consistent state. The result of the function passed to run (or the result of the Deferred it returns) becomes the result of the Deferred returned by run.

另一种可能的方法可能是基于 DeferredQueue 合作 DeferredQueue 大多是像一个正常的队列,但它的 GET 方法返回一个递延。如果碰巧在队列中没有项目在调用的时候,递延将不会触发直到项目将被添加。

Another possible approach might be based on DeferredQueue and cooperate. DeferredQueue is mostly like a normal queue, but its get method returns a Deferred. If there happen to be no items in the queue at the time of the call, the Deferred will not fire until an item is added.

下面是一个例子:

from random import randrange

from twisted.internet.defer import DeferredQueue
from twisted.internet.task import deferLater, cooperate
from twisted.internet import reactor


def async(n):
    print 'Starting job', n
    d = deferLater(reactor, n, lambda: None)
    def cbFinished(ignored):
        print 'Finishing job', n
    d.addCallback(cbFinished)
    return d


def assign(jobs):
    # Create new jobs to be processed
    jobs.put(randrange(10))
    reactor.callLater(randrange(10), assign, jobs)


def worker(jobs):
    while True:
        yield jobs.get().addCallback(async)


def main():
    jobs = DeferredQueue()

    for i in range(10):
        jobs.put(i)

    assign(jobs)

    for i in range(3):
        cooperate(worker(jobs))

    reactor.run()


if __name__ == '__main__':
    main()

请注意,该异步工人功能是一样的,从第一个例子之一。不过,这一次,也有一个工人函数明确与<拉动就业出来的 DeferredQueue 和处理它们code>异步(加入异步作为回调到递延返回通过 GET )。在工人发生器由推动合作,它迭代后各一次递延它产生火灾。主循环中,然后,开始这三个工人发电机,使得三个作业将在进度在任何给定的时间。

Note that the async worker function is the same as the one from the first example. However, this time, there's also a worker function which is explicitly pulling jobs out of the DeferredQueue and processing them with async (by adding async as a callback to the Deferred returned by get). The worker generator is driven by cooperate, which iterates it once after each Deferred it yields fires. The main loop, then, starts three of these worker generators so that three jobs will be in progress at any given time.

该方法涉及多一点code比 DeferredSemaphore 的办法,但有一定的好处可能是有趣的。首先,合作函数返回一个拥有像暂停 CooperativeTask 实例C>,恢复,和一对夫妇等。此外,分配给同一合作者所有作业将<青霉>合作的与调度彼此,以免过载事件循环(这是什么赋予该API的名称)。在 DeferredQueue 的一面,它也可以设置多少个项目正在等待处理的限制,这样你就可以完全避免超载您的服务器(例如,如果您的图像处理器卡住并停止完成任务)。如果code调用处理队列溢出异常,您可以以此为pressure尝试停止接受新的就业机会(也许他们分流到另一台服务器,或提示管理员)。做类似的事情与 DeferredSemaphore 是有点麻烦,因为没有办法来限制多少任务正在等待能够获得信号量。

This approach involves a bit more code than the DeferredSemaphore approach, but has some benefits which may be interesting. First, cooperate returns a CooperativeTask instance which has useful methods like pause, resume, and a couple others. Also, all jobs assigned to the same cooperator will cooperate with each other in scheduling, so as not to overload the event loop (and this is what gives the API its name). On the DeferredQueue side, it's also possible to set limits on how many items are pending processing, so you can avoid completely overloading your server (for example, if your image processors get stuck and stop completing tasks). If the code calling put handles the queue overflow exception, you can use this as pressure to try to stop accepting new jobs (perhaps shunting them to another server, or alerting an administrator). Doing similar things with DeferredSemaphore is a bit trickier, since there's no way to limit how many jobs are waiting to be able to acquire the semaphore.

这篇关于队列为Python扭曲的角度经纪人远程调用?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆