Prefect 中的循环任务 [英] Looping tasks in Prefect

查看:24
本文介绍了Prefect 中的循环任务的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想一次又一次地循环执行任务,直到达到某个条件,然后再继续工作流的其余部分.

I want to loop over tasks, again and again, until reaching a certain condition before continuing the rest of the workflow.

到目前为止我所拥有的是:

What I have so far is this:

# Loop task
class MyLoop(Task):
    def run(self):
        loop_res = prefect.context.get("task_loop_result", 1)
        print (loop_res)
        if loop_res >= 10:
            return loop_res
        raise LOOP(result=loop_res+1)

但据我所知,这不适用于多项任务.有没有办法进一步返回并一次循环执行多个任务?

But as far as I understand this does not work for multiple tasks. Is there a way to come back further and loop on several tasks at a time ?

推荐答案

解决方案是简单地创建一个任务,该任务本身创建一个具有一个或多个参数的新流并调用 flow.run().例如:

The solution is simply to create a single task that itself creates a new flow with one or more parameters and calls flow.run(). For example:

class MultipleTaskLoop(Task):
    def run(self):
        # Get previous value
        loop_res = prefect.context.get("task_loop_result", 1)
        
        # Create subflow
        with Flow('Subflow', executor=LocalDaskExecutor()) as flow:
            x = Parameter('x', default = 1)
            loop1 = print_loop()
            add = add_value(x)
            loop2 = print_loop()
            loop1.set_downstream(add)
            add.set_downstream(loop2)

        # Run subflow and extract result
        subflow_res = flow.run(parameters={'x': loop_res})
        new_res = subflow_res.result[add]._result.value

        # Loop
        if new_res >= 10:
            return new_res
        raise LOOP(result=new_res)

其中 print_loop 只是打印loop";在输出中,add_value 将接收到的值加一.

where print_loop simply prints "loop" in the output and add_value adds one to the value it receives.

这篇关于Prefect 中的循环任务的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆