Airflow xcom pull only returns string
Problem description
I have an Airflow pipeline where I need to get a filename from a Pub/Sub subscription and then import that file into a Cloud SQL instance. I use the CloudSqlInstanceImportOperator to import the CSV file. This operator needs a body, which contains the filename and other parameters. Since I read that filename at runtime, I also have to define the body at runtime. This all works. But when I pull the body from XCom, it returns a string instead of a Python dictionary, so the CloudSqlInstanceImportOperator gives me the following error (my guess is because the body is a string and not a dictionary):
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 984, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/contrib/operators/gcp_sql_operator.py", line 715, in execute
    self._validate_body_fields()
  File "/usr/local/lib/airflow/airflow/contrib/operators/gcp_sql_operator.py", line 712, in _validate_body_fields
    api_version=self.api_version).validate(self.body)
  File "/usr/local/lib/airflow/airflow/contrib/utils/gcp_field_validator.py", line 420, in validate
    dictionary_to_validate=body_to_validate)
  File "/usr/local/lib/airflow/airflow/contrib/utils/gcp_field_validator.py", line 341, in _validate_field
    value = dictionary_to_validate.get(field_name)
AttributeError: 'str' object has no attribute 'get'
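The error itself is easy to reproduce in plain Python (a minimal sketch, independent of Airflow): the rendered template is just the string representation of the dict, and the field validator then calls `.get()` on it as if it were a dict:

```python
import ast

# What the operator receives after Jinja template rendering:
# not a dict, but its string representation.
rendered_body = "{'importContext': {'fileType': 'csv'}}"

print(type(rendered_body).__name__)  # str

try:
    # This is essentially what the field validator attempts on the body
    rendered_body.get("importContext")
except AttributeError as exc:
    print(exc)  # 'str' object has no attribute 'get'

# Parsing the string restores the dict and makes .get() work again
body = ast.literal_eval(rendered_body)
print(body["importContext"]["fileType"])  # csv
```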
Here is the code I used:
import json
import os
from datetime import datetime, timedelta
import ast

from airflow import DAG
from airflow.contrib.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.sensors.pubsub_sensor import PubSubPullSensor
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceImportOperator


def create_dag(dag_id, default_args):
    BUCKET = "{{ var.value.gp2pg_bucket }}"
    GCP_PROJECT_ID = "{{ var.value.gp2pg_project_id }}"
    INSTANCE_NAME = "{{ var.value.gp2pg_instance_name }}"

    def define_import_body(file, **kwargs):
        import_body = {
            "importContext": {
                "importUser": "databasename",
                "database": "databaseuser",
                "fileType": "csv",
                "uri": "bucketname" + file,
                "csvImportOptions": {
                    "table": "schema.tablename",
                    "columns": ["columns1",
                                "column2"]}
            }
        }
        task_instance = kwargs['task_instance']
        task_instance.xcom_push(key='import_body', value=import_body)
        print(import_body)

    def get_filename(var, **kwargs):
        message = ast.literal_eval(var)
        file = message[0].get('message').get('attributes').get('objectId')
        task_instance = kwargs['task_instance']
        task_instance.xcom_push(key='filename', value=file)
        print(file)

    dag = DAG(dag_id=dag_id, schedule_interval=None, default_args=default_args)

    with dag:
        t1 = PubSubPullSensor(task_id='pull-messages',
                              project="projectname",
                              ack_messages=True,
                              max_messages=1,
                              subscription="subscribtionname")

        message = "{{ task_instance.xcom_pull() }}"

        t2 = PythonOperator(
            task_id='get_filename',
            python_callable=get_filename,
            op_kwargs={'var': message},
            provide_context=True,
        )

        file = "{{ task_instance.xcom_pull(task_ids='get_filename', key='filename') }}"

        t3 = PythonOperator(
            task_id='define_import_body',
            python_callable=define_import_body,
            op_kwargs={'file': file},
            provide_context=True,
        )

        import_body = "{{ task_instance.xcom_pull(task_ids='define_import_body', key='import_body') }}"

        t4 = CloudSqlInstanceImportOperator(
            project_id=GCP_PROJECT_ID,
            body=import_body,
            instance=INSTANCE_NAME,
            gcp_conn_id='postgres_default',
            task_id='sql_import_task',
            validate_body=True,
        )

        t5 = GoogleCloudStorageToGoogleCloudStorageOperator(
            task_id='copy_files',
            source_bucket=BUCKET,
            source_object=file,
            destination_bucket=BUCKET,
            destination_object='processed/import/' + file,
        )

        t1 >> t2 >> t3 >> t4 >> t5

    return dag


dags_folder = os.getenv('DAGS_FOLDER', "./dags")
flow_config = open(f'{dags_folder}/gp2pg/flow_config.json', 'r').read()

for key, values in json.loads(flow_config).items():
    default_args = {
        "owner": "owner",
        "start_date": datetime(2020, 1, 1),
        "email": [],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 0,
        "retry_delay": timedelta(minutes=5),
    }

    dag_id = f"gp2pg_{key}_data_to_pg"
    globals()[dag_id] = create_dag(dag_id, default_args)
Any idea how I could solve that problem?
Recommended answer
First, CloudSqlInstanceImportOperator is deprecated. You should use CloudSQLImportInstanceOperator from the providers package.
The body param needs to be a dict, as explained in the docs.
XCom is a table in the database, and the data is saved as strings. You can't store a dict in the database, because a dict is an in-memory Python object. What you have is probably JSON (a string). Try converting it to a dict:
body=json.loads(import_body)
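A quick round-trip illustrates the point (a minimal sketch of the serialization, not Airflow's exact internals): the dict is serialized to a string for storage, and only deserializing it restores a real dict:

```python
import json

# A body like the one built in define_import_body (values shortened)
import_body = {"importContext": {"fileType": "csv", "uri": "gs://bucket/file.csv"}}

# XCom persists values as strings; after rendering you get a str back
stored = json.dumps(import_body)
print(type(stored).__name__)   # str  -> calling .get() here would fail

# json.loads turns it back into a dict the operator can validate
body = json.loads(stored)
print(type(body).__name__)     # dict
print(body["importContext"]["fileType"])  # csv
```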
(After discussion in the comments.)

You will need to wrap your operator with a PythonOperator so you can convert the XCom value to a dict and use it:
import json


def my_func(ds, **kwargs):
    ti = kwargs['ti']
    # Pull the JSON string pushed by the upstream task and parse it into a dict
    body = ti.xcom_pull(task_ids='previous_task_id')
    import_body = json.loads(body)
    op = CloudSqlInstanceImportOperator(
        project_id=GCP_PROJECT_ID,
        body=import_body,
        instance=INSTANCE_NAME,
        gcp_conn_id='postgres_default',
        task_id='sql_import_task',
        validate_body=True,
    )
    # Run the operator inside this task, passing the current context
    op.execute(context=kwargs)


p = PythonOperator(task_id='python_task', python_callable=my_func)
For Airflow >= 2.1.0: Airflow added the ability to render templated fields as native Python objects. You need to set render_template_as_native_obj=True in your DAG constructor. You can follow this documentation example.
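With native rendering enabled, the templated XCom string is converted back into a Python object before the operator sees it, so no manual json.loads is needed. A minimal sketch of the constructor change (the dag_id and dates are placeholders, assuming the question's setup):

```python
from datetime import datetime

from airflow import DAG

# render_template_as_native_obj=True makes the Jinja environment return
# native Python objects (e.g. the dict pushed to XCom) instead of their
# string representation. Available from Airflow 2.1.0.
with DAG(
    dag_id="gp2pg_import",
    start_date=datetime(2020, 1, 1),
    schedule_interval=None,
    render_template_as_native_obj=True,
) as dag:
    # body="{{ task_instance.xcom_pull(task_ids='define_import_body', key='import_body') }}"
    # now renders as a dict, so the import operator can validate it directly.
    ...
```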