在气流中子标签创建时访问父dag上下文? [英] Access parent dag context at subtag creation time in airflow?

查看:140
本文介绍了在气流中子标签创建时访问父dag上下文?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试在subdag创建时访问父dag的一些xcom数据,我正在搜索以在Internet上实现此功能,但未找到任何内容。

  def test(task_id):
logging.info(f'执行任务{task_id}')


def load_subdag(parent_dag_id ,child_dag_id,args):
dag_subdag = DAG(
dag_id ='{0}。{1}'。format(parent_dag_id,child_dag_id),
default_args = args,
schedule_interval = @ daily,

和dag_subdag:
r = DummyOperator(task_id ='random')

for i in range(r.xcom_pull(task_ids = 'take_Ana',key ='the_message',dag_id = parent_dag_id)):
t = PythonOperator(
task_id ='load_subdag_ {0}'。format(i),
default_args = args,
python_callable = print_context,
op_kwargs = {'task_id':'load_subdag_ {0}'。format(i)},
dag = dag_subdag,


return dag_subdag

load_tasks = SubDagOperator(
task_id ='load_tasks',
subdag = load_subdag(dag.dag_id,
' load_tasks',args),
default_args = args,

我的代码出错

  1 |追溯(最近一次通话):
airflow_1 |在process_file
airflow_1中,文件 /usr/local/lib/python3.6/site-packages/airflow/models.py,行374 | m = imp.load_source(mod_name,filepath)
airflow_1 |在load_source
airflow_1 | |中的文件 /usr/local/lib/python3.6/imp.py,第172行。模块= _load(spec)
airflow_1 | _load
airflow_1中的文件< frozen importlib._bootstrap>,行684。 _load_unlocked
airflow_1中的文件< frozen importlib._bootstrap>,行665。 exec_module
airflow_1中的文件< frozen importlib._bootstrap_external>,行678。 _call_with_frames_removed
airflow_1中的文件< frozen importlib._bootstrap>,第219行。 < module>中的文件 /app/dags/airflow_dag_test.py,第75行。
airflow_1 | load_tasks,参数),
airflow_1 |在load_subdag
airflow_1 |文件 /app/dags/airflow_dag_test.py的第55行中|对于范围内的我(r.xcom_pull(task_ids ='take_Ana',key ='the_message',dag_id = parent_dag_id)):
airflow_1 | TypeError:xcom_pull()缺少1个必需的位置参数:'context'


解决方案

错误很简单:您缺少context 自变量10-stable / airflow / models.py#L3173 rel = nofollow noreferrer> xcom_pull() 方法。但是您真的不能只创建 context 传递给此方法;这是 Python dictionary ,它是 Airflow 传递给诸如 pre_execute()和<$ c $ b> BaseOperator 的 execute() (所有 Operator s)。



换句话说,上下文仅在<实际上会执行code> Operator ,而不是在 DAG 定义期间执行。这是有道理的,因为在 Airflow 分类法中, xcom s是< 实时中的code>任务:正在运行时互相交谈。






但是在一天结束时 Xcom s,就像其他 Airflow 模型,保存在后端元数据库中。因此,您当然可以从那里直接检索它(显然,只有过去运行过的 task s的XCOM)。虽然我没有代码片段,但是您可以查看 cli.py 在其中使用了 SQLAlchemy ORM与模型和backend-db一起玩。要知道,这意味着每当解析 DAG 定义文件时,就会向您的后端数据库触发查询,这会很快发生。






有用的链接








EDIT-1



查看您的代码段后,我感到震惊。假设 xcom_pull()返回的值会经常变化,则 dag 也将不断变化。这可能会导致不可预测的行为(您应该进行大量研究,但我对此并不满意)



我建议您重新访问整个任务工作流程,并精简为一个设计,其中
-个任务 s和$ b $的数量b-提前知道 DAG
的结构(在 dag定义文件的执行)。当然,您可以遍历 json 文件/ SQL 查询的结果(例如 SQLAlchemy 前面提到的内容)等,以生成实际的任务 s,但该文件/ db /不应经常更改的内容。 / p>




请注意,仅遍历列表以生成任务没问题;无法实现 DAG 的结构,具体取决于上游 任务的结果。例如,您不能基于上游任务在您的 DAG 中创建n个任务 em>在运行时计算n的值。






所以这是不可能的





但是这是可能的(包括您要实现的目标;即使您的操作方式似乎不是一个好主意)








EDIT-2



所以事实证明,毕竟可以从上游任务的输出生成任务。尽管需要大量有关Airflow内部工作原理和创造力的知识。




I'm trying to access at subdag creation time some xcom data from parent dag, I was searching to achieve this on internet but I didn't find something.

def test(task_id):
    logging.info(f' execution of task {task_id}')


def load_subdag(parent_dag_id, child_dag_id, args):
    dag_subdag = DAG(
        dag_id='{0}.{1}'.format(parent_dag_id, child_dag_id),
        default_args=args,
        schedule_interval="@daily",
    )
    with dag_subdag:
        r = DummyOperator(task_id='random')

        for i in range(r.xcom_pull(task_ids='take_Ana', key='the_message', dag_id=parent_dag_id)):
            t = PythonOperator(
                task_id='load_subdag_{0}'.format(i),
                default_args=args,
                python_callable=print_context,
                op_kwargs={'task_id': 'load_subdag_{0}'.format(i)},
                dag=dag_subdag,
            )

    return dag_subdag

load_tasks = SubDagOperator(
        task_id='load_tasks',
        subdag=load_subdag(dag.dag_id,
                           'load_tasks', args),
        default_args=args,
    )

got this error with my code

1  | Traceback (most recent call last):
airflow_1  |   File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 374, in process_file
airflow_1  |     m = imp.load_source(mod_name, filepath)
airflow_1  |   File "/usr/local/lib/python3.6/imp.py", line 172, in load_source
airflow_1  |     module = _load(spec)
airflow_1  |   File "<frozen importlib._bootstrap>", line 684, in _load
airflow_1  |   File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
airflow_1  |   File "<frozen importlib._bootstrap_external>", line 678, in exec_module
airflow_1  |   File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
airflow_1  |   File "/app/dags/airflow_dag_test.py", line 75, in <module>
airflow_1  |     'load_tasks', args),
airflow_1  |   File "/app/dags/airflow_dag_test.py", line 55, in load_subdag
airflow_1  |     for i in range(r.xcom_pull(task_ids='take_Ana', key='the_message', dag_id=parent_dag_id)):
airflow_1  | TypeError: xcom_pull() missing 1 required positional argument: 'context'

解决方案

The error is simple: you are missing the context argument required by xcom_pull() method. But you really can't just create context to pass into this method; it is a Python dictionary that Airflow passes to anchor methods like pre_execute() and execute() of BaseOperator (parent class of all Operators).

In other words, context becomes available only when Operator is actually executed, not during DAG-definition. And it makes sense because in taxanomy of Airflow, xcoms are communication mechanism between tasks in realtime: talking to each other while they are running.


But at the end of the day Xcoms, just like every other Airflow model, are persisted in backend meta-db. So of course you can directly retrieve it from there (obviously only the XCOMs of tasks that had run in the past). While I don't have a code-snippet, you can have a look at cli.py where they've used the SQLAlchemy ORM to play with models and backend-db. Do understand that this would mean a query being fired to your backend-db every time the DAG-definition file is parsed, which happens rather quickly.


Useful links


EDIT-1

After looking at your code-snippet, I got alarmed. Assuming the value returned by xcom_pull() will keep changing frequently, the number of tasks in your dag will also keep changing. This can lead to unpredictable behaviours (you should do a fair bit of research but I don't have a good feeling about it)

I'd suggest you revisit your entire task workflow and condense down to a design where the - number of tasks and - structure of DAG are known ahead of time (at the time of execution of dag-definition file). You can of-course iterate over a json file / result of a SQL query (like the SQLAlchemy thing mentioned earlier) etc. to spawn your actual tasks, but that file / db / whatever shouldn't be changing frequently.


Do understand that merely iterating over a list to generate tasks is not problematic; what's NOT possible is to have structure of your DAG dependent on result of upstream task. For example you can't have n tasks created in your DAG based on an upstream task calculating value of n at runtime.


So this is not possible

But this is possible (including what you are trying to achieve; even though the way you are doing it doesn't seem like a good idea)


EDIT-2

So as it turns out, generating tasks from output of upstream tasks is possible after all; although it requires significant amount of knowledge of internal workings of Airflow as well as a tinge of creativity.

这篇关于在气流中子标签创建时访问父dag上下文?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆