Airflow File Sensor for sensing files on my local drive


Question

Does anybody have any idea about FileSensor? I came across it while researching how to sense files in my local directory. The code is as follows:

    task = FileSensor(
        task_id="senseFile",
        filepath="etc/hosts",
        fs_conn_id='fs_local',
        _hook=self.hook,
        dag=self.dag)

I have also set my conn_id and conn type as File (path) and gave it {'path': 'mypath'}, but even if I set a non-existent path, or if the file isn't in the specified path, the task completes and the DAG succeeds. The FileSensor doesn't seem to sense files at all.
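For reference, the stock FileSensor's poke essentially reduces to an existence check of the connection's base path joined with `filepath`. Here is a minimal standard-library sketch of that logic (the function name and paths are illustrative, not Airflow's actual API) — the point being that if `fs_conn_id` resolves to the wrong base path, the check should keep returning False and the sensor should keep poking, not succeed:

```python
import os
import tempfile


def poke(base_path, filepath):
    """Mimic the FileSensor check: does base_path/filepath exist?

    If the connection's base path is wrong, this returns False on
    every poke and the sensor never succeeds.
    """
    return os.path.exists(os.path.join(base_path, filepath))


# Illustrative run against a throwaway directory.
with tempfile.TemporaryDirectory() as base:
    open(os.path.join(base, "hosts"), "w").close()
    print(poke(base, "hosts"))        # -> True  (file present)
    print(poke(base, "missing.txt"))  # -> False (file absent)
```

A sensor that reports success for a non-existent path, as described above, therefore suggests a misconfigured connection or an operator that isn't actually performing this check.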

Answer

I found the community-contributed FileSensor a little bit underwhelming, so I wrote my own.

I got it working for files local to where the server/scheduler was running, but ran into problems when using network paths.

The trick I found for network paths was to mount the network drive to my Linux box.
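As an illustration of that mounting step, a CIFS/SMB share can be mounted at boot with an /etc/fstab entry along these lines (the server, share, mount point, user, and credentials file are all placeholders — adjust for your environment):

```
# /etc/fstab -- mount //fileserver/landing at /mnt/landing (placeholder names)
//fileserver/landing  /mnt/landing  cifs  credentials=/etc/smb-credentials,uid=airflow,iocharset=utf8  0  0
```

Once mounted (e.g. with `sudo mount /mnt/landing`), the scheduler sees the share as an ordinary local directory, so the sourcePath variable below can simply point at the mount point.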

This is my DAG, which runs sensor_task >> proccess_task >> archive_task >> then triggers a rerun.

Note: We use variables (sourcePath, filePattern and archivePath) entered via the WebGUI.

    from airflow import DAG
    from airflow.operators import PythonOperator, OmegaFileSensor, ArchiveFileOperator, TriggerDagRunOperator
    from datetime import datetime, timedelta
    from airflow.models import Variable

    default_args = {
        'owner': 'glsam',
        'depends_on_past': False,
        'start_date': datetime(2017, 6, 26),
        'provide_context': True,
        'retries': 100,
        'retry_delay': timedelta(seconds=30)
    }

    task_name = 'my_first_file_sensor_task'
    filepath = Variable.get("sourcePath")
    filepattern = Variable.get("filePattern")
    archivepath = Variable.get("archivePath")

    dag = DAG(
        task_name,
        default_args=default_args,
        schedule_interval=None,
        catchup=False,
        max_active_runs=1,
        concurrency=1)

    sensor_task = OmegaFileSensor(
        task_id=task_name,
        filepath=filepath,
        filepattern=filepattern,
        poke_interval=3,
        dag=dag)


    def process_file(**context):
        file_to_process = context['task_instance'].xcom_pull(
            key='file_name', task_ids=task_name)
        file = open(filepath + file_to_process, 'w')
        file.write('This is a test\n')
        file.write('of processing the file')
        file.close()


    proccess_task = PythonOperator(
        task_id='process_the_file', python_callable=process_file, dag=dag)

    archive_task = ArchiveFileOperator(
        task_id='archive_file',
        filepath=filepath,
        task_name=task_name,
        archivepath=archivepath,
        dag=dag)

    trigger = TriggerDagRunOperator(
        task_id='trigger_dag_rerun', trigger_dag_id=task_name, dag=dag)

    sensor_task >> proccess_task >> archive_task >> trigger

And then this is my FileSensor:

    import os
    import re

    from datetime import datetime
    from airflow.models import BaseOperator
    from airflow.plugins_manager import AirflowPlugin
    from airflow.utils.decorators import apply_defaults
    from airflow.operators.sensors import BaseSensorOperator


    class ArchiveFileOperator(BaseOperator):
        @apply_defaults
        def __init__(self, filepath, archivepath, task_name, *args, **kwargs):
            super(ArchiveFileOperator, self).__init__(*args, **kwargs)
            self.filepath = filepath
            self.archivepath = archivepath
            self.task_name = task_name

        def execute(self, context):
            file_name = context['task_instance'].xcom_pull(self.task_name, key='file_name')
            os.rename(self.filepath + file_name, self.archivepath + file_name)


    class OmegaFileSensor(BaseSensorOperator):
        @apply_defaults
        def __init__(self, filepath, filepattern, *args, **kwargs):
            super(OmegaFileSensor, self).__init__(*args, **kwargs)
            self.filepath = filepath
            self.filepattern = filepattern

        def poke(self, context):
            full_path = self.filepath
            file_pattern = re.compile(self.filepattern)

            for file_name in os.listdir(full_path):
                if re.match(file_pattern, file_name):
                    # Push the matched name so downstream tasks can pull it.
                    context['task_instance'].xcom_push('file_name', file_name)
                    return True
            return False

    class OmegaPlugin(AirflowPlugin):
        name = "omega_plugin"
        operators = [OmegaFileSensor, ArchiveFileOperator]
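The sense-then-archive flow implemented by the two operators above can be sketched with the standard library alone — scan a directory for a regex match, then move the matched file aside. The directory and file names below are illustrative:

```python
import os
import re
import shutil
import tempfile


def find_matching_file(source_dir, pattern):
    """Return the first file name in source_dir matching pattern, or None.

    Mirrors OmegaFileSensor.poke: scan the directory and report the
    first regex hit (the real sensor pushes it to XCom instead).
    """
    regex = re.compile(pattern)
    for name in sorted(os.listdir(source_dir)):
        if regex.match(name):
            return name
    return None


def archive_file(source_dir, archive_dir, name):
    """Mirrors ArchiveFileOperator.execute: move the processed file aside."""
    shutil.move(os.path.join(source_dir, name),
                os.path.join(archive_dir, name))


# Illustrative run in temporary directories.
src = tempfile.mkdtemp()
dst = tempfile.mkdtemp()
open(os.path.join(src, "report_2017.csv"), "w").close()

hit = find_matching_file(src, r"report_.*\.csv")
print(hit)              # -> report_2017.csv
archive_file(src, dst, hit)
print(os.listdir(src))  # -> []
print(os.listdir(dst))  # -> ['report_2017.csv']
```

Because the real sensor's poke re-runs until a match appears, moving the file out of sourcePath in the archive step is what stops the re-triggered DAG from picking up the same file twice.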

