Airflow File Sensor for sensing files on my local drive
Question
Does anybody have any experience with FileSensor? I came across it while researching how to sense files in a local directory. The code is as follows:
task = FileSensor(
    task_id="senseFile",
    filepath="etc/hosts",
    fs_conn_id='fs_local',
    _hook=self.hook,
    dag=self.dag,
)
I have also set up my conn_id with the connection type File (path) and gave it {'path':'mypath'}, but even when I set a non-existent path, or the file isn't in the specified path, the task completes and the DAG succeeds. The FileSensor doesn't seem to sense files at all.
Recommended answer
I found the community-contributed FileSensor a little underwhelming, so I wrote my own.
I got it working for files local to the machine where the server/scheduler was running, but ran into problems when using network paths.
The trick I found for network paths was to mount the network drive on my Linux box.
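One way to guard against the share not being mounted is to check the mount point before sensing. This is only a minimal sketch: mount_is_ready is a hypothetical helper name (not part of Airflow or of the code in this answer), and /mnt/share is a placeholder path. It relies on os.path.ismount from the standard library.

```python
import os


def mount_is_ready(mount_point):
    """Return True only if mount_point is an active mount point."""
    # os.path.ismount returns False for missing paths and for plain
    # directories, so an unmounted (empty) mount directory is not
    # mistaken for the real network share.
    return os.path.ismount(mount_point)


if __name__ == "__main__":
    # "/mnt/share" is a placeholder; use wherever the drive is mounted.
    print(mount_is_ready("/mnt/share"))
```

A check like this could run at the start of a sensor's poke so that an unmounted drive reads as "not ready" rather than as an empty directory.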
This is my DAG, which runs sensor_task >> proccess_task >> archive_task >> trigger (rerun):
Note: We use variables (sourcePath, filePattern and archivePath) entered via the web UI.
import os
from datetime import datetime, timedelta

from airflow import DAG
# Older (1.x-era) Airflow import style; OmegaFileSensor and ArchiveFileOperator
# come from the plugin shown further down.
from airflow.operators import PythonOperator, OmegaFileSensor, ArchiveFileOperator, TriggerDagRunOperator
from airflow.models import Variable

default_args = {
    'owner': 'glsam',
    'depends_on_past': False,
    'start_date': datetime(2017, 6, 26),
    'provide_context': True,
    'retries': 100,
    'retry_delay': timedelta(seconds=30)
}

task_name = 'my_first_file_sensor_task'

# Variables entered via the web UI
filepath = Variable.get("sourcePath")
filepattern = Variable.get("filePattern")
archivepath = Variable.get("archivePath")

dag = DAG(
    task_name,  # dag_id must match trigger_dag_id below so the DAG re-triggers itself
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
    max_active_runs=1,
    concurrency=1)

sensor_task = OmegaFileSensor(
    task_id=task_name,
    filepath=filepath,
    filepattern=filepattern,
    poke_interval=3,
    dag=dag)


def process_file(**context):
    # Pull the file name that the sensor task pushed to XCom
    file_to_process = context['task_instance'].xcom_pull(
        key='file_name', task_ids=task_name)
    # Test processing step: overwrites the sensed file with placeholder text
    with open(os.path.join(filepath, file_to_process), 'w') as file:
        file.write('This is a test\n')
        file.write('of processing the file')


proccess_task = PythonOperator(
    task_id='process_the_file', python_callable=process_file, dag=dag)

archive_task = ArchiveFileOperator(
    task_id='archive_file',
    filepath=filepath,
    task_name=task_name,
    archivepath=archivepath,
    dag=dag)

trigger = TriggerDagRunOperator(
    task_id='trigger_dag_rerun', trigger_dag_id=task_name, dag=dag)

sensor_task >> proccess_task >> archive_task >> trigger
And then this is my FileSensor:
import os
import re

from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
from airflow.operators.sensors import BaseSensorOperator


class ArchiveFileOperator(BaseOperator):
    @apply_defaults
    def __init__(self, filepath, archivepath, task_name, *args, **kwargs):
        super(ArchiveFileOperator, self).__init__(*args, **kwargs)
        self.filepath = filepath
        self.archivepath = archivepath
        self.task_name = task_name

    def execute(self, context):
        # Look up the file name the sensor task pushed to XCom, then move it to the archive
        file_name = context['task_instance'].xcom_pull(self.task_name, key='file_name')
        os.rename(os.path.join(self.filepath, file_name),
                  os.path.join(self.archivepath, file_name))


class OmegaFileSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, filepath, filepattern, *args, **kwargs):
        super(OmegaFileSensor, self).__init__(*args, **kwargs)
        self.filepath = filepath
        self.filepattern = filepattern

    def poke(self, context):
        file_pattern = re.compile(self.filepattern)
        for file_name in os.listdir(self.filepath):
            if re.match(file_pattern, file_name):
                # Push the first matching file name to XCom for the downstream tasks
                context['task_instance'].xcom_push('file_name', file_name)
                return True
        return False


class OmegaPlugin(AirflowPlugin):
    name = "omega_plugin"
    operators = [OmegaFileSensor, ArchiveFileOperator]
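The sensing and archiving logic can be tried outside Airflow. The sketch below is a standalone approximation, not the plugin itself: find_matching_file and archive_file are hypothetical helper names, and the file names are made up for the demo. It mirrors the poke loop (return the first directory entry whose name matches the regex), sorts the listing so the choice is deterministic, and swaps os.rename for shutil.move on the assumption that the archive may sit on a different filesystem (for example a mounted network drive), where os.rename can fail.

```python
import os
import re
import shutil
import tempfile


def find_matching_file(directory, pattern):
    """Return the first entry in directory whose name matches pattern, else None."""
    regex = re.compile(pattern)
    # sorted() makes the pick deterministic; os.listdir order is arbitrary.
    for name in sorted(os.listdir(directory)):
        if regex.match(name):
            return name
    return None


def archive_file(directory, archive_dir, name):
    """Move name from directory into archive_dir and return the new path."""
    destination = os.path.join(archive_dir, name)
    # shutil.move falls back to copy-and-delete across filesystems.
    shutil.move(os.path.join(directory, name), destination)
    return destination


if __name__ == "__main__":
    # Temporary directories stand in for sourcePath and archivePath.
    source = tempfile.mkdtemp()
    archive = tempfile.mkdtemp()
    open(os.path.join(source, "data_001.csv"), "w").close()

    found = find_matching_file(source, r"data_\d+\.csv$")
    print(found)
    print(archive_file(source, archive, found))
```

Because re.match only anchors at the start of the name, a pattern such as data_ would also match data_001.csv.tmp; ending the pattern with $ avoids picking up partially written or temporary files.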