“似乎丢失了Dag".Cloud Composer Airflow Dynamic DAG中的错误 [英] "Dag Seems to be missing" error in a Cloud Composer Airflow Dynamic DAG
本文介绍了“似乎丢失了Dag".Cloud Composer Airflow Dynamic DAG中的错误的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我在Google Cloud Composer中创建了一个动态Airflow DAG,并在网络服务器中列出了该文件,然后运行(回填)没有错误.但是,存在一些问题:
I have a dynamic Airflow DAG in Google Cloud Composer gets created, listed in the web-server and ran (backfill) without error. However, there are issues:
- 当点击网址中的DAG时,它说"DAG似乎是丢失"
- 看不到Graph视图/树视图显示上面的错误
- 无法像上面显示的错误一样手动触发DAG
尝试修复此问题几天...任何提示都会有所帮助.谢谢!
Trying to fix this for couple days...any hint will be helpful. Thank you!
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from google.cloud import storage
from airflow.models import Variable
import json
args = {
'owner': 'xxx',
'start_date':'2020-11-5',
'provide_context': True
}
dag = DAG(
dag_id='dynamic',
default_args=args
)
def return_bucket_files(bucket_name='xxxxx', **kwargs):
client = storage.Client()
bucket = client.get_bucket(bucket_name)
blobs = bucket.list_blobs()
file_list = [blob.name for blob in blobs]
return file_list
def dynamic_gcs_to_gbq_etl(file, **kwargs):
mapping = json.loads(Variable.get("xxxxx"))
database = mapping[0][file]
table = mapping[1][file]
task=GoogleCloudStorageToBigQueryOperator(
task_id= f'gcs_load_{file}_to_gbq',
bucket='xxxxxxx',
source_objects=[f'{file}'],
destination_project_dataset_table=f'xxx.{database}.{table}',
write_disposition="WRITE_TRUNCATE",
autodetect=True,
skip_leading_rows=1,
source_format='CSV',
dag=dag)
return task
start_task = DummyOperator(
task_id='start',
dag=dag
)
end_task = DummyOperator(
task_id='end',
dag=dag)
push_bucket_files = PythonOperator(
task_id="return_bucket_files",
provide_context=True,
python_callable=return_bucket_files,
dag=dag)
for file in return_bucket_files():
gcs_load_task = dynamic_gcs_to_gbq_etl(file)
start_task >> push_bucket_files >> gcs_load_task >> end_task
推荐答案
此问题意味着Web服务器无法从侧面填充DAG包-此问题很可能不是您的DAG特有的.
This issue means that the Web Server is failing to fill in the DAG bag on its side - this problem is most likely not with your DAG specifically.
My suggestion would be right now to try and restart the web server (via the installation of some dummy package).