Test an Apache Airflow DAG while it is already scheduled and running?


Problem description

I ran the following test command:

airflow test events {task_name_redacted} 2018-12-12

...and got the following output:

Dependencies not met for <TaskInstance: events.{redacted} 2018-12-12T00:00:00+00:00 [None]>, dependency 'Task Instance Slots Available' FAILED: The maximum number of running tasks (16) for this task's DAG 'events' has been reached.
[2019-01-17 19:47:48,978] {models.py:1556} WARNING - 
--------------------------------------------------------------------------------
FIXME: Rescheduling due to concurrency limits reached at task runtime. Attempt 1 of 6. State set to NONE.
--------------------------------------------------------------------------------

[2019-01-17 19:47:48,978] {models.py:1559} INFO - Queuing into pool None

My Airflow is configured with a maximum concurrency of 16. Does this mean that I cannot test a task while the DAG is currently running and has used all of its task slots?

Also, it was a little unclear from the docs: does airflow test actually execute the task? For example, if it were a SparkSubmitOperator, would it actually submit the job?
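For concreteness, here is a minimal sketch of the kind of task involved; the operator, application path, and connection id below are placeholders rather than my real ones:

from datetime import datetime

from airflow import DAG
# In Airflow 1.x the operator lives under contrib
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

dag = DAG(
    dag_id="events",                       # same DAG id as in the test command above
    start_date=datetime(2018, 12, 1),
    schedule_interval="@daily",
)

# If `airflow test` really executes the operator, this task would
# actually submit the Spark application.
submit_events = SparkSubmitOperator(
    task_id="submit_events_job",           # placeholder task name
    application="/path/to/events_job.py",  # placeholder application path
    conn_id="spark_default",
    dag=dag,
)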

Answer

While I have yet to reach the phase of deployment where concurrency will matter, the docs do give a fairly good indication of the problem at hand:


  1. Since at any point in time just one scheduler is running (and you shouldn't be running multiple anyway), it does appear that this limit applies to all runs collectively, irrespective of whether the DAG runs are live runs or test runs. So that is certainly a hurdle.


# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16







  2. But beware that merely increasing this number is not enough (assuming you have big-enough boxes for hefty workers / multiple workers); several other configurations will have to be tweaked as well to achieve the kind of parallelism I sense you want.

They are all listed under the [core] section:


# The amount of parallelism as a setting to the executor. This defines the max number of task instances that should run simultaneously on this airflow installation
parallelism = 32

# When not using pools, tasks are run in the "default pool", whose size is guided by this config element
non_pooled_task_slot_count = 128

# The maximum number of active DAG runs per DAG
max_active_runs_per_dag = 16
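As a side note (this is not part of the excerpt above, but may help here): the per-DAG limit from your error message can also be overridden in the DAG definition itself, without raising the global defaults. A minimal sketch with illustrative values:

from datetime import datetime

from airflow import DAG

dag = DAG(
    dag_id="events",                   # the DAG named in the question
    start_date=datetime(2018, 12, 1),
    schedule_interval="@daily",
    concurrency=32,                    # per-DAG override of dag_concurrency
    max_active_runs=4,                 # per-DAG override of max_active_runs_per_dag
)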







  3. But we are still not there, because once you spawn so many tasks simultaneously, the backend metadata db will start choking. While this is likely a minor problem (and might not affect you unless you have some really huge DAGs / a very large number of Variable interactions in your tasks), it's still worth noting as a potential roadblock.


# The SqlAlchemy pool size is the maximum number of database connections in the pool. 0 indicates no limit.
sql_alchemy_pool_size = 5

# The SqlAlchemy pool recycle is the number of seconds a connection can be idle in the pool before it is invalidated. This config does not apply to sqlite. If the number of DB connections is ever exceeded, a lower config value will allow the system to recover faster.
sql_alchemy_pool_recycle = 1800

# How many seconds to retry re-establishing a DB connection after disconnects. Setting this to 0 disables retries.
sql_alchemy_reconnect_timeout = 300









  4. Needless to say, all this is pretty much futile unless you pick the right executor; SequentialExecutor, in particular, is intended only for testing.


# The executor class that airflow should use. Choices include SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor
executor = SequentialExecutor









  5. But then, params to BaseOperator like depends_on_past and wait_for_downstream are there to spoil the party as well; see the sketch below.
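A minimal sketch of how these flags are typically set through default_args (the DAG and task names are made up):

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    "owner": "airflow",
    # A task instance will only run once the same task succeeded
    # in the previous DAG run...
    "depends_on_past": True,
    # ...and, additionally, once the tasks immediately downstream of
    # that previous instance have finished successfully.
    "wait_for_downstream": True,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    dag_id="example_strict_ordering",   # made-up name
    default_args=default_args,
    start_date=datetime(2019, 1, 1),
    schedule_interval="@daily",
)

do_work = BashOperator(
    task_id="do_work",
    bash_command="echo 'doing work'",
    dag=dag,
)

Either flag effectively serializes task instances across runs, which works against the parallelism knobs above.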








  6. Finally, I leave you with this link related to the Airflow + Spark combination: How to submit Spark jobs to EMR cluster from Airflow?

(Do forgive me if the answer added to your confusion more than it resolved, but...)
