airflow相关内容
我想知道如何使用官方的气流舵图并覆盖values.yaml文件来编写Helm Release YAML文件。 我正在尝试使用此配置文件在Kubernetes群集上部署带有通量的气流。 我已尝试: apiVersion: helm.fluxcd.io/v1 kind: HelmRelease metadata: name: airflow namespace: dev spec:
..
从开发的角度来看,在UI中定义变量和连接是有效的,但并不可靠,因为无法跟踪已添加和删除的内容。 Airflow came up with a way to store variables as environment variables。但由此产生了几个自然的问题: 是否需要在每个DAG之前定义此选项?如果我有多个DAG共享相同的环境值,该怎么办?每次都给它下定义似乎有点多余。 如果
..
错误: [2019-04-11 10:08:44,005] {{ssh_operator.py:80}} INFO - ssh_hook is not provided or invalid. Trying ssh_conn_id to create SSHHook. [2019-04-11 10:08:44,015] {{logging_mixin.py:95}} INFO - [2019
..
我正尝试在我的Python操作符中使用Airflow宏,但我一直收到“Airflow:Error:UnRecognition Arguments:” 所以我正在导入一个有3个位置参数的函数:(sys.argv,start_date,end_date),我希望将start_date和end_date作为Airflow中的执行日期。 函数参数如下所示 def main(argv,st
..
在GCP中,从UI或gCloud命令安装和运行JupyterHub component相当简单。我试图通过气流和DataprocClusterCreateOperator编写这个过程的脚本,这里是DAG的摘录 from airflow.contrib.operators import dataproc_operator create_cluster=dataproc_operator.
..
我正在Google Cloud Composer上的气流DAG中扩展使用BigQueryOperator。 对于较长的查询,最好将每个查询放在它自己的.sql文件中,而不是把它弄乱了DAG。Airflow似乎支持所有SQL查询操作符,包括BigQueryOperator,如the documentation中所示。 我的问题:在.sql模板文件中编写了一条我的SQL语句后,如何将其添加到G
..
我正在Cent OS 7上安装Airflow。我已经配置了Airflow db init,并检查了nginx服务器的状态以及它的工作正常。但是,当我运行airflow Web服务器命令时,我收到了下面提到的错误*[2021-03-22 14:59:30 +0000] [9019] [INFO] Booting worker with pid: 9019 [2021-03-22 14:59:32,5
..
我正在尝试使用Airflow在AWS上建立一条简单的数据管道。 我已经创建了一个DAG,它每天将数据抓取到S3,然后使用在EMR上运行的Spark作业进行处理。 我当前在本地笔记本电脑上运行气流计划程序,但我当然知道这不是一个好的长期解决方案。 所以我想了解一些关于将调度程序部署到EC2的提示(实例大小、部署进程或任何其他有用的信息) 推荐答案 在本地运行通常不是可行的后期
..
我的气流DAG有两个任务: Read_CSV 进程文件 他们自己工作得很好。我故意在 pandas 数据帧中创建了一个打字错误,以了解on_failure_callback是如何工作的,并查看它是否被触发。从日志中看似乎并非如此: Traceback (most recent call last): File "/usr/local/lib/python3.7/site-p
..
我在没有Fernet_key的情况下启动了气流。一旦我意识到这一点,我做了以下事情: https://airflow.apache.org/configuration.html#connections pip install apache-airflow[crypto] from cryptography.fernet import Fernet fernet_key= Fernet.ge
..
最近,我将Airflow从1.9升级到1.10.3(最新版本)。 但是,我确实注意到了与SubDag并发性相关的性能问题。只能代答SubDag内的1个任务,这不是应有的方式,我们的SubDag的并发设置为8。 请参阅以下内容: get_monthly_summary-214和get_monthly_summary-215是两个子DAG,可以通过父DAG并发在并行控制器中运行 但当放大到S
..
我有一个DAG(DAG1),我在其中复制一堆文件。然后,我想为复制的每个文件启动另一个DAG(DAG2)。由于每次运行DAG1时复制的文件数量会有所不同,因此我希望循环遍历这些文件,并使用适当的参数调用DAG2。 例如: with DAG( 'DAG1', description="copy files over", schedule_interval=
..
我正在使用PythonOperator调用一个函数,该函数将数据工程流程并行化为气流任务。这只需用Airflow调用的可调用包装函数包装一个简单函数即可。 def wrapper(ds, **kwargs): process_data() 使用派生子进程的多处理模块实现并行化。当我从jupyter笔记本单独运行process_data时,它运行到最后都没有问题。然而,当我使用气流
..
我从气流变量中读取一个整数变量,然后在每次DAG运行时将该值加1,并再次将其设置为该变量。 但在下面的代码之后,每次刷新页面时,UI处的变量都会更改。 了解导致此类行为的原因 counter = Variable.get('counter') s = BashOperator( task_id='echo_start_variable', bash_command='e
..
我需要在python中运行一个BigQuery脚本,它需要在Google云存储中作为CSV输出。目前,我的脚本触发大查询代码,直接保存到我的PC上。 但是,我需要让它在气流中运行,这样我就不能有任何本地依赖项。 我当前的脚本将输出保存到本地计算机,然后我必须将其移到GCS中。我在网上查过了,但我想不通。(PS我还是个新手,所以如果以前有人问过这个问题,我很抱歉!) import pa
..
我正在尝试创建一个DAG,其中一个任务使用boto3执行athena查询。它对一个查询有效,但是当我尝试运行多个雅典娜查询时遇到问题。 此问题可以按如下方式解决:- 翻阅thisblog可以看到,athena使用start_query_execution触发查询,get_query_execution获取status、queryExecutionId等查询数据(athena的文档)
..
我想在将CSV文件加载到RedShift表之前截断RedShift表。 错误: airflow.exceptions.AirflowException:传递给S3ToRedshitOperator(task_id:dag_run_s3_to_redshift)的参数无效。无效参数为: **kwargs:{‘method’:‘place’} 以下代码: task_fail_s3_t
..
试用以下示例: https://cloud.google.com/blog/big-data/2017/07/how-to-aggregate-data-for-bigquery-using-apache-airflow 运行以下命令之一时: airflow test bigquery_github_trends_v1 bq_check_githubarchive_day 201
..
我有一个关于在AKS中运行时气流的问题。 我们已在AKS中部署了气流,并已将Azure文件共享安装到气流吊舱。我们已将此文件共享用于DAG文件夹。然而,从气流到文件共享有大量的事务(每5分钟至少20K),这给我们带来了来自Azure的大量成本。仅供参考-对Azure文件共享的计费基于事务数量,而不是我们使用的大小。 您能告诉我在文件共享中是否有DAG文件夹的问题吗?如果是这样的话,有没
..
我想知道DAG的状态是正在运行、失败还是成功。我正在通过CL参数气流触发器触发DAG,在作业执行之后,我想知道运行的状态。我找不到任何办法。 我尝试了气流dag_state,但没有给出任何提示。如果一天中有多次运行以通过命令行参数或python代码获取最近一次运行的状态,我应该怎么办。 CLI 您可以将list_dag_runs命令与推荐答案配合使用,以列出给定DAG ID的DA
..