Problems in making database requests in Airflow


Problem description



I am trying to create tasks dynamically based on the response of a database call. But when I do this, the run option simply doesn't appear in Airflow, so I can't run the DAG.

Here is the code:

# Imports as they would look on Airflow 1.10 (contrib paths); verify_loaded,
# project, dataset, table_prefix and log are defined elsewhere in the DAG file.
from airflow.operators import python_operator
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.hooks.bigquery_hook import BigQueryHook

tables = ['a', 'b', 'c']   # This works
# tables = get_tables()    # This never works

check_x = python_operator.PythonOperator(
    task_id="verify_loaded",
    python_callable=lambda: verify_loaded(tables)
)

bridge = DummyOperator(
    task_id='bridge'
)

check_x >> bridge

for vname in tables:
    sql = "SELECT * FROM `asd.temp.{table}` LIMIT 5".format(table=vname)

    log.info(vname)
    materialize__bq = BigQueryOperator(
        sql=sql,
        destination_dataset_table="asd.temp." + table_prefix + vname,
        task_id="materialize_" + vname,
        bigquery_conn_id="bigquery_default",
        google_cloud_storage_conn_id="google_cloud_default",
        use_legacy_sql=False,
        write_disposition="WRITE_TRUNCATE",
        create_disposition="CREATE_IF_NEEDED",
        query_params={},
        allow_large_results=True
    )

    bridge >> materialize__bq


def get_tables():
    bq_hook = BigQueryHook(bigquery_conn_id='bigquery_default', delegate_to=None, use_legacy_sql=False)
    my_query = "SELECT table_id FROM `{project}.{dataset}.{table}` LIMIT 3;".format(
        project=project, dataset=dataset, table='__TABLES__')

    df = bq_hook.get_pandas_df(sql=my_query, dialect='standard')
    return df['table_id'].tolist()  # the table names fetched from BigQuery

I am trying to make the commented-out part work, but with no luck. The get_tables() function fetches table names from BigQuery, and I wanted the tasks to be created dynamically this way. When I do this, I don't get the option to run, and it looks like the DAG is broken. Any help? I have been trying for a long time.

Here is a screenshot:

Solution

To understand the problem, we have to look at the Cloud Composer architecture:

https://cloud.google.com/composer/docs/concepts/overview

The scheduler runs in GKE using the service account that was configured when you created the Composer instance.

The web UI runs in a tenant project in App Engine, using a different service account. The resources of this tenant project are hidden: you don't see the App Engine application, the Cloud SQL instance, or the service account among your project resources.

When the web UI parses the DAG file, it tries to access BigQuery using the 'bigquery_default' connection. See the Airflow GCP hook's _get_credentials source code:

https://github.com/apache/airflow/blob/1.10.2/airflow/contrib/hooks/gcp_api_base_hook.py#L74

If you have not configured the connection in the Airflow admin, the hook falls back to the google.auth.default method and connects to BigQuery with the tenant project's service account. That service account does not have permission to access BigQuery, so the call gets an unauthorized error and the UI cannot render the DAG. If you check Stackdriver, you will probably find the BigQuery error.
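The fallback in that hook code is essentially Application Default Credentials; roughly (a simplified sketch, not the literal source):

import google.auth

# With no key path / key JSON configured on the connection, the hook falls back
# to Application Default Credentials, i.e. the service account of whatever
# environment is parsing the DAG; in the web UI's case, the tenant project.
credentials, project_id = google.auth.default(
    scopes=['https://www.googleapis.com/auth/cloud-platform'])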

On the other hand, the Airflow scheduler uses the service account that was provided when the Composer environment was created. That account has the right permissions, so the scheduler parses the DAG correctly.

If you execute the code in a local Airflow instance, the web UI and the scheduler use the same service account, so both cases work as expected.

The easiest solution is to add a Keyfile Path or Keyfile JSON to the bigquery_default connection, so the web UI does not fall back to the default service account (see the example below).
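For example, one way to do that programmatically (a sketch only; the key file path, project id and scope are placeholders, and you can just as well fill in the 'Keyfile Path' / 'Keyfile JSON' fields under Admin > Connections in the UI):

import json

from airflow import settings
from airflow.models import Connection

session = settings.Session()
conn = session.query(Connection).filter(Connection.conn_id == 'bigquery_default').one()
conn.extra = json.dumps({
    # 'Keyfile Path' field of the google_cloud_platform connection type
    'extra__google_cloud_platform__key_path': '/path/to/service-account-key.json',
    'extra__google_cloud_platform__project': 'my-project',
    'extra__google_cloud_platform__scope': 'https://www.googleapis.com/auth/cloud-platform',
})
session.commit()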

If you have any security concerns with this solution (the service account credentials become available to anyone with access to Composer), another option is to restructure the code so that everything runs inside a single PythonOperator. That PythonOperator would call get_tables and then loop over the results, executing the BigQuery commands with a BigQueryHook instead of a BigQueryOperator, as in the sketch below. The drawback of this solution is that you end up with a single task instead of one task per table.
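A minimal sketch of that approach, assuming Airflow 1.10's contrib BigQueryHook and reusing the question's names (dag, project, dataset and table_prefix come from the poster's DAG file):

from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.operators.python_operator import PythonOperator


def materialize_all():
    hook = BigQueryHook(bigquery_conn_id='bigquery_default', use_legacy_sql=False)

    # Fetch the table names at run time, inside the task, so the web UI
    # never needs BigQuery access just to parse the DAG file.
    df = hook.get_pandas_df(
        sql="SELECT table_id FROM `{project}.{dataset}.__TABLES__` LIMIT 3".format(
            project=project, dataset=dataset),
        dialect='standard')

    cursor = hook.get_conn().cursor()
    for vname in df['table_id']:
        # The same query the BigQueryOperator would have run, issued through the hook.
        cursor.run_query(
            sql="SELECT * FROM `asd.temp.{table}` LIMIT 5".format(table=vname),
            destination_dataset_table="asd.temp." + table_prefix + vname,
            write_disposition="WRITE_TRUNCATE",
            create_disposition="CREATE_IF_NEEDED",
            use_legacy_sql=False,
            allow_large_results=True)


materialize_all_tables = PythonOperator(
    task_id='materialize_all_tables',
    python_callable=materialize_all,
    dag=dag)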
