airflow相关内容

导出所有气流变量

我在从代码下载所有 Airflow 变量时遇到问题. 有机会从 UI 导出,但我还没有找到任何以编程方式进行导出的方法. 我发现只有 Variable.get('variable_name') 方法返回一个 Airflow 变量.没有获取 Airflow 变量列表的变体. 在源代码中搜索也无济于事.你知道一些简单的方法吗? 提前致谢. 解决方案 您可以使用 Airflo ..
发布时间:2021-10-26 18:02:29 Python

在 Airflow 的操作员参数中将 jinja 模板格式化为 INT

我正在尝试将 jinja 模板参数格式化为整数,以便我可以将其传递给需要 INT(可以是自定义或 PythonOperator)的运算符,但我无法做到. 请参阅下面的示例 DAG.我正在使用内置的 Jinja 过滤器 |int 但这不起作用 - 类型仍然是 我还是 Airflow 的新手,但根据我对 Jinja/Airflow 作品的了解,我认为这是不可能的.我看到了两种主要的解决方法 ..
发布时间:2021-10-26 18:02:19 其他开发

气流 - 如何获得所有未来的运行日期

我正在安排气流作业.但是,为了验证我是否安排了正确的作业,我需要查看将来何时运行. Airflow 具有以下命令,可让我进行下一次运行.但是,这对于某些用例来说还不够.例如,我安排了每隔一个星期五运行一次作业.我如何验证这一点. airflow next_execution 有没有办法,我可以获得此 dag 运行的所有未来日期.或至少几个? 解决方案 虽然大多数进程使用 cro ..
发布时间:2021-10-26 18:02:16 其他开发

Airflow BashOperator 找不到 Bash

我在 Centos 7 中使用 Airflow,使用 Python 3.7. 当我通过 BashOperator 运行 Bash 命令时,我遇到了以下问题: [2019-11-13 23:20:08,238] {taskinstance.py:1058} 错误 - [Errno 2] 没有那个文件或目录:'bash': 'bash'回溯(最近一次调用最后一次):文件“/home/airfl ..
发布时间:2021-10-26 18:02:13 Python

如何找到Airflow中失败的上游任务数量?

我很难找出如何为同一天(同一执行日)运行两次的同一个 dag 运行找到失败的任务. 考虑一个例子,当带有 dag_id=1 的 dag 在第一次运行时失败(由于任何原因,可以说连接超时)并且任务失败.当我们尝试查询它时,TaskInstance 表将包含失败任务的条目.太棒了!! 但是,如果我重新运行相同的 dag(注意 dag_id 仍然为 1),那么在最后一个任务中(它具有 ALL ..
发布时间:2021-10-26 18:02:07 其他开发

气流连接类型文件(路径)

大家好,我正在使用 Airflow,我正在阅读这篇有用的教程.我正在寻求帮助以更好地了解 Admin->Connection 如何在 Conn Type: File (path) 方面工作.我想这种类型的连接是让我的操作员可以访问本地文件系统文件夹? 解决方案 由于您的评论,我才明白如何为本地文件配置连接,谢谢@desimetala.我会把它放在这里给下一个需要它的人. 如果本地路径 ..
发布时间:2021-10-26 18:01:56 服务器开发

如何使用 Airflow 在不同的机器上运行一个工作流程的不同任务?

免责声明:我(还)不是 Airflow 的用户,今天才发现它,我开始探索它是否适合我的用例. 我有一个数据处理工作流,它是多个任务的顺序(非并行)执行.但是,某些任务需要在特定机器上运行.Airflow 可以管理这个吗?此用例的建议实施模型是什么? 谢谢. 解决方案 是的,您可以在 Airflow 中通过 队列.您可以将任务绑定到特定队列.然后对于机器上的每个工作人员,您可以将 ..
发布时间:2021-10-26 18:01:53 其他开发

如何处理 AIRFLOW (DatabricksSubmitRunOperator) 的错误

我有运行 Databricks 笔记本系列的 Airflow dag. 现在我想要的是,如果笔记本出现故障?如何向此笔记本失败的用户发送邮件,但没有执行日期等详细信息. 有什么错误处理方法吗? 解决方案 Step1: 将 email_on_failure 设置为 False 并使用操作员的 >on_failure_callback.on_failure_callback 下面描 ..
发布时间:2021-10-26 18:01:50 其他开发

如何在Airflow中的调用DAG中捕获传递的--conf参数

我正在尝试从 REST API 运行 DAG 并将一些参数传递给它.DAG 应该能够捕获参数并使用它.问题是我能够从 REST API 触发 DAG,但 DAG 无法捕获传递的参数.有没有办法做到这一点? 我正在从 REST API 触发 DAG,如下所示.它在 --conf 中传递参数 http://abcairflow.com:8090/admin/rest_api/api?api=t ..
发布时间:2021-10-26 18:01:49 其他开发

气流如何在docker容器中安装airflow.cfg

我正在 docker 容器中运行气流,并且想要将我的 airflow.cfg 挂载为一个卷,以便我可以快速编辑配置,而无需重建我的图像或直接在正在运行的容器中进行编辑.我能够将我的 airflow.cfg 安装为一个卷,并且我的气流网络服务器在启动时成功地从中读取了配置.但是,当我在主机上编辑时,更改不会反映在 docker 容器内. docker 容器内 findmnt -Mairflow ..
发布时间:2021-10-26 18:01:45 其他开发

为每个文件运行气流 DAG

所以我在气流中使用了这个非常好的 DAG,它基本上对二进制文件运行了几个分析步骤(作为气流插件实现).DAG 由 ftp 传感器触发,它只检查 ftp 服务器上是否有新文件,然后启动整个工作流程. 所以目前的工作流程是这样的:DAG 按定义触发 -> 传感器在 ftp 上等待新文件 -> 执行分析步骤 -> 工作流程结束. 我想要的是这样的:DAG 是触发器 -> 传感器等待 ftp ..
发布时间:2021-10-26 18:01:42 Python

如何修复“无法从工作人员获取日志文件.不支持的 URL 协议"Airflow DAG 日志中的错误?

我通过 docker 通过这个镜像运行 Airflow apache/airflow:2.1.0请参阅此线程了解我遇到的初始错误. 目前我可以运行我以前存在的 DAG.但是,当我添加较新的 DAGS 时,日志文件中出现以下错误.我很确定这不是内存或计算的问题. *** 日志文件不存在:/opt/airflow/logs/my-task/my-task/2021-06-15T14:11:33 ..
发布时间:2021-10-26 18:01:39 其他开发

在 Airflow 2.0 中使用 Taskflow API 传递争论

我正在使用 REST API 将参数传递给基于任务流的 Dag.看看这个论坛上提出的类似问题,下面似乎是访问传递参数的常用方法. #From 在模板字段或文件中:{{ dag_run.conf['key'] }}#或者当上下文可用时,例如在 PythonOperator 的可调用 python 中:上下文['dag_run'].conf['key'] 我试图获取上下文字典 @dag(defa ..
发布时间:2021-10-26 18:01:36 其他开发

Airflow ExternalTask​​Sensor 执行超时

我正在使用 airflow.operators.sensors.ExternalTask​​Sensor 让一个 Dag 等待另一个. dag = DAG('dag2',默认参数={'所有者':'我','depends_on_past':错误,'开始日期':开始日期时间,'电子邮件':['me@example.com'],'email_on_failure':是的,'email_on_retry ..
发布时间:2021-10-26 18:01:33 其他开发