"Dag Seems to be missing" error in a Cloud Composer Airflow Dynamic DAG


Problem description

I have a dynamic Airflow DAG in Google Cloud Composer that gets created, is listed in the web server, and runs (backfill) without error. However, there are issues:

  1. When clicking on the DAG in the web URL, it says "DAG seems to be missing"
  2. The Graph View/Tree View cannot be displayed; it shows the error above
  3. The DAG cannot be triggered manually; it shows the same error as above

Trying to fix this for a couple of days... any hint will be helpful. Thank you!

from datetime import datetime
import json

from airflow import DAG
from airflow.models import Variable
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from google.cloud import storage


args = {
    'owner': 'xxx',
    # Airflow expects a datetime here, not the string '2020-11-5'
    'start_date': datetime(2020, 11, 5),
    'provide_context': True,
}


dag = DAG(
    dag_id='dynamic',
    default_args=args,
)


def return_bucket_files(bucket_name='xxxxx', **kwargs):
    # List every object name in the bucket; note this needs GCS access
    # from whichever process runs it.
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
    blobs = bucket.list_blobs()

    file_list = [blob.name for blob in blobs]

    return file_list


def dynamic_gcs_to_gbq_etl(file, **kwargs):
    # The Airflow Variable holds a JSON list of two dicts:
    # mapping[0] maps file name -> dataset, mapping[1] maps file name -> table.
    mapping = json.loads(Variable.get("xxxxx"))
    database = mapping[0][file]
    table = mapping[1][file]

    task = GoogleCloudStorageToBigQueryOperator(
        task_id=f'gcs_load_{file}_to_gbq',
        bucket='xxxxxxx',
        source_objects=[f'{file}'],
        destination_project_dataset_table=f'xxx.{database}.{table}',
        write_disposition="WRITE_TRUNCATE",
        autodetect=True,
        skip_leading_rows=1,
        source_format='CSV',
        dag=dag)

    return task


start_task = DummyOperator(
    task_id='start',
    dag=dag
)


end_task = DummyOperator(
    task_id='end',
    dag=dag)


push_bucket_files = PythonOperator(
    task_id="return_bucket_files",
    provide_context=True,
    python_callable=return_bucket_files,
    dag=dag)


# Note: return_bucket_files() runs at parse time, so every scheduler and
# web server parse of this file makes a live call to GCS.
for file in return_bucket_files():
    gcs_load_task = dynamic_gcs_to_gbq_etl(file)
    start_task >> push_bucket_files >> gcs_load_task >> end_task

Recommended answer

This issue means that the web server is failing to fill in the DAG bag on its side - the problem is most likely not with your DAG specifically.
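
A quick way to confirm a parse-time failure is to run the DAG file directly with Python, which surfaces any exception raised at import time (a standard Airflow debugging step; the path below assumes the file is saved as dynamic.py under Composer's gcsfuse-mounted DAGs folder):

python /home/airflow/gcs/dags/dynamic.py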

My suggestion right now would be to try restarting the web server (for example, via the installation of some dummy package).
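
A minimal sketch of both routes, assuming the gcloud CLI of that era (ENVIRONMENT_NAME, LOCATION, and the six==1.15.0 pin are placeholders to replace; verify the flag names against the current Composer docs):

# Dedicated restart command (in beta at the time)
gcloud beta composer environments restart-web-server ENVIRONMENT_NAME \
    --location LOCATION

# Or force a restart indirectly by installing a harmless PyPI package
gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --update-pypi-package six==1.15.0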
