airflow相关内容
最初使用dag callback(on_failure_callback和on_success_callback)时,我以为success或fail会在dag结束时触发success或fail状态(正如DAG中定义的那样)。 但是之后似乎每隔task instance而不是dag run实例化一次,所以如果一个DAG有N个任务,那么它会触发这些回调N次。 我正在尝试捕获task-id,因此将
..
希望你一切顺利。 我想检查是否有人在AWS MWAA Airflow中启动并运行DBT。 我尝试了this one和thispython包,但由于某种原因(找不到DBT路径等)而失败。 是否有人成功使用过MWAA(气流2)和DBT,而不必构建坞站映像并将其放置在某个位置? 谢谢! 推荐答案 我通过执行以下步骤设法解决了此问题: 将dbt-core==0.19.1
..
环境:我们使用GCP Cloud Composer来运行气流DAG。 目的:使用阿帕奇气流稳定接口,通过睡觉对外触发DAG。 我们尝试通过在Cloud Composer:airflow.api.auth.backend.default中设置气流覆盖配置,并发出IAP请求,尝试使用气流实验API对外触发DAG。它工作得很好。我们按照https://cloud.google.com/compos
..
我要运行气流计划(v1.9.0)。 我的DAG需要在每个月底运行,但我不知道如何写入设置。 my_dag = DAG(dag_id=DAG_ID, catchup=False, default_args=default_args, schedule_interval='30 0 31 * *',
..
我有一些关于BaseSensorOperator参数工作方式的念力:timeout&;poke_interval。 请考虑传感器的以下用法: BaseSensorOperator( soft_fail=True, poke_interval = 4*60*60, # Poke every 4 hours timeout = 12*60*60, # Timeout aft
..
我有以下具有3项任务的DAG: start --> special_task --> end 中间的任务可以成功也可以失败,但是end必须始终执行(假设这是一个干净关闭资源的任务)。为此,我使用了trigger ruleALL_DONE: end.trigger_rule = trigger_rule.TriggerRule.ALL_DONE 使用它,如果special_tas
..
在Airflow中,默认配置似乎是将任务并行排队,时间跨过几天--从一天到下一天。 然而,如果我将这个过程加快,比如说两年,那么气流DAG将首先在所有的日子里通过初步过程,而不是像这样,从开始到结束同时花费4天的时间。 如何切换气流以根据深度优先范式而不是广度优先范式执行任务? 推荐答案 我也遇到过类似的情况。我使用以下技巧实现了深度优先行为。 将DAG的所有任务分配给s
..
我使用的是在Kubernetes中运行的Airflow v2.2.3和apache-airflow-providers-elasticsearch==2.1.0。 我们的日志会自动发送到Elasticsearch v7.6.2。 我在气流中设置了以下配置: AIRFLOW__LOGGING__REMOTE_LOGGING=True AIRFLOW__ELASTICSEARCH__
..
所以我的问题是,我在气流中构建ETL管道,但真正要先在Jupyter笔记本中开发和测试提取、转换和加载功能。因此,我总是在Airflow Python操作符代码和Jupyter笔记本之间来回复制粘贴,效率相当低!我的直觉告诉我,所有这些都可以自动完成。 基本上,我希望用Jupyter编写我的提取、转换和加载函数,并让它们保留在那里,同时仍然在气流中运行管道,并显示提取、转换和加载任务,以及重
..
我正在尝试设置一个气流DAG,该DAG提供来自dag_run.conf的默认值。当从WebUI使用";run w/Config";选项运行DAG时,此功能非常有效。但按计划运行时,dag_run.conf字典不存在,任务会失败,如 jinja2.exceptions.UndefinedError: 'dict object' has no attribute 'key1'
..
我们将Airflow v2.1.4与Kubernetes执行器一起使用。 我们的K8S群集Pod stdout通过filebatch->;logstash->;Elasticsearch(Elk)自动发货。 在logstash中,我们正在创建log_id字段: mutate { copy => { "[log][offset]" => "offset" }
..
我必须向BigQuery表添加标签。我知道可以通过BigQuery UI完成此操作,但如何通过气流操作符完成此操作。 使用案例:用于计费和搜索。由于多个团队在同一项目和数据集下工作,因此我们需要将各自团队创建的所有表组合在一起。由于每个团队对表有不同的标签,因此标签对我们是必需的。 bq_query = BigQueryOperator(bql=sql,
..
问题:我想将文件从 Google Cloud Storage Bucket 中的文件夹(例如 Bucket1 中的 Folder1)复制到另一个 Bucket(例如 Bucket2).我找不到任何用于 Google Cloud Storage 复制文件的 Airflow Operator. 解决方案 我刚刚在2小时前上传的contrib中发现了一个新的算子:https://github.c
..
我的气流 bash 操作员突然收到错误 ModuleNotFoundError: No module named 'pandas' 过去 2 个月运行良好,但从昨天开始停止运行.我没有更改服务器上的任何内容或没有更新我的服务器. 解决方案 BashOperator 只是在你的机器上运行 bash 命令.所以检查你的机器上是否安装了 pandas.如果没有,请执行 pip instal
..
我得到了 -bash:气流:找不到命令 安装 Apache Airflow 后.我使用的是 Google Cloud Compute Engine,操作系统是 Debian 9 (Stretch). 我已按照以下步骤操作: export AIRFLOW_HOME=~/airflow点安装 apache-气流 解决方案 我已经卸载了 Apache Airflow 并使用 su
..
我想在 Airflow 中创建一个条件任务,如下面的架构中所述.预期的情况如下: 任务 1 执行 如果任务 1 成功,则执行任务 2a Else 如果任务 1 失败,则执行任务 2b 最终执行任务 3 以上所有任务都是 SSHExecuteOperator.我猜我应该使用 ShortCircuitOperator 和/或 XCom 来管理条件,但我不清楚如何实现它.你能描述一下解
..
我试图通过传入不起作用的 Bash 行 (thisshouldnotrun) 故意使 Airflow 任务失败并出错.气流正在输出以下内容: [2017-06-15 17:44:17,869] {bash_operator.py:94} INFO -/tmp/airflowtmpLFTMX7/run_bashm2MEsS:第 7 行:thisshouldnotrun:找不到命令[2017-06-
..
我一直在学习气流并为 ETL 管道编写 DAG.它涉及使用 AWS 环境(S3、Redshift).它处理在将数据存储在 redshift 之后将数据从一个存储桶复制到另一个存储桶.我将存储桶名称和前缀作为变量存储在气流中,您必须打开 GUI 并手动添加它们. 在以下选项中,哪个是业内最安全和最广泛使用的做法 我们可以使用 airflow.cfg 来存储我们的变量(bucket 名称)
..
我正在尝试使用 Airflow UI 创建 DB2/DashDB 连接.我添加了 db2jcc4.jar 驱动程序并提供了路径以及类名 com.ibm.db2.jcc.DB2Driver.class 我尝试运行一个简单的查询(在 ad hoc UI 中)并且总是得到相同的错误 java.lang.RuntimeException:找不到类 com.ibm.db2.jcc.DB2Driv
..
下面是我使用的配置 [核心]# 气流的主文件夹,默认为 ~/airflowairflow_home =/root/气流# 气流管道所在的文件夹,很可能是# 代码库中的子文件夹dags_folder =/root/airflow/dags#气流应该存储其日志文件的文件夹.这个位置base_log_folder =/root/airflow/logs# 可以为日志备份提供一个 S3 位置# 对于 S
..