How to wait for an asynchronous event in a task of a DAG in a workflow implemented using Airflow?


Question


My workflow, implemented using Airflow, contains tasks A, B, C, and D. I want the workflow to wait at task C for an event. In Airflow, sensors are used to check for some condition by polling for some state; if the condition is true, the next task in the workflow is triggered. My requirement is to avoid polling. One answer mentions the rest_api_plugin for Airflow, which creates REST API endpoints for triggering the Airflow CLI. Using this plugin I can trigger a task in the workflow. In my workflow, however, I want to implement a task that waits for a REST API call (an asynchronous event) without polling; once it receives the REST request, the task is triggered and the workflow resumes.

Reasons to avoid polling: it is inefficient and does not scale to our requirements.

Update

I followed the suggestion in the answer by @Daniel Huang and created a sensor that returns False. This sensor is implemented in the task start_evaluating_cycle; it is not sensing anything, it just always returns False:

from airflow.operators.sensors import BaseSensorOperator  # Airflow 1.x import path

class WaitForEventSensor(BaseSensorOperator):

    def poke(self, context):
        # Never succeed on its own: the task sits in RUNNING state
        # until something external changes it.
        return False

start_evaluating_cycle = WaitForEventSensor(
    task_id="start_evaluating_cycle",
    dag=dag,
    poke_interval=60 * 60,  # any number will do here: poke never returns True anyway
)

I configured the rest_api_plugin, and using the plugin I am trying to mark the task start_evaluating_cycle as complete so that the workflow continues.

The rest_api_plugin executes the task successfully, and I can see in Flower that the task ran.

But in the workflow, the task start_evaluating_cycle is still in the RUNNING state.

The rest_api_plugin is running the task independently of the workflow. How can I make the rest_api_plugin run the task inside the workflow, rather than independently of it?

However, when I select the task in the Airflow admin UI and mark it as success:

It takes me to this URL: http://localhost:8080/admin/airflow/success?task_id=start_evaluating_cycle&dag_id=faculty_evaluation_workflow&upstream=false&downstream=false&future=false&past=false&execution_date=2017-11-26T06:48:54.297289&origin=http%3A%2F%2Flocalhost%3A8080%2Fadmin%2Fairflow%2Fgraph%3Fexecution_date%3D2017-11-26T06%253A48%253A54.297289%26arrange%3DTB%26root%3D%26dag_id%3Dfaculty_evaluation_workflow%26_csrf_token%3DImM3NmU4ZTVjYTljZTQzYWJhNGI4Mzg2MmZmNDU5OGYxYWY0ODAxYWMi.DPv1Ww.EnWS6ffVLNcs923y6eVRV_8R-X8

and when I confirm, the workflow proceeds further, which is what I want. But I need to mark the success from a REST API call.

My concerns are:

  1. How do I mark a task running inside a workflow as successful using the rest_api_plugin?
  2. Is it possible to mark a task as successful from an external system by calling the URL that the Airflow admin UI creates? (A sketch of such an external call follows below.)
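
On the second concern: the admin URL above embeds a per-session CSRF token and a confirmation step, so it is not something an external system can call directly. An external caller would more plausibly target the rest_api_plugin endpoint itself. A minimal sketch, assuming the plugin's CLI-mirroring query layout and assuming it forwards the run command's --mark_success flag; both assumptions should be checked against the plugin's README:

# Illustrative only: the endpoint layout and the mark_success parameter
# are assumptions about rest_api_plugin, not verified behaviour.
import requests

AIRFLOW_WEBSERVER = "http://localhost:8080"  # webserver from the question

response = requests.get(
    AIRFLOW_WEBSERVER + "/admin/rest_api/api",
    params={
        "api": "run",
        "dag_id": "faculty_evaluation_workflow",
        "task_id": "start_evaluating_cycle",
        "execution_date": "2017-11-26T06:48:54.297289",
        "mark_success": "true",  # assumed pass-through of `airflow run -m`
    },
)
print(response.status_code, response.text)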

Solution

One possible solution is to use a sensor that waits forever, until something external manually sets its state to success.

So you'd have some sort of dummy sensor:

from airflow.operators.sensors import BaseSensorOperator

class DummySensor(BaseSensorOperator):

    def poke(self, context):
        # Always report "condition not met" so the task never succeeds on its own.
        return False

Initialized like this:

task_c = DummySensor(
    dag=dag,
    task_id='task_c',
    poke_interval=600,  # something fairly high since we're not polling for anything, just checking when to time out
    timeout=3600,  # if nothing external sets the state to success within 1 hour, the task fails, so task D does not run
)
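
Wired between its neighbours so that downstream work gates on the sensor (a sketch; task_b and task_d stand in for the question's real operators):

# Task D and anything after it start only once task_c reaches SUCCESS,
# whether poke() ever returns True or the state is set externally.
task_b >> task_c >> task_d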

When Task C starts, it will just sit in the RUNNING state. You can then use the REST API Plugin to set Task C's state to SUCCESS when the condition is met. Task D and other downstream tasks will then get kicked off.
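
If driving this through the plugin proves awkward, the same state flip can be done with Airflow's own models, given a process that can import Airflow and reach the metadata database. A minimal sketch (mark_task_success is an illustrative helper, not part of Airflow; TaskInstance, DagBag, State, and settings.Session are standard):

from airflow import settings
from airflow.models import DagBag, TaskInstance
from airflow.utils.state import State

def mark_task_success(dag_id, task_id, execution_date):
    # execution_date is the datetime of the running DagRun.
    dag = DagBag().get_dag(dag_id)
    task = dag.get_task(task_id)
    ti = TaskInstance(task, execution_date)

    session = settings.Session()
    ti.state = State.SUCCESS
    session.merge(ti)  # upsert the task-instance row with the new state
    session.commit()
    session.close()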

The downside is that the dummy sensor still holds onto a worker slot the whole time it waits, doing nothing.
