Pass a list of strings as a parameter of a dependent task in Airflow
Problem description
I am trying to pass a list of strings from one task to another via XCom, but I cannot get the pushed list interpreted back as a list.
For example, when I do this in some function blah that is run in a ShortCircuitOperator:
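```python
def blah(**kwargs):
    # bucket and my_list are defined elsewhere in the DAG file.
    paths = ['gs://{}/{}'.format(bucket, obj) for obj in my_list]
    # Push the list itself (the original snippet pushed an undefined full_paths).
    kwargs['ti'].xcom_push(key='return_value', value=paths)
```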
and then I want to use that list as a parameter of an operator, for example:
```python
run_task_after_blah = AfterBlahOperator(
    task_id='run-task-after-blah',
    ...,
    input_paths="{{ ti.xcom_pull(task_ids='find-paths') }}",
    ...,
)
```
I expect input_paths to be equal to paths, but it is not: the rendering happens first and the assignment afterwards, and the template rendering somehow turns the xcom_pull return value into a stringified list (which my AfterBlahOperator then assigns as the value of an element in a JSON).
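A rough, stdlib-only illustration of the symptom, assuming that rendering "{{ value }}" with Jinja's default settings amounts to calling str() on the pulled list; bucket and my_list get made-up values here. The ast.literal_eval step is only a possible workaround for parsing the string back, not what the answer below recommends.

```python
import ast

# Hypothetical values standing in for bucket and my_list from the question.
bucket = "my-bucket"
my_list = ["a.csv", "b.csv"]
paths = ["gs://{}/{}".format(bucket, obj) for obj in my_list]

# Rendering the pulled value into a template produces text; for a list
# that text is essentially str(list).
rendered = str(paths)
print(type(rendered).__name__, rendered)

# The operator therefore receives a string, not a list. If templating cannot
# be avoided, ast.literal_eval can parse that representation back safely.
recovered = ast.literal_eval(rendered)
print(recovered == paths)  # True
```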
I tried concatenating the paths into one string separated by some separator, pushing that to XCom, and splitting it back when pulling from XCom. But because the XCom gets rendered first, I get either that stringified list when the split function is called inside the template, or the original concatenated string of paths if the split function is applied to the parameter (as in "{{ ti.xcom_pull(task_ids='find-paths') }}".split(';')).
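The two failure modes can be reproduced without Airflow. This sketch assumes input_paths is one of the operator's template_fields, and uses a hypothetical render() stand-in that, like Airflow, renders a string directly and a list element by element; the joined XCom value is made up.

```python
# Hypothetical values: the joined string pushed to XCom by find-paths.
TEMPLATE = "{{ ti.xcom_pull(task_ids='find-paths') }}"
xcom_value = ";".join(["gs://my-bucket/a.csv", "gs://my-bucket/b.csv"])


def render(field):
    """Hypothetical stand-in for rendering a templated field.

    Strings are rendered; lists are rendered element by element.
    """
    if isinstance(field, list):
        return [render(item) for item in field]
    return field.replace(TEMPLATE, xcom_value)


# Split applied to the parameter: Python runs it when the DAG file is parsed,
# on the literal template text, so there is nothing to split yet.
parsed = TEMPLATE.split(";")
print(parsed)          # a one-element list holding the unrendered template
print(render(parsed))  # a one-element list holding the whole joined string

# Split inside the template, e.g. "{{ ti.xcom_pull(...).split(';') }}":
# the expression yields a list, which is then turned back into text.
inside = str(xcom_value.split(";"))
print(inside)
```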
XCom seems to work well for single values used as task parameters, or for multiple values when the extracted values can be processed further in code, but not for passing multiple values as one list-valued parameter of a task.
Is there a way to do this without having to write an extra function that returns exactly such a list of strings? Or maybe I am abusing XCom, but many operators in Airflow take a list of elements as a parameter (e.g., the full paths of multiple files produced by some previous task, and hence not known beforehand).
Answer
Jinja renders templates to strings, so if you fetch an XCom via a template, you will always get a string. Instead, you need to fetch the XCom in code that has access to the TaskInstance object, such as the operator's execute method. Something like this:
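```python
from airflow.models import BaseOperator


class AfterBlahOperator(BaseOperator):
    def __init__(self, input_task_id, *args, **kwargs):
        # Other operator-specific parameters are omitted here.
        super(AfterBlahOperator, self).__init__(*args, **kwargs)
        # Store only the upstream task id; the XCom is pulled at run time.
        self.input_task_id = input_task_id

    def execute(self, context):
        # No Jinja involved: xcom_pull returns the pushed object (a list).
        input_paths = context['ti'].xcom_pull(task_ids=self.input_task_id)
        for path in input_paths:
            ...
```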
This is similar to how you would fetch it within a PythonOperator, for which the XCom docs provide an example.
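For comparison, a minimal sketch of a PythonOperator-style callable that pulls the list in code. FakeTaskInstance is a hypothetical stub, not an Airflow class; it stands in for the TaskInstance that Airflow passes in the context as ti, so the snippet runs without Airflow installed. The file-name processing is likewise only illustrative.

```python
class FakeTaskInstance:
    """Hypothetical stub mimicking TaskInstance.xcom_push / xcom_pull."""

    def __init__(self, task_id, store):
        self.task_id = task_id
        self._store = store  # shared dict: (task_id, key) -> value

    def xcom_push(self, key, value):
        self._store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids, key="return_value"):
        return self._store.get((task_ids, key))


def consume_paths(**context):
    # Pulled in code rather than in a template, so this is the pushed list.
    input_paths = context["ti"].xcom_pull(task_ids="find-paths")
    # Illustrative processing: keep only the object names.
    return [path.split("/")[-1] for path in input_paths]


store = {}
FakeTaskInstance("find-paths", store).xcom_push(
    key="return_value",
    value=["gs://my-bucket/a.csv", "gs://my-bucket/b.csv"],
)
result = consume_paths(ti=FakeTaskInstance("run-task-after-blah", store))
print(result)  # ['a.csv', 'b.csv']
```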
Note that you can still support a separate input_paths parameter for when the paths can be hardcoded in a DAG; you just need an extra check to decide which parameter to read the value from.
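One way to write that extra check, as a sketch: resolve_input_paths and StubTaskInstance are hypothetical names, and the rule that exactly one of the two parameters must be given is an assumption, not something the answer prescribes.

```python
def resolve_input_paths(input_paths, input_task_id, ti):
    """Hypothetical helper: use hardcoded paths, else pull them from XCom."""
    if input_paths is not None and input_task_id is not None:
        raise ValueError("pass either input_paths or input_task_id, not both")
    if input_paths is not None:
        return list(input_paths)
    if input_task_id is not None:
        pulled = ti.xcom_pull(task_ids=input_task_id)
        if pulled is None:
            raise ValueError("no XCom found for task {!r}".format(input_task_id))
        return list(pulled)
    raise ValueError("one of input_paths or input_task_id is required")


class StubTaskInstance:
    """Hypothetical stub returning a fixed XCom value."""

    def __init__(self, value):
        self.value = value

    def xcom_pull(self, task_ids):
        return self.value


print(resolve_input_paths(["gs://my-bucket/x.csv"], None, None))
print(resolve_input_paths(None, "find-paths", StubTaskInstance(["gs://my-bucket/y.csv"])))
```

Inside AfterBlahOperator.execute this would be called as resolve_input_paths(self.input_paths, self.input_task_id, context['ti']), so the hardcoded and XCom-driven cases share one code path.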