Pass a list of strings as parameter of a dependent task in Airflow

Problem description

I am trying to pass a list of strings from one task to another via XCom, but I cannot get the pushed list interpreted back as a list.

For example, when I do this in some function blah that is run in a ShortCircuitOperator:

paths = ['gs://{}/{}'.format(bucket, obj) for obj in my_list]
kwargs['ti'].xcom_push(key='return_value', value=paths)

Then I want to use that list as a parameter of an operator. For example,

run_task_after_blah = AfterBlahOperator(
    task_id='run-task-after-blah',
    ...,
    input_paths="{{ ti.xcom_pull(task_ids='find-paths') }}",
    ...,
)

I expect input_paths to be equal to paths, but it is not: the rendering happens first and then the assignment, and the template rendering converts the xcom_pull return value to a stringified list (which my AfterBlahOperator then assigns as the value of an element in a JSON).

I tried concatenating the paths into one string separated by some separator, pushing that to the XCom, and splitting it back when pulling from the XCom. But because the XCom gets rendered first, I get either that stringified list when the split function is called inside the template, or the original concatenated string of paths when the split function is applied to the parameter (as in "{{ ti.xcom_pull(task_ids='find-paths') }}".split(';')).
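
The ordering issue described above can be reproduced without Airflow. In this minimal sketch, `render` is a hypothetical stand-in for the template engine (it only substitutes the pulled value as a string), and the bucket and object names are made up. The point is only that `.split(';')` on the template literal runs when the DAG file is parsed, before any XCom value exists.

```python
# Hypothetical stand-in for template rendering: replaces the placeholder
# with the string form of the pulled value. Not Airflow's real renderer.
TEMPLATE = "{{ ti.xcom_pull(task_ids='find-paths') }}"


def render(field, pulled_value):
    return field.replace(TEMPLATE, str(pulled_value))


# Value pushed by the upstream task: paths joined with ';' (made-up names).
pushed = "gs://bucket/a.csv;gs://bucket/b.csv"

# split() is applied to the template text itself at DAG-parse time.
# The template contains no ';', so the result is a one-element list.
param = TEMPLATE.split(";")

# Rendering happens later, element by element, so the single element
# becomes the whole joined string instead of separate paths.
rendered = [render(item, pushed) for item in param]
print(rendered)
```

The printed list has one element, the full joined string, which matches the behaviour reported in the question.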

XCom seems to work great for single values as task parameters, or for multiple values when the extracted values can be further processed, but not for passing multiple values as a single parameter of a task.

Is there a way to do this without having to write an extra function that returns exactly such a list of strings? Or maybe I am abusing XCom too much, but there are many operators in Airflow that take a list of elements as a parameter (e.g., usually the full paths to multiple files that are the result of some previous task, hence not known beforehand).

Solution

Jinja renders strings, so if you fetch an XCom via templates, it's always going to be a string. Instead, you will need to fetch the XCom where you have access to the TaskInstance object. Something like this:

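from airflow.models import BaseOperator


class AfterBlahOperator(BaseOperator):

    def __init__(self, ..., input_task_id, *args, **kwargs):
        ...
        # Store the upstream task id rather than a templated XCom value.
        self.input_task_id = input_task_id
        super(AfterBlahOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        # Pulling through the TaskInstance returns the pushed list itself, not a string.
        input_paths = context['ti'].xcom_pull(task_ids=self.input_task_id)
        for path in input_paths:
            ...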

This is similar to how you would fetch it within a PythonOperator, which the XCom docs provide an example of.
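
If the operator cannot be changed and only the rendered string is available, one possible workaround is to parse that string back into a list. This is only a sketch, under the assumption that the template renders the list in its Python `str()` form, which `str(paths)` simulates here; the paths are made up.

```python
import ast

paths = ["gs://bucket/a.csv", "gs://bucket/b.csv"]  # made-up example paths

# Assumption: rendering a list through a template yields its str() form.
rendered = str(paths)

# literal_eval parses Python literals only; it does not execute code.
recovered = ast.literal_eval(rendered)
print(recovered == paths)
```

This only works while the pushed value consists of plain literals (strings, numbers, lists, dicts), so pulling the XCom inside `execute` remains the more robust route.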

Note that you can still support a separate input_paths parameter for when the paths can be hardcoded in a DAG; you'll just need an extra check to see which parameter to read the value from.
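
That extra check might look roughly like the following. It is a sketch, not the answer's actual implementation: `resolve_input_paths` and `FakeTaskInstance` are hypothetical names introduced here so the logic can run without Airflow, and in a real operator the body of `resolve_input_paths` would live inside `execute`.

```python
class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance, for illustration only."""

    def __init__(self, xcoms):
        self._xcoms = xcoms

    def xcom_pull(self, task_ids):
        return self._xcoms.get(task_ids)


def resolve_input_paths(context, input_paths=None, input_task_id=None):
    """Prefer hardcoded paths; otherwise pull the list from the upstream task."""
    if input_paths is not None:
        return list(input_paths)
    if input_task_id is not None:
        return context["ti"].xcom_pull(task_ids=input_task_id)
    raise ValueError("either input_paths or input_task_id must be given")


# Made-up context with one upstream XCom value.
context = {"ti": FakeTaskInstance({"find-paths": ["gs://bucket/a.csv"]})}

hardcoded = resolve_input_paths(context, input_paths=["gs://bucket/x.csv"])
pulled = resolve_input_paths(context, input_task_id="find-paths")
print(hardcoded, pulled)
```

Giving the hardcoded parameter priority is a design choice made for this sketch; the reverse order would work as well, as long as it is applied consistently.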
