How to use PythonVirtualenvOperator in Airflow?


Question

Basically, I'm working with Airflow and have developed a task that downloads a file from an external source.

t1 = PythonOperator(
        task_id='download',
        python_callable=download,
        provide_context=True,
        dag=dag)

This Airflow instance runs in a virtual environment (pipenv).

The download function is:

def download(**kwargs):
   folder_id = 'xxxxxx-xxxx-xxxx-xxxxxx'
   file_name = download_file(folder_id)
   return file_name

So basically I'm using XComs to pass data from one task to another, and with this configuration it's impossible to manage all the dependencies of each DAG.

In the documentation I found a class called "PythonVirtualenvOperator", so to use it I wrote:

t1 = PythonVirtualenvOperator(
        task_id='download',
        python_callable=download,
        requirements=['requests'],
        python_version='3.8',
        provide_context=True,
        dag=dag
    )

It gave me the following error:

TypeError: can't pickle module objects

The download_file function is an API connection defined in another file.

Any suggestions on how I can manage the environment and still pass data between tasks?

Recommended answer

The problem is

provide_context=True,

Airflow cannot pickle the context because it contains objects that are not serializable. If you only need simple values such as the execution date, you can work around this with templating and op_kwargs:

t1 = PythonVirtualenvOperator(
        task_id='download',
        python_callable=download,
        provide_context=False,
        # Pass only plain values; the template is rendered to a string.
        op_kwargs={
            'execution_date_str': '{{ execution_date }}',
        },
        dag=dag)
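The difference between the failing and the working call comes down to what has to be pickled. The following is a minimal sketch using only the standard library. The dictionary `context_like` is an illustrative stand-in that assumes the context holds at least one module object; it is not Airflow's actual context, and `is_picklable` is a hypothetical helper.

```python
import pickle
import sys


def is_picklable(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


# Stand-in for a context that carries a module reference (assumption).
context_like = {
    "execution_date": "2021-01-01T00:00:00+00:00",
    "some_module": sys,
}

# Stand-in for op_kwargs after templating: plain strings only.
op_kwargs_like = {"execution_date_str": "2021-01-01T00:00:00+00:00"}

print(is_picklable(context_like))    # False: module objects cannot be pickled
print(is_picklable(op_kwargs_like))  # True: a dict of strings pickles fine
```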

Of course, you will need to update the arguments of your callable to match. I didn't go any deeper than that because it worked for my use case.
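An updated callable might look like the sketch below. It assumes the template renders to an ISO 8601 string, and it keeps imports inside the function, since the operator runs the callable in a separate virtualenv where names from the DAG file are not available. The returned file name is a placeholder for illustration; the real task would import and call the asker's download_file helper at the marked spot.

```python
def download(execution_date_str):
    """Callable intended to run inside the virtualenv created by the operator."""
    # Imports go inside the function: the callable is executed in a
    # separate interpreter, so module-level imports are not carried over.
    from datetime import datetime

    # Assumption: '{{ execution_date }}' renders to an ISO 8601 string.
    execution_date = datetime.fromisoformat(execution_date_str)

    folder_id = 'xxxxxx-xxxx-xxxx-xxxxxx'

    # Placeholder: the real task would import the module that defines
    # download_file here and call download_file(folder_id) instead.
    file_name = f"{folder_id}_{execution_date:%Y%m%d}.dat"
    return file_name


print(download("2021-01-01T00:00:00+00:00"))
```

The design choice here is to accept a plain string rather than the whole context, so that everything crossing into the virtualenv is trivially serializable.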
