如何以非阻塞方式链接期货?也就是说,如何在不阻碍的情况下将一个未来用作另一个未来的输入? [英] How to chain futures in a non-blocking manner? That is, how to use one future as an input in another future without blocking?

查看:55
本文介绍了如何以非阻塞方式链接期货?也就是说,如何在不阻碍的情况下将一个未来用作另一个未来的输入?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

使用下面的示例,一旦future1完成(不阻止提交future3),future2如何使用future1的结果?

Using the below example, how can future2 use the result of future1 once future1 is complete (without blocking future3 from being submitted)?

from concurrent.futures import ProcessPoolExecutor
import time

def wait(seconds):
    time.sleep(seconds)
    return seconds

pool = ProcessPoolExecutor()

s = time.time()
future1 = pool.submit(wait, 5)
future2 = pool.submit(wait, future1.result())
future3 = pool.submit(wait, 10)

time_taken = time.time() - s
print(time_taken)

推荐答案

这可以通过在第一个操作完成后精心设计回调以提交第二个操作来实现.可悲的是,不可能将任意期货传递给pool.submit,因此需要额外的步骤将两个期货绑定在一起.

This is achievable by carefully crafting a callback to submit the second operation after the first one has completed. Sadly, it is not possible to pass an arbitrary future to pool.submit so an extra step is required to bind the two futures together.

这是一个可能的实现方式:

Here is a possible implementation:

import concurrent.futures

def copy_future_state(source, destination):
    if source.cancelled():
        destination.cancel()
    if not destination.set_running_or_notify_cancel():
        return
    exception = source.exception()
    if exception is not None:
        destination.set_exception(exception)
    else:
        result = source.result()
        destination.set_result(result)


def chain(pool, future, fn):
    result = concurrent.futures.Future()

    def callback(_):
        try:
            temp = pool.submit(fn, future.result())
            copy = lambda _: copy_future_state(temp, result)
            temp.add_done_callback(copy)
        except:
            result.cancel()
            raise

    future.add_done_callback(callback)
    return result

请注意,copy_future_state用法:

from concurrent.futures import ProcessPoolExecutor

def wait(seconds):
    time.sleep(seconds)
    return seconds

pool = ProcessPoolExecutor()
future1 = pool.submit(wait, 5)
future2 = chain(pool, future1, wait)
future3 = pool.submit(wait, 10)

这篇关于如何以非阻塞方式链接期货?也就是说,如何在不阻碍的情况下将一个未来用作另一个未来的输入?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆