使用来自 concurrent.futures 模块的 ThreadPoolExecutor 终止执行程序 [英] Terminate executor using ThreadPoolExecutor from concurrent.futures module

查看:68
本文介绍了使用来自 concurrent.futures 模块的 ThreadPoolExecutor 终止执行程序的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试根据长时间运行的请求返回的值终止线程池.一旦请求返回值的总和达到 MIN_REQUIRED_VALUE

I am trying to terminate a ThreadPool based on values returned from long running request. I wish to terminate the ThreadPool once the sum of the request return values reaches MIN_REQUIRED_VALUE

我确定问题在于我正在创建一个完整的期货列表,这些列表将始终需要解决.我不确定如何在不使用 ThreadPoolExecutor 创建列表的情况下执行请求

I am sure the problem is that I am creating a full list of futures which will always have to be resolved. I am not sure how to perform the requests without creating a list with ThreadPoolExecutor

我知道有几个与终止线程池相关的问题.我发现了类似的问题,但答案似乎没有处理返回值.

I know there has been a couple of questions related to terminating a thread pool. I have found similar questions but the answers don't seem to handle the return value.

类似的问题:

如果有更好的方法用另一个模块来做到这一点,那就太好了.

If there is a better way to do this with another module, that would be fine.

如有任何帮助,我们将不胜感激.

Any assistance would be much appreciated.

from time import sleep
from concurrent.futures import ThreadPoolExecutor, as_completed

NUM_REQUESTS = 50
MIN_REQUIRED_VALUE = 30


def long_request(id):
    sleep(3)
    return {"data": {"value": 10}}


def check_results(results):
    total = 0
    for result in results:
        total += result["data"]["value"]

    return total


def main():
    futures = []
    responses = []

    with ThreadPoolExecutor(max_workers=10) as executor:
        for request_index in range(NUM_REQUESTS):
            future = executor.submit(long_request, request_index)

            # Create Futures List
            futures.append(future)

        for future in as_completed(futures):
            responses.append(future.result())

            # Check minimum value reached
            total = check_results(responses)
            if total > MIN_REQUIRED_VALUE:
                executor.shutdown(wait=False)


if __name__ == "__main__":
    main()

推荐答案

我更改了代码,如果未达到 MIN_REQUIRED_VALUE,则仅附加带有结果的期货,并循环遍历所有待处理的期货,并在达到 MIN_REQUIRED_VALUE 时取消它们.

I changed the code around to append only futures with results if MIN_REQUIRED_VALUE not reached and loop through all pending futures and cancel them if MIN_REQUIRED_VALUE is reached.

你会注意到我添加了 num_requests 来检查提交的请求数量,结果在这种情况下正好是 6,这是预期的.

You can notice I added num_requests to check number of requests submitted and it turns out to be exactly 6 in this case which is expected.

如果有人有更好的方法来做到这一点,那就太好了.

If anyone has a better way to do this would be good to see.

from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep

NUM_REQUESTS = 1000
MIN_REQUIRED_VALUE = 50


def long_request(id):
    sleep(1)
    return {"data": {"value": 10}}


def check_results(results):
    total = 0
    for result in results:
        total += result["data"]["value"]

    return total


def main():
    futures = []
    responses = []
    num_requests = 0

    with ThreadPoolExecutor(max_workers=10) as executor:
        for request_index in range(NUM_REQUESTS):
            future = executor.submit(long_request, request_index)

            # Future list
            futures.append(future)

        for future in as_completed(futures):

            # --- Changed Logic Below ---
            total = check_results(responses)

            if total > MIN_REQUIRED_VALUE:
                for pending_future in futures:
                    pending_future.cancel()
            else:
                num_requests += 1
                responses.append(future.result())

    return num_requests


if __name__ == "__main__":
    requests = main()
    print("Num Requests: ", requests)

这篇关于使用来自 concurrent.futures 模块的 ThreadPoolExecutor 终止执行程序的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆