Setting up Airflow with the BigQuery operator


Question


I am experimenting with Airflow for data pipelines. Unfortunately, I cannot get it to work with the BigQuery operator so far. I have searched for a solution to the best of my ability, but I am still stuck. I am running locally with the sequential executor.

Here is my code:

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['example@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG(dag_id='bigQueryPipeline', default_args=default_args, schedule_interval=timedelta(1))

t1 = BigQueryOperator(
    task_id='bigquery_test',
    bql='SELECT COUNT(userId) FROM [events:EVENTS_20160501]',
    destination_dataset_table=False,
    bigquery_conn_id='bigquery_default',
    delegate_to=False,
    udf_config=False,
    dag=dag,
)

The error message:

[2016-08-27 00:13:14,665] {models.py:1327} ERROR - 'project'
Traceback (most recent call last):
  File "/Users/jean.rodrigue/anaconda/bin/airflow", line 15, in <module>
    args.func(args)
  File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/bin/cli.py", line 352, in test
    ti.run(force=True, ignore_dependencies=True, test_mode=True)
  File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/models.py", line 1245, in run
    result = task_copy.execute(context=context)
  File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/contrib/operators/bigquery_operator.py", line 57, in execute
    conn = hook.get_conn()
  File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 54, in get_conn
    project = connection_extras['project']

Solution

It took me a while to finally find this, as it's not documented very clearly. In the Airflow UI, go to Admin -> Connections. The connection id listed there is what the bigquery_conn_id parameter references. In that connection's "extras" field, you must add a JSON object that defines a "project" key whose value is your BigQuery project id.
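
For example, the extras field might hold something like this (the project id below is a placeholder for illustration, not a real value):

    {"project": "my-gcp-project-id"}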

You must also add keys for "service_account" and "key_path" if you have not explicitly authorized an account on the box where you're running Airflow (e.g. via gcloud auth).
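
Putting both together, the extras JSON would then look something like the following sketch (all three values are placeholders, not real credentials):

    {
        "project": "my-gcp-project-id",
        "service_account": "my-service-account@my-gcp-project-id.iam.gserviceaccount.com",
        "key_path": "/path/to/keyfile.json"
    }

Once the connection is configured, you can re-run the task standalone with the same airflow test command that produced the traceback above, for example:

    airflow test bigQueryPipeline bigquery_test 2016-08-27

If the KeyError on 'project' is gone, the hook is picking up the connection extras.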
