Python: enqueue tasks and get the results in order

Problem description

I would like to have multiple threads performing tasks, but I would also like to get the results in order.

Take this simple sample code:

from threading import Thread
import queue
import time


class TaskQueue(queue.Queue):
    def __init__(self, num_workers=1):
        queue.Queue.__init__(self)
        self.num_workers = num_workers
        self.start_workers()

    def add_task(self, task, *args, **kwargs):
        args = args or ()
        kwargs = kwargs or {}
        self.put((task, args, kwargs))

    def start_workers(self):
        for i in range(self.num_workers):
            t = Thread(target=self.worker)
            t.daemon = True
            t.start()

    def worker(self):
        while True:
            ##tupl = self.get()  # REMOVED
            item, args, kwargs = self.get()
            item(*args, **kwargs)
            self.task_done()


def task(task_num, sleep_time):
    time.sleep(sleep_time)
    print("Task #{} sleeping {}".format(task_num, sleep_time))

q = TaskQueue(num_workers=2)

for t, s in zip([1,2,3,4,5,6,7,8,9], [9,8,7,6,5,4,3,2,1]):
    q.add_task(task, t, s)

q.join()  # block until all tasks are done
print("All Done!!")

I add tasks, each with an associated task number and a different execution time (a sleep).

I have three questions.

1) I am not even getting all of the outputs (never mind the order). Currently I only get this output:

   Task #4 sleeping 6
   Task #2 sleeping 8
   Task #6 sleeping 4
   Task #8 sleeping 2

It seems I am not getting the odd-numbered tasks; maybe everything is coming from only one of the workers. Why is that, and how can I get them?

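2) After that, the program hangs. I assume this is because the workers block until they get something from the queue, so when the queue is empty they wait forever. How can I update it so that it exits, or reaches "All Done!!", once there are no more tasks in the queue?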

3) How can I have it print the tasks in order? Basically, I want the results to be:

Task #1 sleeping 9
Task #2 sleeping 8
Task #3 sleeping 7
Task #4 sleeping 6
Task #5 sleeping 5
Task #6 sleeping 4
Task #7 sleeping 3
Task #8 sleeping 2
Task #9 sleeping 1

Also assume that the task results are quite large and that there are a lot of tasks, so I don't want to keep them all in memory and then sort them. I know the number of tasks added to the queue, and would like to use that to decide what to print first. Temporarily holding some results in memory is acceptable. I know that in the current example you have to hold some first, since the first task takes the longest. You can assume that the execution time (here, the sleep) of each task is random.

Currently using Python 3.7
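Question 3 can also be approached with the standard library's concurrent.futures.ThreadPoolExecutor instead of a hand-written TaskQueue. This is a swapped-in technique, not the asker's code. The sketch keeps a sliding window of at most "window" submitted tasks and always waits on the oldest one. Results therefore come out in submission order, while only about "window" finished results can pile up behind a slow task. The names ordered_results and window are hypothetical. Here the task returns its string instead of printing it, and the sleeps are scaled to tenths of a second so the demo finishes quickly.

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from itertools import islice
import time


def ordered_results(func, arg_tuples, num_workers=2, window=4):
    """Yield func(*args) for each args tuple, in submission order.

    Hypothetical helper: at most `window` tasks are submitted but not yet
    yielded, which bounds how many finished results wait in memory.
    """
    it = iter(arg_tuples)
    pending = deque()  # futures, oldest (next to yield) on the left
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        # Fill the window with the first `window` tasks.
        for args in islice(it, window):
            pending.append(pool.submit(func, *args))
        while pending:
            # Block only on the oldest task; later ones keep running meanwhile.
            result = pending.popleft().result()
            # Refill the freed slot (if tasks remain) before yielding.
            for args in islice(it, 1):
                pending.append(pool.submit(func, *args))
            yield result


def task(task_num, sleep_time):
    time.sleep(sleep_time / 10)  # scaled down (assumption) so the demo is fast
    return "Task #{} sleeping {}".format(task_num, sleep_time)


if __name__ == "__main__":
    jobs = zip(range(1, 10), [9, 8, 7, 6, 5, 4, 3, 2, 1])
    for line in ordered_results(task, jobs, num_workers=2, window=4):
        print(line)  # Task #1 ... Task #9, always in this order
```

A larger window lets fast tasks run further ahead of a slow one (better throughput) at the cost of more buffered results. The window should be at least num_workers, or some workers will sit idle. Executor.map also yields results in order, but it collects and submits every task up front.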

--- EDIT ---

Removing tupl = self.get() from the code above solved questions #1 and #2, so only question #3 remains. Any ideas or solutions are welcome.
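A single-threaded replay shows why that one line caused both problems. It is an illustration, not the original program. The original worker loop called self.get() twice per iteration (once into the unused tupl, once into item, args, kwargs) but called task_done() only once. Assume each worker's two get() calls run back to back, as the even-only output suggests. Then every odd-numbered task is fetched and thrown away. Queue.join() only returns after task_done() has been called once for every put(). So 9 puts against 4 task_done() calls means join() waits forever, which is the hang in question #2.

```python
import queue

# Replay of the original worker loop: two get() calls, one task_done().
q = queue.Queue()
for task_num in range(1, 10):
    q.put(task_num)

handled = []
task_done_calls = 0
while q.qsize() >= 2:    # a real worker would block here instead of stopping
    q.get()              # the stray "tupl = self.get()": fetched, then ignored
    handled.append(q.get())
    q.task_done()        # only one task_done() for two get() calls
    task_done_calls += 1

print(handled)              # [2, 4, 6, 8]
print(q.qsize())            # 1: task #9; a real worker takes it, then blocks on its second get()
print(9 - task_done_calls)  # 5: task_done() calls that join() waits for but never receives
```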

Recommended answer

Here's the answer to my own question. It uses two queues, one for the jobs and one for the results. Answers are taken from the results queue and stored in a dictionary keyed by task number. They are printed in order and deleted from the dictionary once printed.
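The dictionary in this answer acts as a small reorder buffer. Below it is isolated as a generator, a hypothetical helper named in_order that is not part of the answer's code. Each (task number, answer) pair is stored, and then every answer that is now next in line is released. An answer therefore stays buffered only while some earlier task is still outstanding.

```python
def in_order(pairs, first=1):
    """Yield the values of (index, value) pairs in index order.

    Pairs may arrive in any order; a value is held only until every
    lower index has been yielded.
    """
    buffered = {}     # index -> value, for answers that arrived early
    expected = first  # next index to yield (like curr_task in the answer)
    for index, value in pairs:
        buffered[index] = value
        # Release the whole run of consecutive values that is now complete.
        while expected in buffered:
            yield buffered.pop(expected)
            expected += 1
    # Only non-empty if some index never arrived: flush the rest sorted,
    # as the answer's final loop does.
    for index in sorted(buffered):
        yield buffered[index]


# A plausible completion order for the 9 tasks with 2 workers.
arrivals = [(n, "Task #{}".format(n)) for n in [2, 1, 4, 3, 6, 5, 8, 7, 9]]
for line in in_order(arrivals):
    print(line)  # prints Task #1 through Task #9 in order
```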

With 2 workers this takes 23 seconds, whereas 1 worker (or plain synchronous execution) takes 45 seconds:

from threading import Thread
import queue
import time
import datetime

class TaskQueue:
    def __init__(self, num_workers=1):
        self.num_workers = num_workers
        self.total_num_jobs = 0       # only updated from the main thread
        self.answers_sent = 0
        self.jobs = queue.Queue()     # (callable, args, kwargs) tuples
        self.results = queue.Queue()  # (task_num, answer) tuples put by the tasks
        self.start_workers()

    def add_task(self, task, *args, **kwargs):
        self.total_num_jobs += 1
        self.jobs.put((task, args, kwargs))

    def start_workers(self):
        for i in range(self.num_workers):
            t = Thread(target=self.worker)
            t.daemon = True
            t.start()

    def worker(self):
        while True:
            item, args, kwargs = self.jobs.get()
            item(*args, **kwargs)  # the task puts its own answer on self.results
            self.jobs.task_done()

    def get_answers(self):
        # All tasks are added before this is called, so total_num_jobs is final:
        # stop after exactly one answer per job instead of blocking forever.
        while self.answers_sent < self.total_num_jobs:
            yield self.results.get()
            self.answers_sent += 1
            self.results.task_done()


def task(task_num, sleep_time, q):
    time.sleep(sleep_time)
    ans = "Task #{} sleeping {}".format(task_num, sleep_time)
    q.put((task_num, ans))


if __name__ == "__main__":
    start = datetime.datetime.now()
    h = TaskQueue(num_workers=2)
    q = h.results
    answers = {}
    curr_task = 1

    for t, s in zip([1,2,3,4,5,6,7,8,9], [9,8,7,6,5,4,3,2,1]):
        h.add_task(task, t, s, q)

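    for task_num, ans in h.get_answers():
        answers[task_num] = ans
        # Print every answer that is now next in line, not just one, so
        # buffered results are released (and freed) as early as possible.
        while curr_task in answers:
            print(answers.pop(curr_task))
            curr_task += 1

    # Print remaining items (if any); with consecutive task numbers
    # starting at 1 this dict is already empty here.
    for k, v in sorted(answers.items()):
        print(v)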

    h.jobs.join()  # block until all tasks are done

    print("All done")
    print("Total Execution: {}".format(datetime.datetime.now() - start))
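The two timings can be checked by hand. The sleeps sum to 9 + 8 + ... + 1 = 45 seconds, which is the 1-worker time. With 2 workers, each free worker takes the next job from the FIFO jobs queue. The hypothetical helper fifo_makespan below simulates that schedule. It ignores thread start-up and printing overhead (an assumption) and reproduces the 23 seconds. Tasks #1 and #2 start at t = 0, and the last task (#9) starts at t = 22.

```python
import heapq


def fifo_makespan(durations, num_workers):
    """Finish time when idle workers take jobs from a FIFO queue in order.

    Hypothetical helper; assumes zero overhead between jobs.
    """
    free_at = [0] * num_workers  # min-heap of times at which each worker is free
    finish = 0
    for d in durations:
        start = heapq.heappop(free_at)  # earliest-free worker takes the next job
        end = start + d
        finish = max(finish, end)
        heapq.heappush(free_at, end)
    return finish


sleeps = [9, 8, 7, 6, 5, 4, 3, 2, 1]
print(fifo_makespan(sleeps, 1))  # 45
print(fifo_makespan(sleeps, 2))  # 23
```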
