airflow相关内容

Airflow-从DAG上下文回调解析任务ID

最初使用dag callback(on_failure_callback和on_success_callback)时,我以为success或fail会在dag结束时触发success或fail状态(正如DAG中定义的那样)。 但是之后似乎每隔task instance而不是dag run实例化一次,所以如果一个DAG有N个任务,那么它会触发这些回调N次。 我正在尝试捕获task-id,因此将 ..
发布时间:2022-03-01 21:00:13 其他开发

如何将DBT与AWS管理气流配合使用?

希望你一切顺利。 我想检查是否有人在AWS MWAA Airflow中启动并运行DBT。 我尝试了this one和thispython包,但由于某种原因(找不到DBT路径等)而失败。 是否有人成功使用过MWAA(气流2)和DBT,而不必构建坞站映像并将其放置在某个位置? 谢谢! 推荐答案 我通过执行以下步骤设法解决了此问题: 将dbt-core==0.19.1 ..
发布时间:2022-03-01 20:56:26 其他开发

如何使用部署在GCPCloud Composer上的气流稳定睡觉接口[2.0.0版]

环境:我们使用GCP Cloud Composer来运行气流DAG。 目的:使用阿帕奇气流稳定接口,通过睡觉对外触发DAG。 我们尝试通过在Cloud Composer:airflow.api.auth.backend.default中设置气流覆盖配置,并发出IAP请求,尝试使用气流实验API对外触发DAG。它工作得很好。我们按照https://cloud.google.com/compos ..

月底的气流DAG计划

我要运行气流计划(v1.9.0)。 我的DAG需要在每个月底运行,但我不知道如何写入设置。 my_dag = DAG(dag_id=DAG_ID, catchup=False, default_args=default_args, schedule_interval='30 0 31 * *', ..
发布时间:2022-03-01 20:49:25 其他开发

气流:并发深度优先,而不是广度优先?

在Airflow中,默认配置似乎是将任务并行排队,时间跨过几天--从一天到下一天。 然而,如果我将这个过程加快,比如说两年,那么气流DAG将首先在所有的日子里通过初步过程,而不是像这样,从开始到结束同时花费4天的时间。 如何切换气流以根据深度优先范式而不是广度优先范式执行任务? 推荐答案 我也遇到过类似的情况。我使用以下技巧实现了深度优先行为。 将DAG的所有任务分配给s ..
发布时间:2022-03-01 20:39:57 其他开发

气流log_id格式错误

我使用的是在Kubernetes中运行的Airflow v2.2.3和apache-airflow-providers-elasticsearch==2.1.0。 我们的日志会自动发送到Elasticsearch v7.6.2。 我在气流中设置了以下配置: AIRFLOW__LOGGING__REMOTE_LOGGING=True AIRFLOW__ELASTICSEARCH__ ..
发布时间:2022-03-01 20:37:08 其他开发

Jupyter笔记本和造纸厂辅助气流中的ETL

所以我的问题是,我在气流中构建ETL管道,但真正要先在Jupyter笔记本中开发和测试提取、转换和加载功能。因此,我总是在Airflow Python操作符代码和Jupyter笔记本之间来回复制粘贴,效率相当低!我的直觉告诉我,所有这些都可以自动完成。 基本上,我希望用Jupyter编写我的提取、转换和加载函数,并让它们保留在那里,同时仍然在气流中运行管道,并显示提取、转换和加载任务,以及重 ..
发布时间:2022-03-01 20:34:54 Python

气流如何设置dag_run.conf的默认值

我正在尝试设置一个气流DAG,该DAG提供来自dag_run.conf的默认值。当从WebUI使用";run w/Config";选项运行DAG时,此功能非常有效。但按计划运行时,dag_run.conf字典不存在,任务会失败,如 jinja2.exceptions.UndefinedError: 'dict object' has no attribute 'key1' ..
发布时间:2022-03-01 20:32:30 其他开发

使用Airflow BigqueryOperator向BigQuery表添加标签

我必须向BigQuery表添加标签。我知道可以通过BigQuery UI完成此操作,但如何通过气流操作符完成此操作。 使用案例:用于计费和搜索。由于多个团队在同一项目和数据集下工作,因此我们需要将各自团队创建的所有表组合在一起。由于每个团队对表有不同的标签,因此标签对我们是必需的。 bq_query = BigQueryOperator(bql=sql, ..

使用 Apache Airflow 将文件从一个 Google Cloud Storage Bucket 复制到另一个

问题:我想将文件从 Google Cloud Storage Bucket 中的文件夹(例如 Bucket1 中的 Folder1)复制到另一个 Bucket(例如 Bucket2).我找不到任何用于 Google Cloud Storage 复制文件的 Airflow Operator. 解决方案 我刚刚在2小时前上传的contrib中发现了一个新的算子:https://github.c ..

获取 ModuleNotFoundError:Airflow Bash Operator 上没有名为“pandas"的模块错误

我的气流 bash 操作员突然收到错误 ModuleNotFoundError: No module named 'pandas' 过去 2 个月运行良好,但从昨天开始停止运行.我没有更改服务器上的任何内容或没有更新我的服务器. 解决方案 BashOperator 只是在你的机器上运行 bash 命令.所以检查你的机器上是否安装了 pandas.如果没有,请执行 pip instal ..
发布时间:2022-01-24 23:37:52 其他开发

如何在 Airflow 中创建条件任务

我想在 Airflow 中创建一个条件任务,如下面的架构中所述.预期的情况如下: 任务 1 执行 如果任务 1 成功,则执行任务 2a Else 如果任务 1 失败,则执行任务 2b 最终执行任务 3 以上所有任务都是 SSHExecuteOperator.我猜我应该使用 ShortCircuitOperator 和/或 XCom 来管理条件,但我不清楚如何实现它.你能描述一下解 ..
发布时间:2022-01-20 23:15:06 Python

如何在不使用 GUI 的情况下创建、更新和删除气流变量?

我一直在学习气流并为 ETL 管道编写 DAG.它涉及使用 AWS 环境(S3、Redshift).它处理在将数据存储在 redshift 之后将数据从一个存储桶复制到另一个存储桶.我将存储桶名称和前缀作为变量存储在气流中,您必须打开 GUI 并手动添加它们. 在以下选项中,哪个是业内最安全和最广泛使用的做法 我们可以使用 airflow.cfg 来存储我们的变量(bucket 名称) ..
发布时间:2022-01-16 08:07:49 其他开发

无法在 Apache Airflow 中设置 DB2/DashDB JDBC 连接

我正在尝试使用 Airflow UI 创建 DB2/DashDB 连接.我添加了 db2jcc4.jar 驱动程序并提供了路径以及类名 com.ibm.db2.jcc.DB2Driver.class 我尝试运行一个简单的查询(在 ad hoc UI 中)并且总是得到相同的错误 java.lang.RuntimeException:找不到类 com.ibm.db2.jcc.DB2Driv ..
发布时间:2022-01-14 22:37:03 其他开发

作业不通过使用 RabbitMQ 运行 celery 的 Airflow 执行

下面是我使用的配置 [核心]# 气流的主文件夹,默认为 ~/airflowairflow_home =/root/气流# 气流管道所在的文件夹,很可能是# 代码库中的子文件夹dags_folder =/root/airflow/dags#气流应该存储其日志文件的文件夹.这个位置base_log_folder =/root/airflow/logs# 可以为日志备份提供一个 S3 位置# 对于 S ..
发布时间:2022-01-11 17:26:15 其他开发