airflow-scheduler相关内容

如何对气流变量和连接进行版本控制?

从开发的角度来看,在UI中定义变量和连接是有效的,但并不可靠,因为无法跟踪已添加和删除的内容。 Airflow came up with a way to store variables as environment variables。但由此产生了几个自然的问题: 是否需要在每个DAG之前定义此选项?如果我有多个DAG共享相同的环境值,该怎么办?每次都给它下定义似乎有点多余。 如果 ..
发布时间:2022-08-04 13:09:09 其他开发

Airflow 1.10.3 SubDag即使并发性为8也只能并行运行1个任务

最近,我将Airflow从1.9升级到1.10.3(最新版本)。 但是,我确实注意到了与SubDag并发性相关的性能问题。只能代答SubDag内的1个任务,这不是应有的方式,我们的SubDag的并发设置为8。 请参阅以下内容: get_monthly_summary-214和get_monthly_summary-215是两个子DAG,可以通过父DAG并发在并行控制器中运行 但当放大到S ..
发布时间:2022-06-28 15:42:24 其他开发

从气流到Azure文件共享的事务太多

我有一个关于在AKS中运行时气流的问题。 我们已在AKS中部署了气流,并已将Azure文件共享安装到气流吊舱。我们已将此文件共享用于DAG文件夹。然而,从气流到文件共享有大量的事务(每5分钟至少20K),这给我们带来了来自Azure的大量成本。仅供参考-对Azure文件共享的计费基于事务数量,而不是我们使用的大小。 您能告诉我在文件共享中是否有DAG文件夹的问题吗?如果是这样的话,有没 ..
发布时间:2022-03-01 21:05:22 其他开发

月底的气流DAG计划

我要运行气流计划(v1.9.0)。 我的DAG需要在每个月底运行,但我不知道如何写入设置。 my_dag = DAG(dag_id=DAG_ID, catchup=False, default_args=default_args, schedule_interval='30 0 31 * *', ..
发布时间:2022-03-01 20:49:25 其他开发

气流:并发深度优先,而不是广度优先?

在Airflow中,默认配置似乎是将任务并行排队,时间跨过几天--从一天到下一天。 然而,如果我将这个过程加快,比如说两年,那么气流DAG将首先在所有的日子里通过初步过程,而不是像这样,从开始到结束同时花费4天的时间。 如何切换气流以根据深度优先范式而不是广度优先范式执行任务? 推荐答案 我也遇到过类似的情况。我使用以下技巧实现了深度优先行为。 将DAG的所有任务分配给s ..
发布时间:2022-03-01 20:39:57 其他开发

气流log_id格式错误

我使用的是在Kubernetes中运行的Airflow v2.2.3和apache-airflow-providers-elasticsearch==2.1.0。 我们的日志会自动发送到Elasticsearch v7.6.2。 我在气流中设置了以下配置: AIRFLOW__LOGGING__REMOTE_LOGGING=True AIRFLOW__ELASTICSEARCH__ ..
发布时间:2022-03-01 20:37:08 其他开发

动态任务组与动态 DAG

假设我有一个带有任务的流程: T1 >>T2>>T3 进程需要为一组 id [1,2,3] 运行: process_run_with_id1process_run_with_id2process_run_with_id3 我可以创建具有多个任务组的单个 DAG,其中每个任务组代表要为该 ID 运行的一组任务: DAG = >TG_for_1、TG_for_2、TG_for_3 ..
发布时间:2021-10-26 18:04:22 其他开发

气流:如何设置日志目录?

我将 dag 文件上传到网页,当我单击 'Graph View' -> ${my_dag} -> 'View Log',它显示: *** 日志文件不是本地的.*** 在这里获取:http://:8793/log/demo_dag/hello_task/2018-11-14T15:06:00*** 无法从工作人员获取日志文件.*** 正在读取远程日志...*** 不支持的远程日志位置. 我检查了 ..
发布时间:2021-10-26 18:04:05 其他开发

如何限制对airflow.models的访问?

我有一个气流实例,其中包含许多具有 DAG 的租户.他们想在他们的 dagruns 上提取元数据,比如 DagRun.end_date.但是我想限制每个租户,以便他们只能访问与自己的 dagruns 相关的数据,而无法访问其他人的 dagruns 的数据.这怎么办? 这就是我想象的 DAG 的样子 #自定义宏函数def get_last_dag_run(dag):last_dag_run ..
发布时间:2021-10-26 18:03:56 其他开发

我想将 cloud composer 创建的气流网络服务器的时区从 utc 更改为 jst(亚洲/东京)

我想将 cloud composer 创建的气流网络服务器的时区从 utc 更改为 jst(Asia/Tokyo). 然而,即使“webserver-default_ui_timezone ='JST'"由airflow config overwrite设置,webserver的时间不能改变. 即使我将气流中使用的 VM(GKE 节点)的时区从 utc 更改为 jst(Asia/Tok ..

execution_date 是 DAG 运行的日期还是任务运行的日期?

execution_date 的值是 DAG 运行的时间日期/时间--它的所有任务的值是否相同--或者是 execution_date (可能)不同 DAG 中的每个任务? 解决方案 execution_date 是运行间隔的开始.所有任务都具有与其运行相同的 execution_date 值.这就是它们与代码中的运行相关联的方式. 这样想:如果您每季度运行一个流程并根据该季度的数据 ..
发布时间:2021-10-26 18:03:48 其他开发

气流集群策略未被调用

我正在尝试设置和了解自定义策略.但是,不确定我做错了什么,以下是行不通的. 气流版本:1.10.10 预期结果:如果我尝试使用 default_owner 运行 DAG,它应该抛出异常 实际结果:没有这样的例外 /root/airflow/config/airflow_local_settings.py class PolicyError(Exception):经过def ..
发布时间:2021-10-26 18:03:31 其他开发

Airflow 1.9.0 ExternalTask​​Sensor retry_delay=30 产生 TypeError: can't pickle _thread.RLock objects

正如标题所说;在 Airflow 1.9.0 中,如果您将 retry_delay=30(或任何其他数字)参数与 ExternalTask​​Sensor 一起使用,DAG 将运行得很好,直到您想清除气流 GUI 中的任务实例 -> 它将返回以下错误:“TypeError: can't pickle _thread.RLock objects"(以及一个不错的糟糕消息)但是如果你使用 retry_ ..
发布时间:2021-10-26 18:03:00 Python

Airflow dags 生命周期事件

我正在尝试通过 java 后端管理气流 dag(创建、执行等).目前,在创建 dag 并将其放置在气流的 dags 文件夹中后,我的后端一直在尝试运行 dag.但是直到它被气流调度程序接收到它才能运行它,如果 dag 的数量更多,这可能需要相当长的时间.我想知道是否有气流发出的任何事件,我可以点击这些事件来检查调度程序处理的新 dag,然后触发,从我的后端执行命令.或者有没有一种方法或配置让气流在 ..
发布时间:2021-10-26 18:02:43 其他开发

气流 - 如何获得所有未来的运行日期

我正在安排气流作业.但是,为了验证我是否安排了正确的作业,我需要查看将来何时运行. Airflow 具有以下命令,可让我进行下一次运行.但是,这对于某些用例来说还不够.例如,我安排了每隔一个星期五运行一次作业.我如何验证这一点. airflow next_execution 有没有办法,我可以获得此 dag 运行的所有未来日期.或至少几个? 解决方案 虽然大多数进程使用 cro ..
发布时间:2021-10-26 18:02:16 其他开发