Fetch results from BigQueryOperator in Airflow
Question
I am trying to fetch results from a BigQueryOperator in Airflow, but I could not find a way to do it. I tried calling the next() method on the bq_cursor member (available in 1.10), but it returns None. This is how I tried to do it:
import datetime
import logging

from airflow import models
from airflow.contrib.operators import bigquery_operator
from airflow.operators import python_operator

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time()
)

def MyChequer(**kwargs):
    big_query_count = bigquery_operator.BigQueryOperator(
        task_id='my_bq_query',
        sql='select count(*) from mydataset.mytable'
    )
    big_query_count.execute(context=kwargs)

    logging.info(big_query_count)
    logging.info(big_query_count.__dict__)
    logging.info(big_query_count.bq_cursor.next())
default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'project_id': 'myproject'
}

with models.DAG(
        'bigquery_results_execution',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    myoperator = python_operator.PythonOperator(
        task_id='threshold_operator',
        provide_context=True,
        python_callable=MyChequer
    )

    # Define DAG
    myoperator
Taking a look at bigquery_hook.py and bigquery_operator.py, this seems to be the only available way to fetch the results.
Answer
Thanks to @kaxil and @Mike for their answers. I found the problem. There is a kind of bug (in my view) in the BigQueryCursor: as part of run_with_configuration, the running_job_id is returned but never assigned to job_id, which is what the next method uses to pull the results. A workaround (not really elegant, but good if you do not want to re-implement everything) is to assign job_id from running_job_id in the hook, like this:
big_query_count.execute(context=kwargs)
# Workaround: copy the running job id so that next() can fetch results
big_query_count.bq_cursor.job_id = big_query_count.bq_cursor.running_job_id
logging.info(big_query_count.bq_cursor.next())
Once the problem is fixed so that run_with_configuration assigns the correct job_id at the end of the process, the line after the workaround comment can be removed.
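To make the failure mode concrete, here is a minimal, self-contained mock (not the real Airflow classes, and the job id used is made up) that only reproduces the job_id bookkeeping described above: next() reads self.job_id, but run_with_configuration only ever sets self.running_job_id, so results are unreachable until the workaround copies the id across.

```python
# Minimal mock of the described behavior; this is NOT the real Airflow
# BigQueryCursor, just an illustration of the job_id bookkeeping bug.
class MockBigQueryCursor:
    def __init__(self, results):
        self._results = {'job_123': list(results)}
        self.job_id = None          # next() reads this attribute
        self.running_job_id = None  # run_with_configuration sets this one

    def run_with_configuration(self, configuration):
        # The real method keeps the id of the started job here...
        self.running_job_id = 'job_123'
        # ...but (per the answer, in Airflow 1.10) never copies it to job_id.
        return self.running_job_id

    def next(self):
        # Without a job_id there is nothing to fetch from, so we get None.
        if self.job_id is None:
            return None
        return self._results[self.job_id].pop(0)


cursor = MockBigQueryCursor(results=[(42,)])
cursor.run_with_configuration({'query': {'query': 'select count(*) ...'}})
print(cursor.next())  # None: job_id was never assigned

# The workaround from the answer: copy running_job_id into job_id.
cursor.job_id = cursor.running_job_id
print(cursor.next())  # (42,)
```

The same one-line copy is what the workaround applies to the real bq_cursor after execute() returns.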