Python ThreadPool with limited task queue size


Problem Description


My problem is the following: I have a multiprocessing.pool.ThreadPool object with worker_count workers and a main pqueue from which I feed tasks to the pool.

The flow is as follows: there is a main loop that gets an item of level level from pqueue and submits it to the pool using apply_async. When the item is processed, it generates items of level + 1. The problem is that the pool accepts all tasks and processes them in the order they were submitted.

More precisely, what is happening is that the level 0 items are processed, each generating 100 level 1 items that are retrieved immediately from pqueue and added to the pool; each level 1 item produces 100 level 2 items that are submitted to the pool, and so on, so the items are processed in a BFS manner.

I need to tell the pool not to accept more than worker_count items, so that higher-level items get a chance to be retrieved from pqueue and the items are processed in a DFS manner.

The current solution I came up with is: for each submitted task, save the AsyncResult object in an asyncres_list list; before retrieving items from pqueue, remove the entries whose tasks have been processed (if any) and check every 0.5 seconds whether the length of asyncres_list is lower than the number of threads in the pool. That way, only thread_number items are processed at the same time.
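
For concreteness, a minimal sketch of what this polling workaround might look like (process_item, the depth limit, the child count, and the priority encoding are hypothetical stand-ins; the asker's actual code is not shown):

#!/usr/bin/env python3
import queue
import time
from multiprocessing.pool import ThreadPool

worker_count = 4
pool = ThreadPool(worker_count)
pqueue = queue.PriorityQueue()  # deeper levels get smaller (negated) keys

def process_item(level, item):  # hypothetical task body
    print(item)  # emulate some work
    if level < 2:  # hypothetical depth limit
        for i in range(3):  # each item spawns a few children
            pqueue.put((-(level + 1), "%s.%d" % (item, i)))

for i in range(2):  # seed the queue with level-0 items
    pqueue.put((0, str(i)))

asyncres_list = []
while not pqueue.empty() or asyncres_list:
    # drop AsyncResult objects whose tasks have completed
    asyncres_list = [r for r in asyncres_list if not r.ready()]
    # throttle: keep at most worker_count tasks in flight
    if len(asyncres_list) >= worker_count or pqueue.empty():
        time.sleep(0.5)
        continue
    neg_level, item = pqueue.get()
    asyncres_list.append(pool.apply_async(process_item, (-neg_level, item)))

pool.close()
pool.join()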

I am wondering if there is a cleaner way to achieve this behaviour; I can't seem to find any parameter in the documentation that limits the maximum number of tasks that can be submitted to a pool.

Solution

ThreadPool is a simple tool for a common task. If you want to manage the queue yourself to get DFS behavior, you can implement the necessary functionality directly on top of the threading and queue modules.

To prevent scheduling the next root task until all tasks spawned by the current task are done ("DFS"-like order), you could use Queue.join():

#!/usr/bin/env python3
import queue
import random
import threading
import time

def worker(q, multiplicity=5, maxlevel=3, lock=threading.Lock()):
    # the default lock is created once, at function definition time,
    # and is therefore shared by all workers; it serializes print()
    for task in iter(q.get, None):  # blocking get until None is received
        try:
            if len(task) < maxlevel:
                for i in range(multiplicity):
                    q.put(task + str(i))  # schedule the next level
            time.sleep(random.random())  # emulate some work
            with lock:
                print(task)
        finally:
            q.task_done()

worker_count = 2
q = queue.LifoQueue()
threads = [threading.Thread(target=worker, args=[q], daemon=True)
           for _ in range(worker_count)]
for t in threads:
    t.start()

for task in "01234":  # populate the first level
    q.put(task)
    q.join()  # block until all spawned tasks are done
for _ in threads:  # signal workers to quit
    q.put(None)
for t in threads:  # wait until workers exit
    t.join()

The code example is derived from the example in the queue module documentation.

The task at each level spawns multiplicity direct child tasks, which spawn their own subtasks until maxlevel is reached: with multiplicity=5 and maxlevel=3, a root task such as "0" spawns "00" through "04", each of which spawns five more, for 1 + 5 + 25 = 31 tasks per root.

None is used to signal the workers that they should quit. t.join() is used to wait until the threads exit gracefully. If the main thread is interrupted for any reason, the daemon threads are killed unless there are other non-daemon threads (you might want to provide a SIGINT handler, to signal the workers to exit gracefully on Ctrl+C instead of just dying).
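
As an illustration of that last point, one possible shape for a graceful shutdown, reusing q and threads from the example above and catching KeyboardInterrupt in the main thread rather than installing a signal handler (an assumption; the answer only hints at this):

try:
    for task in "01234":  # populate the first level
        q.put(task)
        q.join()  # block until all spawned tasks are done
except KeyboardInterrupt:
    pass  # stop feeding new root tasks and fall through to the shutdown
finally:
    for _ in threads:  # signal workers to quit
        q.put(None)  # with LifoQueue, None lands on top and is seen next
    for t in threads:  # wait until workers exit
        t.join()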

queue.LifoQueue() is used to get "Last In, First Out" order (it is only approximate, because several workers get and put concurrently).
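
A single-threaded illustration of the LIFO property (nothing here is specific to the example above):

import queue

q = queue.LifoQueue()
for item in "abc":
    q.put(item)
print(q.get(), q.get(), q.get())  # -> c b a: last in, first out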

maxsize is not set on the queue because otherwise the workers may deadlock: a worker that blocks in q.put() on a full queue while spawning subtasks can never return to q.get(), and you have to put the task somewhere anyway. The worker_count background threads are running regardless of how many tasks are in the queue.
