Airflow xcom pull only returns string


Problem Description

I have an Airflow pipeline where I need to get a filename from a Pub/Sub subscription and then import that file into a Cloud SQL instance. I use the CloudSqlInstanceImportOperator to import the CSV file. This operator needs a body, which contains the filename and other parameters. Since I read that filename at runtime, I also have to define the body at runtime. This all works. But when I pull the body from XCom, it returns a string instead of a Python dictionary, so the CloudSqlInstanceImportOperator gives me the following error (my guess is because the body is a string and not a dictionary):

Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 984, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/contrib/operators/gcp_sql_operator.py", line 715, in execute
    self._validate_body_fields()
  File "/usr/local/lib/airflow/airflow/contrib/operators/gcp_sql_operator.py", line 712, in _validate_body_fields
    api_version=self.api_version).validate(self.body)
  File "/usr/local/lib/airflow/airflow/contrib/utils/gcp_field_validator.py", line 420, in validate
    dictionary_to_validate=body_to_validate)
  File "/usr/local/lib/airflow/airflow/contrib/utils/gcp_field_validator.py", line 341, in _validate_field
    value = dictionary_to_validate.get(field_name)
AttributeError: 'str' object has no attribute 'get'

Here is the code I use:

import json 
import os
from datetime import datetime, timedelta
import ast
from airflow import DAG
from airflow.contrib.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.sensors.pubsub_sensor import PubSubPullSensor
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceImportOperator


def create_dag(dag_id, default_args):
    BUCKET = "{{ var.value.gp2pg_bucket }}"
    GCP_PROJECT_ID = "{{ var.value.gp2pg_project_id }}"
    INSTANCE_NAME = "{{ var.value.gp2pg_instance_name }}"

    def define_import_body(file, **kwargs):
        import_body = {
            "importContext": {
                "importUser": "databasename",
                "database": "databaseuser",
                "fileType": "csv",
                "uri": "bucketname" + file,
                "csvImportOptions": {
                    "table": "schema.tablename",
                    "columns": ["columns1",
                                "column2"]}
            }
        }
        task_instance = kwargs['task_instance']
        task_instance.xcom_push(key='import_body', value=import_body)
        print(import_body)

    def get_filename(var, **kwargs):
        message = ast.literal_eval(var)
        file = message[0].get('message').get('attributes').get('objectId')
        task_instance = kwargs['task_instance']
        task_instance.xcom_push(key='filename', value=file)
        print(file)

    dag = DAG(dag_id=dag_id, schedule_interval=None, default_args=default_args)

    with dag:
        t1 = PubSubPullSensor(task_id='pull-messages',
                              project="projectname",
                              ack_messages=True,
                              max_messages=1,
                              subscription="subscribtionname")


        message = "{{ task_instance.xcom_pull() }}"

        t2 = PythonOperator(
            task_id='get_filename',
            python_callable=get_filename,
            op_kwargs={'var': message},
            provide_context=True,
        )

        file = "{{ task_instance.xcom_pull(task_ids='get_filename', key='filename') }}"

        t3 = PythonOperator(
            task_id='define_import_body',
            python_callable=define_import_body,
            op_kwargs={'file': file},
            provide_context=True,
        )

        import_body = "{{ task_instance.xcom_pull(task_ids='define_import_body', key='import_body') }}"

        t4 = CloudSqlInstanceImportOperator(
            project_id=GCP_PROJECT_ID,
            body=import_body,
            instance=INSTANCE_NAME,
            gcp_conn_id='postgres_default',
            task_id='sql_import_task',
            validate_body=True,
        )

        t5 = GoogleCloudStorageToGoogleCloudStorageOperator(
            task_id='copy_files',
            source_bucket=BUCKET,
            source_object=file,
            destination_bucket=BUCKET,
            destination_object='processed/import/' + file,
        )

        t1 >> t2 >> t3 >> t4 >> t5

    return dag


dags_folder = os.getenv('DAGS_FOLDER', "./dags")
flow_config = open(f'{dags_folder}/gp2pg/flow_config.json', 'r').read()
for key, values in json.loads(flow_config).items():
    default_args = {
        "owner": "owner",
        "start_date": datetime(2020, 1, 1),
        "email": [],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 0,
        "retry_delay": timedelta(minutes=5),
    }

    dag_id = f"gp2pg_{key}_data_to_pg"

    globals()[dag_id] = create_dag(dag_id, default_args)

Any idea how I could solve that problem?

Recommended Answer

First, CloudSqlInstanceImportOperator is deprecated. You should use CloudSQLImportInstanceOperator from the providers package.
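A sketch of the providers import path, assuming the apache-airflow-providers-google package is installed:

from airflow.providers.google.cloud.operators.cloud_sql import CloudSQLImportInstanceOperator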

The body param needs to be a dict, as explained in the docs.
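A minimal sketch of that body shape, based on the importContext fields already used in the question (see the Cloud SQL Admin API docs for the full schema):

import_body = {
    "importContext": {
        "fileType": "csv",
        "uri": "gs://bucketname/file.csv",  # hypothetical GCS path
        "database": "databasename",
        "csvImportOptions": {"table": "schema.tablename"},
    }
}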

XCom is a table in the Airflow metadata database, and the data is saved as strings. You can't store a dict in the database, because a dict is an in-memory Python object. What you have is probably JSON (a string). Try converting it to a dict:

body=json.loads(import_body) 
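For example, a minimal sketch with a hypothetical rendered value: if the rendered string uses Python-style single quotes rather than valid JSON, ast.literal_eval (which the question already uses for the Pub/Sub message) can parse it instead:

import ast
import json

rendered = "{'importContext': {'fileType': 'csv'}}"  # hypothetical rendered XCom string

try:
    body = json.loads(rendered)        # works when the string is valid JSON
except ValueError:                     # json.JSONDecodeError is a subclass of ValueError
    body = ast.literal_eval(rendered)  # handles Python-literal, single-quoted strings

assert isinstance(body, dict)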

(After discussion in the comments)

You will need to wrap the operator in a PythonOperator so that you can convert the XCom value to a dict before using it:

import json

from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceImportOperator


def my_func(ds, **kwargs):
    ti = kwargs['ti']
    # xcom_pull returns the stored string; convert it back to a dict first.
    body = ti.xcom_pull(task_ids='previous_task_id')
    import_body = json.loads(body)
    op = CloudSqlInstanceImportOperator(
        project_id=GCP_PROJECT_ID,
        body=import_body,
        instance=INSTANCE_NAME,
        gcp_conn_id='postgres_default',
        task_id='sql_import_task',
        validate_body=True,
    )
    # Run the operator inline, forwarding the task context.
    op.execute(context=kwargs)


p = PythonOperator(task_id='python_task', python_callable=my_func, provide_context=True)
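In the question's DAG, this wrapper task would take the place of t4 in the chain (a sketch reusing the question's task names):

t1 >> t2 >> t3 >> p >> t5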

For Airflow >= 2.1.0: Airflow added the ability to render templated fields as native Python objects. You need to set render_template_as_native_obj=True in the DAG constructor. You can follow the example in the documentation.
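A minimal sketch (names are illustrative): with render_template_as_native_obj=True, the xcom_pull template renders to the pushed dict rather than its string representation, so the body can be passed straight to the operator:

from datetime import datetime
from airflow import DAG

dag = DAG(
    dag_id="gp2pg_native_rendering",  # hypothetical DAG id
    start_date=datetime(2020, 1, 1),
    schedule_interval=None,
    render_template_as_native_obj=True,
)

# Inside this DAG, the templated field below now renders to a dict:
import_body = "{{ task_instance.xcom_pull(task_ids='define_import_body', key='import_body') }}"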
