如何在同一DAG运行中(不是最新的)从其他任务实例中提取xcom值? [英] How to pull xcom value from other task instance in the same DAG run (not the most recent one)?
问题描述
我有3次DAG运行:
- DAGR 1在2019-02-13 16:00:00执行
- DAGR 2于2019-02-13 17:00:00执行
- DAGR 3于2019-02-13 18:00:00 $执行b $ b
- DAGR 1 executed at 2019-02-13 16:00:00
- DAGR 2 executed at 2019-02-13 17:00:00
- DAGR 3 executed at 2019-02-13 18:00:00
在任务实例中, DAGR 1 $ c $的
X
c>我想获取任务实例 Y
的xcom值。我是这样做的:
In a task instance X
of DAGR 1
I want to get xcom value of task instance Y
. I did this:
kwargs['task_instance'].xcom_pull(task_ids='Y')
我期望从<$ c中的任务实例 Y
获取xcom的值$ c> DAGR 1 。相反,我是从 DAGR 3
中获得的。
I expected to get value of xcom from task instance Y
in DAGR 1
. Instead I got from DAGR 3
.
来自Airflow文档
From Airflow documentation
如果为
task_ids
传递了单个字符串,则为xcom_pull
返回该任务的XCom值; ...
If
xcom_pull
is passed a single string fortask_ids
, then the most recent XCom value from that task is returned; ...
- 为什么气流
xcom_pull
返回最新的xcom值? - 如果我想从同一DAG运行中提取怎么办?
- Why Airflow
xcom_pull
return the most recent xcom value? - What if I want to pull from the same DAG run?
推荐答案
这回答了您的问题[如何在同一DAG运行(不是最新的)中从其他任务实例中获取xcom值? ]
参见下面的示例:
This asnwers your question [How to pull xcom value from other task instance in the same DAG run (not the most recent one)? ]
See the example below :
t1 = SomeOperator(
task_id='Your_t1_Task_ID',
xcom_push = True,
...
...
dag=dag)
def get_records(**kwargs):
ti = kwargs['ti']
xcom = ti.xcom_pull(task_ids='Your_t1_Task_ID')
string_to_print = 'Value in xcom is: {}'.format(xcom)
#string_to_print holds that value, you can also print it in the logs
logging.info(string_to_print)
t2 = PythonOperator(
task_id='records',
provide_context=True,
python_callable=get_records,
dag=dag)
t1 >> t2
这篇关于如何在同一DAG运行中(不是最新的)从其他任务实例中提取xcom值?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!