气流:在下一个任务中获取上一个任务 ID [英] Airflow: Get previous task id in the next task
问题描述
我有 2 个任务.在第一个中,python 运算符计算一些东西,在第二个中,我想在 Http 运算符中使用 python 运算符的输出.这是我的代码:
I have 2 tasks. In the first, python operator computes something and in the second I want to use the output of the python operator in the Http operator. Here is my code:
source_list = ['account', 'sales']
for source_type in source_list:
t2 = PythonOperator(
task_id='compute_next_gather_time_for_' + source_type,
python_callable=compute_next_gather_time,
provide_context=True,
trigger_rule=TriggerRule.ALL_SUCCESS,
op_args=[source_type],
retries=3
)
t3 = SimpleHttpOperator(
task_id='request_' + source_type + '_report',
method='POST',
http_conn_id='abc',
endpoint=endpoint,
data=json.dumps({
"query": {
"start": "{{ task_instance.xcom_pull(task_ids='prev_task_id') }}",
"stop": str(yesterday),
"fields": [
1
]
}
}),
headers={"Content-Type": "application/json", "Authorization": 'abc'},
response_check=lambda response: True if len(response.json()) == 0 else False,
log_response=True,
retries=3
)
查询:我想在其数据变量中传递 t3 中的先前任务 ID.我不知道该怎么做,因为 t2 任务 id 不是恒定的.它随着 source_type 的变化而变化.显然,当我尝试它时并没有呈现它.
Query: I want to pass previous task id in t3 in its data variable. I am not sure how to do that since t2 task id is not constant. It changes with changing source_type. Evidently, when I tried it did not render it.
推荐答案
我之前没有在我的任何 DAG 中使用过 Jinja 模板,但是我遇到了类似的问题,我需要从特定的对象中检索 XCOM 值具有动态生成的 task_id 的任务.
I haven't used Jinja templating in any of my DAGs before, but I have been faced with similar problems where I was needing to retrieve XCOM values from a particular task that has a dynamically generated task_id.
您可以按照在 T2 中定义 task_id
的相同方式在 T3 中定义 task_ids
.例如:
You could define the task_ids
in T3 in the same way you defined the task_id
in T2. For example:
source_list = ['account', 'sales']
for source_type in source_list:
task_id='compute_next_gather_time_for_' + source_type
t2 = PythonOperator(
task_id=task_id,
python_callable=compute_next_gather_time,
provide_context=True,
trigger_rule=TriggerRule.ALL_SUCCESS,
op_args=[source_type],
retries=3
)
t3 = SimpleHttpOperator(
task_id='request_' + source_type + '_report',
method='POST',
http_conn_id='abc',
endpoint=endpoint,
data=json.dumps({
"query": {
"start": "{{ task_instance.xcom_pull(task_ids=task_id) }}",
"stop": str(yesterday),
"fields": [
1
]
}
}),
headers={"Content-Type": "application/json", "Authorization": 'abc'},
response_check=lambda response: True if len(response.json()) == 0 else False,
log_response=True,
retries=3
)
这篇关于气流:在下一个任务中获取上一个任务 ID的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!