apache-airflow相关内容

使用TriggerDagRunOperator多次运行另一个DAG

我有一个DAG(DAG1),我在其中复制一堆文件。然后,我想为复制的每个文件启动另一个DAG(DAG2)。由于每次运行DAG1时复制的文件数量会有所不同,因此我希望循环遍历这些文件,并使用适当的参数调用DAG2。 例如: with DAG( 'DAG1', description="copy files over", schedule_interval= ..
发布时间:2022-06-28 15:38:07 其他开发

使用 Apache Airflow 将文件从一个 Google Cloud Storage Bucket 复制到另一个

问题:我想将文件从 Google Cloud Storage Bucket 中的文件夹(例如 Bucket1 中的 Folder1)复制到另一个 Bucket(例如 Bucket2).我找不到任何用于 Google Cloud Storage 复制文件的 Airflow Operator. 解决方案 我刚刚在2小时前上传的contrib中发现了一个新的算子:https://github.c ..

作业不通过使用 RabbitMQ 运行 celery 的 Airflow 执行

下面是我使用的配置 [核心]# 气流的主文件夹,默认为 ~/airflowairflow_home =/root/气流# 气流管道所在的文件夹,很可能是# 代码库中的子文件夹dags_folder =/root/airflow/dags#气流应该存储其日志文件的文件夹.这个位置base_log_folder =/root/airflow/logs# 可以为日志备份提供一个 S3 位置# 对于 S ..
发布时间:2022-01-11 17:26:15 其他开发

用于运行 shell 脚本的 Airflow Dataproc 操作符

是否有任何直接的方法可以将 shell 脚本运行到 dataproc 集群中.目前我可以通过 pysparkoperator 运行 shell(它调用另一个 python 文件,然后这个 python 文件调用 shell 脚本).我搜索了很多链接,但到目前为止还没有找到任何直接的方法. 如果有人能告诉我最简单的方法,那对我真的很有帮助. 解决方案 PIG job with sh o ..
发布时间:2021-10-26 18:05:01 Python

保证一些算子会在同一个airflow worker上执行

我有一个 DAG 从云存储下载 csv 文件 通过 https 将 csv 文件上传到第三方 我正在执行的气流集群默认使用 CeleryExecutor,所以我担心在我扩大工作人员数量的某个时候,这些任务可能会在不同的工作人员上执行.例如.工作人员 A 进行下载,工作人员 B 尝试上传,但没有找到文件(因为它在工作人员 A 上) 是否有可能以某种方式保证下载和上传操作符都将在同 ..
发布时间:2021-10-26 18:02:48 其他开发

如何使用 Airflow 在不同的机器上运行一个工作流程的不同任务?

免责声明:我(还)不是 Airflow 的用户,今天才发现它,我开始探索它是否适合我的用例. 我有一个数据处理工作流,它是多个任务的顺序(非并行)执行.但是,某些任务需要在特定机器上运行.Airflow 可以管理这个吗?此用例的建议实施模型是什么? 谢谢. 解决方案 是的,您可以在 Airflow 中通过 队列.您可以将任务绑定到特定队列.然后对于机器上的每个工作人员,您可以将 ..
发布时间:2021-10-26 18:01:53 其他开发

Airflow:让用户使用 ldap 登录

有谁知道我将如何从气流中获取当前用户?我们将后端启用到 airflow/contrib/auth/backends/ldap_auth.py,因此用户通过该身份验证登录,我想知道如何让当前用户点击某些内容(自定义将我们作为插件查看). 解决方案 您可以通过调用{{ current_user.user.username }} 或 {{ current_user.user }} 来获取 在您的 ..
发布时间:2021-10-26 18:01:13 Python

如何以 Unix 用户身份运行 Apache Airflow DAG

我使用 root 帐户在我的集群上安装了 Apache Airflow.我知道这是不好的做法,但这只是测试环境.我创建了一个简单的 DAG: from 气流导入 DAG从airflow.operators.bash_operator 导入BashOperator从日期时间导入日期时间,时间增量dag = DAG('create_directory', description='简单的创建目录工作 ..
发布时间:2021-10-26 18:00:18 其他开发

如何触发每日在当地时间午夜而不是 UTC 时间午夜运行的 DAG

我在 UTC+4 时区,所以当 Airflow 触发夜间 ETL 时,这里已经是凌晨 4:00.我如何告诉 Airflow 在 ds-1 天 20:00 触发 ds 天的运行,但 ds=ds? 根据文档,强烈建议将所有服务器保持在 UTC 上,这就是我寻找应用程序级解决方案的原因. 编辑:一个hacky的解决方案是将它定义为每天晚上20:00运行,所以是“前一天",然后在工作.但这在 ..
发布时间:2021-10-26 17:59:22 其他开发

请求中的气流 CROSSSLOT 密钥不会使用 AWS ElastiCache 散列到相同的槽错误

我在 AWS ECS 上运行 apache-airflow 1.8.1,我有一个 AWS ElastiCache 集群(redis 3.2.4)运行 2 个分片/2 个启用多可用区的节点(集群 redis 引擎).我已经验证气流可以毫无问题地访问集群的主机/端口. 这是日志: Thu Jul 20 01:39:21 UTC 2017 - 检查 redis(端点:redis://xxxxxx ..
发布时间:2021-07-05 20:07:38 其他开发

如何在 EmailOperator 任务的文件名中添加模板变量?(空气流动)

我似乎无法让它发挥作用. 我正在尝试每天发送一个给定的文件,其名称类似于“file_{{ds_nodash}}.csv". 问题是我似乎无法将此名称添加为文件名,因为它似乎无法使用.在电子邮件的文本或主题中完美无缺,而不是在名称上. 以 dag 为例: local_file = 'file-{{ds_nodash}}.csv'send_stats_csv = EmailOper ..
发布时间:2021-06-26 19:29:31 Python

手动DAG运行设置单个任务状态

我有一个没有时间表的DAG(根据需要手动运行).它有很多任务.有时我想通过手动将任务状态更改为SUCCESS来“跳过"一些初始任务.更改手动执行的DAG的任务状态失败,这似乎是由于解析了execute_date的错误. 还有另一种方法可以单独设置手动执行的DAG的任务状态吗? 示例在下面运行.该任务的执行日期为01-13T17:27:13.130427,我相信毫秒数未正确解析. ..
发布时间:2020-08-21 19:35:10 其他开发

是否可以通过向Airflow中的操作员添加更多的cpus来提高处理速度?

在airflow.cfg中有一个名为[operators]的部分,其中default_cpus设置为1,并且default_ram和default_disk都设置为512. 我想了解,如果不增加这些参数,是否可以提高处理速度. 解决方案 我查看了源代码,这些设置对所有操作员都可用,但是操作员和任何执行者都从未使用过它们. 因此,我回顾了一下历史,并回顾了引入了这些设置的提交,它 ..
发布时间:2020-08-21 19:35:03 其他开发

通过Airflow运行.EXE和Powershell任务

我们的系统基本上只是与MS SQL Server一起运行C#和Powershell应用程序的Windows Server.我们有一个内部WorkflowManagement解决方案,该解决方案能够运行执行EXE/BAT/PS1甚至调用DLL函数的任务. 现在,我正在评估Apache Airflow是否对我们来说是更好的解决方案.到目前为止,我的幼稚计划是在Linux机器上运行气流调度程序,然 ..
发布时间:2020-08-21 19:33:36 其他开发

如何在Airflow中实施轮询?

我想使用Airflow实施数据流,这些数据流定期轮询外部系统(ftp服务器等),检查是否符合特定条件的新文件,然后为这些文件运行一堆任务。现在,我是Airflow的新手,并且了解到在这种情况下可以使用Sensor,并且实际上我设法编写了一个在运行“ airflow test”时可以正常工作的传感器。但是对于传感器的poke_interval与DAG调度的关系,我有些困惑。如何为用例定义这些设置?还 ..
发布时间:2020-06-02 21:39:26 其他开发

气流计划程序未获取DAG运行

我正在设置气流,以使网络服务器在一台计算机上运行,​​而调度程序在另一台计算机上运行。两者共享同一个MySQL Metastore数据库。这两个实例都出现在日志中,没有任何错误,但是调度程序没有拾取任何通过Web UI手动触发DAG创建的DAG运行。 MysQL中的dag_run表显示了一些条目,它们都处于运行状态: mysql>从dag_run选择*; + ---- + --- ..