如何在python中设计异步管道模式 [英] How to design an async pipeline pattern in python

查看:144
本文介绍了如何在python中设计异步管道模式的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试设计一个异步管道,该管道可以轻松地建立数据处理管道.管道由几个功能组成.输入数据在流水线的一端输入,而在另一端输出.

I am trying to design an async pipeline that can easily make a data processing pipeline. The pipeline is composed of several functions. Input data goes in at one end of the pipeline and comes out at the other end.

我想通过以下方式设计管道:

I want to design the pipeline in a way that:

  1. 可以在管道中插入其他功能
  2. 可以弹出管道中已经存在的功能.

这是我想出的:

import asyncio

@asyncio.coroutine
def add(x):
    return x + 1

@asyncio.coroutine
def prod(x):
    return x * 2

@asyncio.coroutine
def power(x):
    return x ** 3

def connect(funcs):
    def wrapper(*args, **kwargs):
        data_out = yield from funcs[0](*args, **kwargs)
        for func in funcs[1:]:
            data_out = yield from func(data_out)
        return data_out
    return wrapper

pipeline = connect([add, prod, power])
input = 1
output = asyncio.get_event_loop().run_until_complete(pipeline(input))
print(output)

这当然可以,但是问题是,如果我想在该管道中添加另一个函数(或从中弹出一个函数),则必须重新分解并重新连接每个函数.

This works, of course, but the problem is that if I want to add another function into (or pop out a function from) this pipeline, I have to disassemble and reconnect every function again.

我想知道是否有更好的方案或设计模式来创建这样的管道?

I would like to know if there is a better scheme or design pattern to create such a pipeline?

推荐答案

我不知道这是否是最好的方法,但这是我的解决方案.

I don't know if it is the best way to do it but here is my solution.

虽然我认为可以使用列表或字典来控制管道,但我发现使用列表或字典更容易且更有效地使用生成器.

While I think it's possible to control a pipeline using a list or a dictionary I found easier and more efficent to use a generator.

考虑以下生成器:

def controller():
    old = value = None
    while True:
        new = (yield value)
        value = old
        old = new

这基本上是一个元素的队列,它存储您发送的值,并在下一个 send (或 next )调用时释放它.

This is basically a one-element queue, it stores the value that you send it and releases it at the next call of send (or next).

示例:

>>> c = controller()
>>> next(c)           # prime the generator
>>> c.send(8)         # send a value
>>> next(c)           # pull the value from the generator
8

通过将管道中的每个协程与其控制器相关联,我们将拥有一个外部句柄,可用于推动每个协程的目标.我们只需要定义协程以使其在每个周期从控制器中提取新目标即可.

By associating every coroutine in the pipeline with its controller we will have an external handle that we can use to push the target of each one. We just need to define our coroutines in a way that they will pull the new target from our controller every cycle.

现在考虑以下协程:

def source(controller):
    while True:
        target = next(controller)
        print("source sending to", target.__name__) 
        yield (yield from target)

def add():
    return (yield) + 1

def prod():
    return (yield) * 2

源是一个协程,它不会返回,因此它不会在第一个循环后终止.其他协程是接收器",不需要控制器.您可以在管道中使用这些协程,如以下示例所示.我们最初设置了一个路由 source->添加,并在收到第一个结果后,将路由更改为 source->产品.

The source is a coroutine that does not return so that it will not terminate itself after the first cycle. The other coroutines are "sinks" and does not need a controller. You can use these coroutines in a pipeline as in the following example. We initially set up a route source --> add and after receiving the first result we change the route to source --> prod.

# create a controller for the source and prime it 
cont_source = controller()
next(cont_source)

# create three coroutines
# associate the source with its controller
coro_source = source(cont_source)
coro_add = add()
coro_prod = prod()

# create a pipeline
cont_source.send(coro_add)

# prime the source and send a value to it
coro_source.send(None)
print("add =", coro_source.send(4))

# change target of the source
cont_source.send(coro_prod)

# reset the source, send another value
coro_source.send(None)
print("prod =", coro_source.send(8))

输出:

source sending to add
add = 5
source sending to prod
prod = 16

这篇关于如何在python中设计异步管道模式的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆