airflow相关内容

舵图发布官方气流

我想知道如何使用官方的气流舵图并覆盖values.yaml文件来编写Helm Release YAML文件。 我正在尝试使用此配置文件在Kubernetes群集上部署带有通量的气流。 我已尝试: apiVersion: helm.fluxcd.io/v1 kind: HelmRelease metadata: name: airflow namespace: dev spec: ..
发布时间:2022-08-10 22:36:57 其他开发

如何对气流变量和连接进行版本控制?

从开发的角度来看,在UI中定义变量和连接是有效的,但并不可靠,因为无法跟踪已添加和删除的内容。 Airflow came up with a way to store variables as environment variables。但由此产生了几个自然的问题: 是否需要在每个DAG之前定义此选项?如果我有多个DAG共享相同的环境值,该怎么办?每次都给它下定义似乎有点多余。 如果 ..
发布时间:2022-08-04 13:09:09 其他开发

Python运算符中的空气流宏

我正尝试在我的Python操作符中使用Airflow宏,但我一直收到“Airflow:Error:UnRecognition Arguments:” 所以我正在导入一个有3个位置参数的函数:(sys.argv,start_date,end_date),我希望将start_date和end_date作为Airflow中的执行日期。 函数参数如下所示 def main(argv,st ..
发布时间:2022-08-04 12:55:50 其他开发

在Google Cloud Composer中使用Airflow模板文件和TEMPLATE_SEARTATH

我正在Google Cloud Composer上的气流DAG中扩展使用BigQueryOperator。 对于较长的查询,最好将每个查询放在它自己的.sql文件中,而不是把它弄乱了DAG。Airflow似乎支持所有SQL查询操作符,包括BigQueryOperator,如the documentation中所示。 我的问题:在.sql模板文件中编写了一条我的SQL语句后,如何将其添加到G ..
发布时间:2022-08-04 12:40:04 其他开发

如何将气流调度器部署到AWS EC2?

我正在尝试使用Airflow在AWS上建立一条简单的数据管道。 我已经创建了一个DAG,它每天将数据抓取到S3,然后使用在EMR上运行的Spark作业进行处理。 我当前在本地笔记本电脑上运行气流计划程序,但我当然知道这不是一个好的长期解决方案。 所以我想了解一些关于将调度程序部署到EC2的提示(实例大小、部署进程或任何其他有用的信息) 推荐答案 在本地运行通常不是可行的后期 ..

回调故障时的气流

我的气流DAG有两个任务: Read_CSV 进程文件 他们自己工作得很好。我故意在 pandas 数据帧中创建了一个打字错误,以了解on_failure_callback是如何工作的,并查看它是否被触发。从日志中看似乎并非如此: Traceback (most recent call last): File "/usr/local/lib/python3.7/site-p ..
发布时间:2022-06-28 16:00:07 其他开发

Airflow 1.10.3 SubDag即使并发性为8也只能并行运行1个任务

最近,我将Airflow从1.9升级到1.10.3(最新版本)。 但是,我确实注意到了与SubDag并发性相关的性能问题。只能代答SubDag内的1个任务,这不是应有的方式,我们的SubDag的并发设置为8。 请参阅以下内容: get_monthly_summary-214和get_monthly_summary-215是两个子DAG,可以通过父DAG并发在并行控制器中运行 但当放大到S ..
发布时间:2022-06-28 15:42:24 其他开发

使用TriggerDagRunOperator多次运行另一个DAG

我有一个DAG(DAG1),我在其中复制一堆文件。然后,我想为复制的每个文件启动另一个DAG(DAG2)。由于每次运行DAG1时复制的文件数量会有所不同,因此我希望循环遍历这些文件,并使用适当的参数调用DAG2。 例如: with DAG( 'DAG1', description="copy files over", schedule_interval= ..
发布时间:2022-06-28 15:38:07 其他开发

气流信号出乎意料地向子过程发出信号

我正在使用PythonOperator调用一个函数,该函数将数据工程流程并行化为气流任务。这只需用Airflow调用的可调用包装函数包装一个简单函数即可。 def wrapper(ds, **kwargs): process_data() 使用派生子进程的多处理模块实现并行化。当我从jupyter笔记本单独运行process_data时,它运行到最后都没有问题。然而,当我使用气流 ..
发布时间:2022-06-28 15:27:37 Python

如何在ApacheAirflow中运行BigQuery查询,然后将输出的CSV发送到Google Cloud Storage?

我需要在python中运行一个BigQuery脚本,它需要在Google云存储中作为CSV输出。目前,我的脚本触发大查询代码,直接保存到我的PC上。 但是,我需要让它在气流中运行,这样我就不能有任何本地依赖项。 我当前的脚本将输出保存到本地计算机,然后我必须将其移到GCS中。我在网上查过了,但我想不通。(PS我还是个新手,所以如果以前有人问过这个问题,我很抱歉!) import pa ..
发布时间:2022-06-28 15:10:16 Python

在AirFlow 2.0中运行多个雅典娜查询

我正在尝试创建一个DAG,其中一个任务使用boto3执行athena查询。它对一个查询有效,但是当我尝试运行多个雅典娜查询时遇到问题。 此问题可以按如下方式解决:- 翻阅thisblog可以看到,athena使用start_query_execution触发查询,get_query_execution获取status、queryExecutionId等查询数据(athena的文档) ..
发布时间:2022-03-12 19:18:11 Python

从气流到Azure文件共享的事务太多

我有一个关于在AKS中运行时气流的问题。 我们已在AKS中部署了气流,并已将Azure文件共享安装到气流吊舱。我们已将此文件共享用于DAG文件夹。然而,从气流到文件共享有大量的事务(每5分钟至少20K),这给我们带来了来自Azure的大量成本。仅供参考-对Azure文件共享的计费基于事务数量,而不是我们使用的大小。 您能告诉我在文件共享中是否有DAG文件夹的问题吗?如果是这样的话,有没 ..
发布时间:2022-03-01 21:05:22 其他开发

如何获取DAG状态,如正在运行、成功或失败

我想知道DAG的状态是正在运行、失败还是成功。我正在通过CL参数气流触发器触发DAG,在作业执行之后,我想知道运行的状态。我找不到任何办法。 我尝试了气流dag_state,但没有给出任何提示。如果一天中有多次运行以通过命令行参数或python代码获取最近一次运行的状态,我应该怎么办。 CLI 您可以将list_dag_runs命令与推荐答案配合使用,以列出给定DAG ID的DA ..
发布时间:2022-03-01 21:02:46 其他开发