How to convert XComArg to string values in Airflow 2.x?
Problem description
Code:
from airflow.models import BaseOperator
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.utils.decorators import apply_defaults


class GCSUploadOperator(BaseOperator):
    @apply_defaults
    def __init__(
        self,
        bucket_name,
        target_file_name,
        data_as_str,
        gcp_conn_id="google_cloud_default",
        *args,
        **kwargs,
    ):
        super(GCSUploadOperator, self).__init__(*args, **kwargs)
        self.bucket_name = bucket_name
        self.data_as_str = data_as_str
        self.gcp_conn_id = gcp_conn_id
        self.target_file_name = target_file_name

    def execute(self, context):
        hook = GCSHook(self.gcp_conn_id)
        hook.upload(
            bucket_name=self.bucket_name,
            object_name=context["execution_date"].strftime(
                f"year=2022/month=%m/day=%d/{self.target_file_name}"
            ),
            data=self.data_as_str,
        )
numbers = PythonOperator(task_id="numbers", python_callable=lambda: "abcde")

gcs = GCSUploadOperator(
    task_id="upload_content_to_GCS",
    bucket_name=BUCKET_NAME,
    target_file_name=f"{STORE_KEY_CONTENT}.json",
    data_as_str=?????????,  # I need to pass the string result of the previous task
)
My attempts at data_as_str:
gcs = GCSUploadOperator(
    task_id="upload_content_to_GCS",
    bucket_name=BUCKET_NAME,
    target_file_name=f"{STORE_KEY_CONTENT}.json",
    data_as_str=numbers
)
--> TypeError: <Task(PythonOperator): numbers> could not be converted to bytes
gcs = GCSUploadOperator(
    task_id="upload_content_to_GCS",
    bucket_name=BUCKET_NAME,
    target_file_name=f"{STORE_KEY_CONTENT}.json",
    data_as_str=numbers.output
)
--> TypeError: <airflow.models.xcom_arg.XComArg object at 0x7f6e8ed76760> could not be converted to bytes
Any ideas?
Recommended answer
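To make it work, you have to declare the field you are expecting in your operator as a templated field, i.e. list it in the operator's template_fields. For the GCSUploadOperator above, that means adding template_fields = ("data_as_str",) to the class and passing data_as_str=numbers.output. I made this working example: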
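from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class CustomDummyOperator(BaseOperator):
    # Attributes listed here are rendered (templated) just before execute(),
    # which is when an XComArg passed in is replaced by the upstream value.
    template_fields = ('msg_from_previous_task',)

    @apply_defaults
    def __init__(self,
                 msg_from_previous_task,
                 *args, **kwargs) -> None:
        super(CustomDummyOperator, self).__init__(*args, **kwargs)
        self.msg_from_previous_task = msg_from_previous_task

    def execute(self, context):
        print(f"Message: {self.msg_from_previous_task}")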
DAG:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

dag = DAG(
    'xcom_arg_custom_op',
    schedule_interval="@once",
    start_date=days_ago(2),
    default_args={'owner': 'airflow'},
    tags=['example'],
    catchup=False
)

def return_a_str():
    return "string_value_from_op1"

task_1 = PythonOperator(
    task_id='task_1',
    dag=dag,
    python_callable=return_a_str,
)

task_2 = CustomDummyOperator(
    task_id='task_2',
    dag=dag,
    # task_1.output is an XComArg that resolves to task_1's return value at run time
    msg_from_previous_task=task_1.output
)

task_1 >> task_2
Output log:
[2021-05-25 13:51:50,848] {taskinstance.py:1255} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=xcom_arg_custom_op
AIRFLOW_CTX_TASK_ID=task_2
AIRFLOW_CTX_EXECUTION_DATE=2021-05-23T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-05-23T00:00:00+00:00
Message: string_value_from_op1
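To see why declaring template_fields matters, here is a toy model in plain Python. It is a simplified sketch, not Airflow's actual implementation: XComPlaceholder, MiniOperator, UploadWithoutTemplating, UploadWithTemplating and render_template_fields are hypothetical names, and the XCom store is just a dict. The idea it illustrates is that at DAG-parse time an XComArg is only a reference to another task's output, and only attributes listed in template_fields are replaced by the real value before execute() runs. Any other attribute keeps the placeholder object, which matches the "could not be converted to bytes" errors in the question.

```python
# Toy model (NOT Airflow's real implementation) of how templated fields are
# resolved before execute(). All class and function names are hypothetical.
from dataclasses import dataclass


@dataclass(frozen=True)
class XComPlaceholder:
    """Stands in for an XComArg: it only knows which task's output to fetch."""
    task_id: str
    key: str = "return_value"

    def resolve(self, xcom_store):
        # At run time the real value is looked up in the (fake) XCom store.
        return xcom_store[(self.task_id, self.key)]


class MiniOperator:
    # Only attributes listed here are rendered before execute().
    template_fields = ()

    def render_template_fields(self, xcom_store):
        for name in self.template_fields:
            value = getattr(self, name)
            if isinstance(value, XComPlaceholder):
                setattr(self, name, value.resolve(xcom_store))


class UploadWithoutTemplating(MiniOperator):
    def __init__(self, data_as_str):
        self.data_as_str = data_as_str


class UploadWithTemplating(MiniOperator):
    template_fields = ("data_as_str",)

    def __init__(self, data_as_str):
        self.data_as_str = data_as_str


# Pretend the "numbers" task already ran and pushed its return value.
xcom_store = {("numbers", "return_value"): "abcde"}
numbers_output = XComPlaceholder("numbers")

plain = UploadWithoutTemplating(data_as_str=numbers_output)
templated = UploadWithTemplating(data_as_str=numbers_output)
for op in (plain, templated):
    op.render_template_fields(xcom_store)

print(type(plain.data_as_str).__name__)  # XComPlaceholder: never resolved
print(templated.data_as_str)             # abcde
```

The non-templated operator still holds the placeholder object when execute() would run, while the templated one holds the plain string "abcde".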
Under the hood, we are using the str() method of XComArg, which provides backward compatibility for regular (non-TaskFlow) operators.
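The str() path can be sketched the same way. As far as I know, in Airflow 2.x XComArg.__str__ returns a Jinja expression roughly of the form {{ task_instance.xcom_pull(task_ids='...', dag_id='...', key='...') }}, so an XComArg embedded in an f-string inside a templated field still gets rendered to the upstream value. In the sketch below, the exact string format is an assumption, XComPlaceholder is a hypothetical class, and the regex-based render() is a hypothetical stand-in for Jinja, not Airflow code.

```python
# Toy model of the XComArg.__str__ backward-compatibility path. The template
# string format is modeled on Airflow 2.x (an assumption); render() is a
# hypothetical regex stand-in for Jinja, not Airflow code.
import re


class XComPlaceholder:
    def __init__(self, task_id, dag_id, key="return_value"):
        self.task_id = task_id
        self.dag_id = dag_id
        self.key = key

    def __str__(self):
        # Turn the placeholder into a Jinja expression that pulls the XCom.
        return (
            "{{ task_instance.xcom_pull("
            f"task_ids='{self.task_id}', dag_id='{self.dag_id}', key='{self.key}'"
            ") }}"
        )


XCOM_PULL = re.compile(
    r"\{\{ task_instance\.xcom_pull\("
    r"task_ids='([^']*)', dag_id='([^']*)', key='([^']*)'\) \}\}"
)


def render(template, xcom_store):
    """Replace each xcom_pull expression with the stored value (Jinja stand-in)."""
    return XCOM_PULL.sub(
        lambda m: str(xcom_store[(m.group(1), m.group(3))]), template
    )


xcom_store = {("task_1", "return_value"): "string_value_from_op1"}
output = XComPlaceholder("task_1", "xcom_arg_custom_op")

# Embedding the placeholder in an f-string calls __str__ ...
templated_value = f"Message: {output}"
print(templated_value)
# ... and rendering the templated field later substitutes the real value.
print(render(templated_value, xcom_store))  # Message: string_value_from_op1
```

The f-string is evaluated at DAG-parse time and only produces template text; the actual value is filled in later, when the templated field is rendered.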
Let me know if that worked for you!