How to efficiently do many tasks a "little later" in Python?

Question

I have a process that needs to perform a bunch of actions "later" (usually after 10-60 seconds). The problem is that there can be a lot of those "later" actions (1000s), so using a thread per task is not viable. I know of tools like gevent and eventlet, but one of the problems is that the process uses zeromq for communication, so I would need some integration (eventlet already has it).

What I'm wondering is: what are my options? Suggestions are welcome along the lines of libraries (if you've used any of the ones mentioned, please share your experience), techniques (Python's "coroutine" support, using one thread that sleeps for a while and checks a queue), how to use zeromq's poll or event loop to do the job, or something else.
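For the zeromq-poll option raised in the question, one common single-threaded pattern (not the one the answer below takes) is to keep a heap of due times and use the time until the earliest one as the poll timeout, so one thread sleeps inside the poll and wakes either for I/O or for the next due task. The sketch below is a minimal illustration under stated assumptions: it uses the standard-library `selectors` module on a plain socket pair as a stand-in for a zeromq socket, and `TimerLoop` and `call_later` are hypothetical names. With pyzmq, `zmq.Poller.poll(timeout)` would play the role of `select()` here, but note that its timeout is in milliseconds rather than seconds.

```python
import heapq
import itertools
import selectors
import socket
import time


class TimerLoop:
    """Hypothetical single-threaded loop: a heap of timers plus an I/O poll."""

    def __init__(self):
        self.sel = selectors.DefaultSelector()
        self.timers = []                 # heap of (due_time, seq, callback)
        self.seq = itertools.count()     # tie-breaker so callbacks are never compared

    def call_later(self, delay, callback):
        heapq.heappush(self.timers, (time.monotonic() + delay, next(self.seq), callback))

    def run(self):
        # Runs until no timers remain; a long-lived service would loop forever
        # and poll with timeout=None while the heap is empty.
        while self.timers:
            # Block in the poll only until the earliest timer is due.
            timeout = max(0.0, self.timers[0][0] - time.monotonic())
            for key, _events in self.sel.select(timeout):
                key.data(key.fileobj)    # the I/O handler is stored as the key's data
            now = time.monotonic()
            while self.timers and self.timers[0][0] <= now:
                _due, _seq, callback = heapq.heappop(self.timers)
                callback()


# Usage: a socket pair stands in for the zeromq socket.
loop = TimerLoop()
reader, writer = socket.socketpair()
received, log = [], []
loop.sel.register(reader, selectors.EVENT_READ, lambda sock: received.append(sock.recv(1024)))
loop.call_later(0.2, lambda: log.append('slow'))
loop.call_later(0.1, lambda: log.append('fast'))
loop.call_later(0.05, lambda: writer.send(b'ping'))  # I/O arrives while timers are pending
loop.run()
loop.sel.close()
reader.close()
writer.close()
print(log, received)  # ['fast', 'slow'] [b'ping']
```

Thousands of pending timers cost only heap entries here, not threads; the trade-off is that every callback runs on the loop's thread, so long-running work would block both I/O and other timers.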

Answer

Consider using a priority queue with one or more worker threads to service the tasks. The main thread adds work to the queue, keyed by the earliest timestamp at which it should be serviced. Worker threads pop work off the queue, sleep until that timestamp is reached, do the work, and then pop the next item off the queue.
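A minimal sketch of this first version, assuming `queue.PriorityQueue` holding `(due_time, message)` tuples and a `None` message as a hypothetical stop signal (`naive_worker` is an illustrative name, not from the original). Note the `time.sleep()` call: while a worker sleeps there, it cannot notice newly added, more urgent work, which is the weakness discussed next.

```python
import queue
import threading
import time


def naive_worker(q, results):
    """Pop the soonest item, sleep until it is due, run it, repeat.
    A None message is this sketch's (hypothetical) stop signal."""
    while True:
        due, message = q.get()           # blocks until an item is available
        if message is None:
            break
        delay = due - time.monotonic()
        if delay > 0:
            time.sleep(delay)            # cannot be interrupted by more urgent work
        results.append(message)          # stand-in for "do the work"


q = queue.PriorityQueue()
results = []
start = time.monotonic()
q.put((start + 0.2, 'Grumpy'))
q.put((start + 0.1, 'Happy'))
q.put((float('inf'), None))              # sorts last, so it stops the worker at the end
t = threading.Thread(target=naive_worker, args=(q, results))
t.start()
t.join()
print(results)  # ['Happy', 'Grumpy']
```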

How about a more fleshed-out answer? mklauber makes a good point: if all of your workers might be sleeping when new, more urgent work arrives, then a queue.PriorityQueue isn't really the solution. A "priority queue" is still the technique to use, and one is available from the heapq module, but instead of plain sleeping we'll use a different synchronization primitive: a condition variable, which in Python is spelled threading.Condition.
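The property that makes a condition variable suitable here, unlike `time.sleep()`, is that a timed wait returns early when another thread notifies it. The sketch below (names are illustrative) uses `Condition.wait_for()`, which wraps `wait()` in a predicate-recheck loop: a thread is prepared to wait up to 5 seconds, but wakes almost immediately once the main thread flags new work and calls `notify()`. Because it checks a predicate, it also behaves correctly if the flag is set before the waiter starts.

```python
import threading
import time

cond = threading.Condition()
state = {'new_work': False}
outcome = []


def sleeper():
    with cond:
        start = time.monotonic()
        # Blocks for up to 5 s, but returns as soon as it is notified and the
        # predicate is true; the lock is released while blocked.
        got_work = cond.wait_for(lambda: state['new_work'], timeout=5.0)
        outcome.append((got_work, time.monotonic() - start))


t = threading.Thread(target=sleeper)
t.start()
time.sleep(0.1)
with cond:
    state['new_work'] = True   # e.g. a more urgent item was just pushed
    cond.notify()
t.join()
got_work, elapsed = outcome[0]
print(got_work, elapsed < 1.0)  # True True
```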

The approach is fairly simple: peek at the top of the heap, and if that work is due, pop it off and do it. If there is work but it's scheduled for the future, wait on the condition until then; if there's no work at all, wait on the condition indefinitely.
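The decision each worker makes on every pass can be isolated as a pure function, which makes the three cases easy to check with fixed clock values. `next_action` is a hypothetical helper, not part of the original code: `('wait', seconds)` corresponds to `condition.wait(seconds)`, and `('wait', None)` to `condition.wait()` with no timeout. In real use it would have to be called while holding the condition's lock, since it may pop from the shared heap.

```python
import heapq


def next_action(work_queue, now):
    """Decide what a worker should do, given the heap and the current time.

    Returns ('run', item) if the earliest item is due, ('wait', seconds) if it
    is scheduled in the future, or ('wait', None) if the queue is empty."""
    if not work_queue:
        return ('wait', None)
    prio, _data = work_queue[0]          # peek at the soonest item without removing it
    if prio <= now:
        return ('run', heapq.heappop(work_queue))
    return ('wait', prio - now)


q = []
heapq.heappush(q, (105.0, 'Grumpy'))
heapq.heappush(q, (100.0, 'Happy'))
print(next_action(q, 100.0))   # ('run', (100.0, 'Happy'))
print(next_action(q, 100.0))   # ('wait', 5.0)
print(next_action([], 100.0))  # ('wait', None)
```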

The producer does its fair share of the work: every time it adds new work, it notifies the condition, so a sleeping worker (if there is one) wakes up and rechecks the queue for newer work.
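A single-worker sketch, simplified from the answer's code and using illustrative module-level names, shows what the producer's `notify()` buys. The worker is waiting toward an item due in 1 s when an item due in 0.1 s arrives; `notify()` wakes it, it rechecks the heap top, and it runs the urgent item at roughly 0.2 s instead of sleeping the full second. This sketch uses `time.monotonic()` rather than `time.time()` so clock adjustments cannot disturb the waits, and for brevity it records the "work" while still holding the lock.

```python
import heapq
import threading
import time

work_queue, cond, done = [], threading.Condition(), []


def produce(delay, message):
    with cond:
        heapq.heappush(work_queue, (time.monotonic() + delay, message))
        cond.notify()                  # wake a sleeping worker so it rechecks the heap top


def worker():
    with cond:
        while len(done) < 2:           # this demo stops after two items
            if not work_queue:
                cond.wait()
                continue
            due, message = work_queue[0]
            now = time.monotonic()
            if due > now:
                cond.wait(due - now)   # returns early if produce() notifies
                continue
            heapq.heappop(work_queue)
            done.append((message, time.monotonic()))


start = time.monotonic()
t = threading.Thread(target=worker)
t.start()
produce(1.0, 'later')
time.sleep(0.1)
produce(0.1, 'urgent')                 # more urgent than what the worker is waiting on
t.join()
print([m for m, _ in done])  # ['urgent', 'later']
```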

import heapq, time, threading

START_TIME = time.time()
SERIALIZE_STDOUT = threading.Lock()

def consumer(message):
    """The actual work function. Never mind the lock here; it just keeps
       the output nicely formatted. A real work function probably won't need
       it, or might need quite different synchronization."""
    with SERIALIZE_STDOUT:
        print(time.time() - START_TIME, message)

def produce(work_queue, condition, timeout, message):
    """Called to put a single item onto the work queue."""
    prio = time.time() + float(timeout)
    with condition:
        heapq.heappush(work_queue, (prio, message))
        # wake one sleeping worker so it rechecks the top of the heap
        condition.notify()

def worker(work_queue, condition):
    condition.acquire()
    stopped = False
    while not stopped:
        now = time.time()
        if work_queue:
            prio, data = work_queue[0]
            if data == 'stop':
                # leave the sentinel on the heap so the other workers see it too,
                # and wake any worker that is waiting without a timeout
                stopped = True
                condition.notify_all()
                continue
            if prio <= now:
                heapq.heappop(work_queue)
                condition.release()
                # do some work, outside the lock!
                consumer(data)
                condition.acquire()
            else:
                # sleep until the earliest item is due, or until notified of new work
                condition.wait(prio - now)
        else:
            # the queue is empty, wait until notified
            condition.wait()
    condition.release()

if __name__ == '__main__':
    # first set up the work queue and worker pool
    work_queue = []
    cond = threading.Condition()
    pool = [threading.Thread(target=worker, args=(work_queue, cond))
            for _ignored in range(4)]
    # map() is lazy in Python 3, so start the threads with an explicit loop
    for thread in pool:
        thread.start()

    # now add some work
    produce(work_queue, cond, 10, 'Grumpy')
    produce(work_queue, cond, 10, 'Sneezy')
    produce(work_queue, cond, 5, 'Happy')
    produce(work_queue, cond, 10, 'Dopey')
    produce(work_queue, cond, 15, 'Bashful')
    time.sleep(5)
    produce(work_queue, cond, 5, 'Sleepy')
    produce(work_queue, cond, 10, 'Doc')

    # and just to make the example a bit more friendly, tell the threads to stop
    # after all the work is done ('stop' has an infinite timestamp, so it sorts last)
    produce(work_queue, cond, float('inf'), 'stop')
    for thread in pool:
        thread.join()