Setting up Airflow with BigQuery operator
Question
I am experimenting with Airflow for data pipelines. Unfortunately, I cannot get it to work with the BigQuery operator so far. I have searched for a solution to the best of my ability, but I am still stuck. I am using the SequentialExecutor, running locally.
Here is my code:
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['example@gmail.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG(dag_id='bigQueryPipeline', default_args=default_args, schedule_interval=timedelta(1))
t1 = BigQueryOperator(
task_id='bigquery_test',
bql='SELECT COUNT(userId) FROM [events:EVENTS_20160501]',
destination_dataset_table=False,
bigquery_conn_id='bigquery_default',
delegate_to=False,
udf_config=False,
dag=dag,
)
The error message:
[2016-08-27 00:13:14,665] {models.py:1327} ERROR - 'project'
Traceback (most recent call last):
File "/Users/jean.rodrigue/anaconda/bin/airflow", line 15, in <module>
args.func(args)
File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/bin/cli.py", line 352, in test
ti.run(force=True, ignore_dependencies=True, test_mode=True)
File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
result = func(*args, **kwargs)
File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/models.py", line 1245, in run
result = task_copy.execute(context=context)
File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/contrib/operators/bigquery_operator.py", line 57, in execute
conn = hook.get_conn()
File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 54, in get_conn
project = connection_extras['project']
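The last frame of the traceback shows the failure is a plain dictionary lookup on the connection's extras. A simplified sketch of that behavior (not the actual hook source, just an illustration of why a missing key surfaces as `ERROR - 'project'`):

```python
import json

# Simplified stand-in for how the BigQuery hook reads connection extras:
# the connection's "Extra" field is parsed as JSON, and the 'project' key
# is read with a plain subscript, so a missing key raises KeyError('project').
def get_project(extra_json):
    connection_extras = json.loads(extra_json or "{}")
    return connection_extras['project']

try:
    get_project("{}")           # no "project" key set in the connection extras
except KeyError as e:
    print("KeyError:", e)       # prints: KeyError: 'project'
```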
It took me a while to finally find this, as it's not documented very clearly. In the Airflow UI, go to Admin -> Connections. The connection id there is what is referenced by the bigquery_conn_id parameter. In the "extras" field of that connection you must add a JSON object that defines the key "project" (set to your GCP project id) as a k,v pair.
You must also add keys for "service_account" and "key_path" if you have not explicitly authorized an account on the machine running Airflow (e.g. via gcloud auth).
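Putting the pieces together, the "extras" field of the bigquery_default connection would contain JSON along these lines (all values below are placeholders you must replace with your own; "service_account" and "key_path" are only needed if the machine is not already authorized via gcloud auth):

```json
{
  "project": "your-gcp-project-id",
  "service_account": "your-service-account@your-gcp-project-id.iam.gserviceaccount.com",
  "key_path": "/path/to/your/keyfile.json"
}
```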