Apache Airflow - trigger/schedule DAG rerun on completion (File Sensor)


Problem description

Good morning.

I'm trying to set up a DAG to:


  1. Watch/sense for a file to hit a network folder

  2. Process the file

  3. Archive the file

Using the tutorials online and Stack Overflow I have been able to come up with the following DAG and operator that successfully achieve the objectives. However, I would like the DAG to be rescheduled or rerun on completion so it starts watching/sensing for another file.

I attempted to set the variable max_active_runs: 1 and then schedule_interval: timedelta(seconds=5). This does reschedule the DAG, but it starts queuing tasks and locks the file.

Any ideas are welcome on how I could rerun the DAG after the archive_task.

Thank you.

DAG code

from airflow import DAG
from airflow.operators import PythonOperator, OmegaFileSensor, ArchiveFileOperator
from datetime import datetime, timedelta
from airflow.models import Variable

default_args = {
    'owner': 'glsam',
    'depends_on_past': False,
    'start_date': datetime.now(),
    'provide_context': True,
    'retries': 100,
    'retry_delay': timedelta(seconds=30),
    'max_active_runs': 1,
    'schedule_interval': timedelta(seconds=5),
}

dag = DAG('test_sensing_for_a_file', default_args=default_args)

filepath = Variable.get("soucePath_Test")
filepattern = Variable.get("filePattern_Test")
archivepath = Variable.get("archivePath_Test")

sensor_task = OmegaFileSensor(
    task_id='file_sensor_task',
    filepath=filepath,
    filepattern=filepattern,
    poke_interval=3,
    dag=dag)


def process_file(**context):
    file_to_process = context['task_instance'].xcom_pull(
        key='file_name', task_ids='file_sensor_task')
    file = open(filepath + file_to_process, 'w')
    file.write('This is a test\n')
    file.write('of processing the file')
    file.close()


proccess_task = PythonOperator(
    task_id='process_the_file', 
    python_callable=process_file,
    provide_context=True,
    dag=dag
)

archive_task = ArchiveFileOperator(
    task_id='archive_file',
    filepath=filepath,
    archivepath=archivepath,
    dag=dag)

sensor_task >> proccess_task >> archive_task
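
The DAG above reads three Airflow Variables for the watch directory, the file pattern, and the archive directory. As a minimal sketch (the paths and pattern below are illustrative assumptions, not values from the original post), they could be seeded once from a Python shell or a bootstrap script; note the trailing slashes, since the DAG concatenates these values directly with file names:

# Illustrative only: seed the Variables the DAG reads.
# The paths/pattern are example values, not the poster's real configuration.
from airflow.models import Variable

Variable.set("soucePath_Test", "/data/omega/incoming/")   # directory the sensor watches
Variable.set("filePattern_Test", r".*\.txt")              # regex matched against file names
Variable.set("archivePath_Test", "/data/omega/archive/")  # directory files are moved into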

File sensor operator

import os
import re

from datetime import datetime
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
from airflow.operators.sensors import BaseSensorOperator


class ArchiveFileOperator(BaseOperator):
    """Moves the sensed file from the watch folder to the archive folder."""

    @apply_defaults
    def __init__(self, filepath, archivepath, *args, **kwargs):
        super(ArchiveFileOperator, self).__init__(*args, **kwargs)
        self.filepath = filepath
        self.archivepath = archivepath

    def execute(self, context):
        # The file name is pushed to XCom by the sensor task.
        file_name = context['task_instance'].xcom_pull(
            'file_sensor_task', key='file_name')
        os.rename(self.filepath + file_name, self.archivepath + file_name)


class OmegaFileSensor(BaseSensorOperator):
    """Pokes a directory until a file matching the given regex pattern appears."""

    @apply_defaults
    def __init__(self, filepath, filepattern, *args, **kwargs):
        super(OmegaFileSensor, self).__init__(*args, **kwargs)
        self.filepath = filepath
        self.filepattern = filepattern

    def poke(self, context):
        full_path = self.filepath
        file_pattern = re.compile(self.filepattern)

        directory = os.listdir(full_path)

        for files in directory:
            if re.match(file_pattern, files):
                # Hand the matched file name to downstream tasks via XCom.
                context['task_instance'].xcom_push('file_name', files)
                return True
        return False


class OmegaPlugin(AirflowPlugin):
    name = "omega_plugin"
    operators = [OmegaFileSensor, ArchiveFileOperator]

Recommended answer

Set schedule_interval=None and use the airflow trigger_dag command from a BashOperator to launch the next execution at the completion of the previous one.

trigger_next = BashOperator(
    task_id="trigger_next",
    bash_command="airflow trigger_dag 'your_dag_id'",
    dag=dag)

sensor_task >> proccess_task >> archive_task >> trigger_next
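
For completeness, here is a minimal sketch (my own assumption about how the pieces fit together, not code from the original answer) of the matching DAG declaration and import: schedule_interval moves out of default_args and is set to None on the DAG itself, so the scheduler never starts runs on its own; only the airflow trigger_dag command (issued by trigger_next, or run manually for the very first execution) does.

# Sketch, assuming Airflow 1.x as in the post above.
from airflow import DAG
from airflow.operators.bash_operator import BashOperator  # import needed for trigger_next

# Drop schedule_interval/max_active_runs from default_args and declare the DAG unscheduled.
dag = DAG(
    'test_sensing_for_a_file',   # must match the dag_id used in trigger_next's bash_command
    default_args=default_args,
    schedule_interval=None)      # only explicit triggers start runs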

You can start your first run manually with the same airflow trigger_dag command, and then the trigger_next task will automatically trigger the next one. We have used this in production for many months now and it runs perfectly.
