airflow相关内容
我在从代码下载所有 Airflow 变量时遇到问题. 有机会从 UI 导出,但我还没有找到任何以编程方式进行导出的方法. 我发现只有 Variable.get('variable_name') 方法返回一个 Airflow 变量.没有获取 Airflow 变量列表的变体. 在源代码中搜索也无济于事.你知道一些简单的方法吗? 提前致谢. 解决方案 您可以使用 Airflo
..
代码: fromairflow.models import BaseOperator从airflow.utils.decorators导入apply_defaults从airflow.providers.google.cloud.hooks.gcs 导入GCSHook类 GCSUploadOperator(BaseOperator):@apply_defaults定义 __init__(自己,存
..
我制作了一个非常简单的 DAG,如下所示: from datetime import datetime从气流导入 DAG从airflow.operators.bash_operator 导入BashOperatorcleanup_command = "/home/ubuntu/airflow/dags/scripts/log_cleanup/log_cleanup.sh "DAG = DAG('
..
我正在尝试将 jinja 模板参数格式化为整数,以便我可以将其传递给需要 INT(可以是自定义或 PythonOperator)的运算符,但我无法做到. 请参阅下面的示例 DAG.我正在使用内置的 Jinja 过滤器 |int 但这不起作用 - 类型仍然是 我还是 Airflow 的新手,但根据我对 Jinja/Airflow 作品的了解,我认为这是不可能的.我看到了两种主要的解决方法
..
我正在安排气流作业.但是,为了验证我是否安排了正确的作业,我需要查看将来何时运行. Airflow 具有以下命令,可让我进行下一次运行.但是,这对于某些用例来说还不够.例如,我安排了每隔一个星期五运行一次作业.我如何验证这一点. airflow next_execution 有没有办法,我可以获得此 dag 运行的所有未来日期.或至少几个? 解决方案 虽然大多数进程使用 cro
..
我在 Centos 7 中使用 Airflow,使用 Python 3.7. 当我通过 BashOperator 运行 Bash 命令时,我遇到了以下问题: [2019-11-13 23:20:08,238] {taskinstance.py:1058} 错误 - [Errno 2] 没有那个文件或目录:'bash': 'bash'回溯(最近一次调用最后一次):文件“/home/airfl
..
我的 Windows 10 机器在 WSL 2 (Ubuntu-20.04) 中安装了 Airflow 1.10.11. 我有一个 BashOperator 任务,它在 Windows 上调用 .EXE(通过/mnt/c/... 或通过符号链接).任务失败.日志显示: [2020-12-16 18:34:11,833] {bash_operator.py:134} INFO - 临时脚本位
..
我很难找出如何为同一天(同一执行日)运行两次的同一个 dag 运行找到失败的任务. 考虑一个例子,当带有 dag_id=1 的 dag 在第一次运行时失败(由于任何原因,可以说连接超时)并且任务失败.当我们尝试查询它时,TaskInstance 表将包含失败任务的条目.太棒了!! 但是,如果我重新运行相同的 dag(注意 dag_id 仍然为 1),那么在最后一个任务中(它具有 ALL
..
我正在尝试将我的 Python 代码移至 Airflow.我有以下代码片段: s3_client = boto3.client('s3',region_name="us-west-2",aws_access_key_id=aws_access_key_id,aws_secret_access_key=aws_secret_access_key) 我正在尝试使用 Aiflow 的 s3 钩子和
..
使用 Airflow worker 和 webserver/scheduler 作为在 EC2 上的 Kubernetes Engine 上运行的 Docker 镜像 我们有一个具有 KubernetesPodOperator 的任务,它是资源密集型的,每 15 分钟运行一次. 在airflow-worker 中作为电子邮件收到这些错误 尝试 3 个中的 2 个例外:('连接中断:I
..
我需要在 sql 文件中访问 BigqueryOperator 传递的参数,但出现错误 ERROR - queryParameters 参数必须具有类型 不是 我正在使用以下代码: t2 = bigquery_operator.BigQueryOperator(task_id='bq_from_source_to_clean',sql='prepare.sql',use_legacy_sql=假
..
大家好,我正在使用 Airflow,我正在阅读这篇有用的教程.我正在寻求帮助以更好地了解 Admin->Connection 如何在 Conn Type: File (path) 方面工作.我想这种类型的连接是让我的操作员可以访问本地文件系统文件夹? 解决方案 由于您的评论,我才明白如何为本地文件配置连接,谢谢@desimetala.我会把它放在这里给下一个需要它的人. 如果本地路径
..
免责声明:我(还)不是 Airflow 的用户,今天才发现它,我开始探索它是否适合我的用例. 我有一个数据处理工作流,它是多个任务的顺序(非并行)执行.但是,某些任务需要在特定机器上运行.Airflow 可以管理这个吗?此用例的建议实施模型是什么? 谢谢. 解决方案 是的,您可以在 Airflow 中通过 队列.您可以将任务绑定到特定队列.然后对于机器上的每个工作人员,您可以将
..
我有运行 Databricks 笔记本系列的 Airflow dag. 现在我想要的是,如果笔记本出现故障?如何向此笔记本失败的用户发送邮件,但没有执行日期等详细信息. 有什么错误处理方法吗? 解决方案 Step1: 将 email_on_failure 设置为 False 并使用操作员的 >on_failure_callback.on_failure_callback 下面描
..
我正在尝试从 REST API 运行 DAG 并将一些参数传递给它.DAG 应该能够捕获参数并使用它.问题是我能够从 REST API 触发 DAG,但 DAG 无法捕获传递的参数.有没有办法做到这一点? 我正在从 REST API 触发 DAG,如下所示.它在 --conf 中传递参数 http://abcairflow.com:8090/admin/rest_api/api?api=t
..
我正在 docker 容器中运行气流,并且想要将我的 airflow.cfg 挂载为一个卷,以便我可以快速编辑配置,而无需重建我的图像或直接在正在运行的容器中进行编辑.我能够将我的 airflow.cfg 安装为一个卷,并且我的气流网络服务器在启动时成功地从中读取了配置.但是,当我在主机上编辑时,更改不会反映在 docker 容器内. docker 容器内 findmnt -Mairflow
..
所以我在气流中使用了这个非常好的 DAG,它基本上对二进制文件运行了几个分析步骤(作为气流插件实现).DAG 由 ftp 传感器触发,它只检查 ftp 服务器上是否有新文件,然后启动整个工作流程. 所以目前的工作流程是这样的:DAG 按定义触发 -> 传感器在 ftp 上等待新文件 -> 执行分析步骤 -> 工作流程结束. 我想要的是这样的:DAG 是触发器 -> 传感器等待 ftp
..
我通过 docker 通过这个镜像运行 Airflow apache/airflow:2.1.0请参阅此线程了解我遇到的初始错误. 目前我可以运行我以前存在的 DAG.但是,当我添加较新的 DAGS 时,日志文件中出现以下错误.我很确定这不是内存或计算的问题. *** 日志文件不存在:/opt/airflow/logs/my-task/my-task/2021-06-15T14:11:33
..
我正在使用 REST API 将参数传递给基于任务流的 Dag.看看这个论坛上提出的类似问题,下面似乎是访问传递参数的常用方法. #From 在模板字段或文件中:{{ dag_run.conf['key'] }}#或者当上下文可用时,例如在 PythonOperator 的可调用 python 中:上下文['dag_run'].conf['key'] 我试图获取上下文字典 @dag(defa
..
我正在使用 airflow.operators.sensors.ExternalTaskSensor 让一个 Dag 等待另一个. dag = DAG('dag2',默认参数={'所有者':'我','depends_on_past':错误,'开始日期':开始日期时间,'电子邮件':['me@example.com'],'email_on_failure':是的,'email_on_retry
..