"Invalid arguments passed" error for DAG that loads MySQL data to BigQuery using Airflow

This article describes how to handle the "Invalid arguments passed" error raised by a DAG that loads MySQL data into BigQuery using Airflow. The question and the recommended answer are reproduced below for anyone hitting the same problem.

Problem description


I am running a DAG that extracts MySQL data and loads it into BigQuery in Airflow. I am currently getting the following error:

/usr/local/lib/python2.7/dist-packages/airflow/models.py:1927: PendingDeprecationWarning: Invalid arguments were passed to MySqlToGoogleCloudStorageOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:

*args: ()

**kwargs: {'google_cloud_storage_connn_id': 'podioGCPConnection'} category=PendingDeprecationWarning

/usr/local/lib/python2.7/dist-packages/airflow/models.py:1927: PendingDeprecationWarning: Invalid arguments were passed to GoogleCloudStorageToBigQueryOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:

*args: ()

**kwargs: {'project_id': 'podio-data'} category=PendingDeprecationWarning

The code for the DAG is here:

my_connections = [
    'podiotestmySQL'
]

my_tables = [
    'logistics_orders',
    'logistics_waybills',
    'logistics_shipping_lines',
    'logistics_info_requests'
]

default_args = {
    'owner' : 'tia',
    'start_date' : datetime(2018, 1, 2),
    'depends_on_past' : False,
    'retries' : 1,
    'retry_delay':timedelta(minutes=5),
}

dag = DAG('etl', default_args=default_args,schedule_interval=timedelta(days=1))

slack_notify = SlackAPIPostOperator (
    task_id = 'slack_notfiy',
    token = 'xxxxxx',
    channel='data-status',
    username = 'airflow',
    text = 'Successfully performed podio ETL operation',
    dag=dag)

for connection in my_connections:
    for table in my_tables: 
        extract = MySqlToGoogleCloudStorageOperator(
           task_id="extract_mysql_%s_%s"%(connection,table),
           mysql_conn_id = connection,
           google_cloud_storage_connn_id = 'podioGCPConnection',
           sql = "SELECT *, '%s' as source FROM podiodb.%s"%(connection,table),
           bucket='podio-reader-storage',
           filename= '%s/%s/%s{}.json'%(connection,table,table),
           schema_filename='%s/schemas/%s.json'%(connection,table),
           dag=dag)

        load = GoogleCloudStorageToBigQueryOperator(
           task_id = "load_bg_%s_%s"%(connection,table),
           bigquery_conn_id = 'podioGCPConnection',
           google_cloud_storage_conn_id = 'podioGCPConnection',
           bucket = 'podio-reader-storage',
           destination_project_dataset_table = "Podio_Data1.%s/%s"%(connection,table),
           source_objects = ["%s/%s/%s*.json"%(connection,table,table)],
           schema_object = "%s/schemas/%s.json"%(connection,table),
           source_format = 'NEWLINE_DELIMITED_JSON',
           create_disposition = 'CREATE_IF_NEEDED',
           write_disposition = 'WRITE_TRUNCATE',
           project_id = 'podio-data',
           dag=dag)

        load.set_upstream(extract)
        slack_notify.set_upstream(load)

Solution

Reading the source here: https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/gcs_to_bq.py

Please remove these parameters from the operator calls:

google_cloud_storage_connn_id = 'podioGCPConnection'
project_id = 'podio-data',

The first is a typo (three n's): MySqlToGoogleCloudStorageOperator expects google_cloud_storage_conn_id, so the misspelled keyword is rejected. GoogleCloudStorageToBigQueryOperator does not accept a project_id argument at all; the project is resolved from the connection (or from a fully qualified destination_project_dataset_table).

You need to create a connection in the Airflow dashboard.
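For reference, here is a minimal sketch of the corrected loop body, assuming the podioGCPConnection connection has been created in the Airflow UI (Admin -> Connections) with its default project set to podio-data, and using the Airflow 1.x contrib import paths. Connection, bucket and table names are taken from the question; the Slack notification task is omitted for brevity, and only the invalid arguments are changed.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

default_args = {
    'owner': 'tia',
    'start_date': datetime(2018, 1, 2),
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('etl', default_args=default_args, schedule_interval=timedelta(days=1))

for connection in ['podiotestmySQL']:
    for table in ['logistics_orders', 'logistics_waybills',
                  'logistics_shipping_lines', 'logistics_info_requests']:
        extract = MySqlToGoogleCloudStorageOperator(
            task_id='extract_mysql_%s_%s' % (connection, table),
            mysql_conn_id=connection,
            # fixed spelling: conn_id, not connn_id
            google_cloud_storage_conn_id='podioGCPConnection',
            sql="SELECT *, '%s' as source FROM podiodb.%s" % (connection, table),
            bucket='podio-reader-storage',
            filename='%s/%s/%s{}.json' % (connection, table, table),
            schema_filename='%s/schemas/%s.json' % (connection, table),
            dag=dag)

        load = GoogleCloudStorageToBigQueryOperator(
            task_id='load_bg_%s_%s' % (connection, table),
            bigquery_conn_id='podioGCPConnection',
            google_cloud_storage_conn_id='podioGCPConnection',
            bucket='podio-reader-storage',
            # project_id removed: the project comes from the connection.
            # Note: BigQuery table ids cannot contain "/", so the question's
            # "%s/%s" pattern is replaced with "%s_%s" here.
            destination_project_dataset_table='Podio_Data1.%s_%s' % (connection, table),
            source_objects=['%s/%s/%s*.json' % (connection, table, table)],
            schema_object='%s/schemas/%s.json' % (connection, table),
            source_format='NEWLINE_DELIMITED_JSON',
            create_disposition='CREATE_IF_NEEDED',
            write_disposition='WRITE_TRUNCATE',
            dag=dag)

        load.set_upstream(extract)

With the unsupported keyword arguments removed, the PendingDeprecationWarning messages from models.py are no longer emitted, and the Google Cloud project is picked up from the connection configured in the Airflow dashboard.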

This concludes the article on the "Invalid arguments passed" error for a DAG that loads MySQL data into BigQuery using Airflow. Hopefully the answer above is helpful.
