Fetch results from BigQueryOperator in airflow


Problem description

I am trying to fetch results from BigQueryOperator using airflow, but I could not find a way to do it. I tried calling the next() method on the bq_cursor member (available in 1.10), however it returns None. This is how I tried to do it:

import datetime
import logging

from airflow import models
from airflow.contrib.operators import bigquery_operator
from airflow.operators import python_operator


yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time()
)

def MyChequer(**kwargs):
    big_query_count = bigquery_operator.BigQueryOperator(
        task_id='my_bq_query',
        sql='select count(*) from mydataset.mytable'
    )

    big_query_count.execute(context=kwargs)

    logging.info(big_query_count)
    logging.info(big_query_count.__dict__)
    logging.info(big_query_count.bq_cursor.next())

default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'project_id': 'myproject'
}

with models.DAG(
        'bigquery_results_execution',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    myoperator = python_operator.PythonOperator(
        task_id='threshold_operator',
        provide_context=True,
        python_callable=MyChequer
    )

    # Define DAG
    myoperator

Taking a look at bigquery_hook.py and bigquery_operator.py, this seems to be the only available way to fetch the results.

Recommended answer

Thanks to @kaxil and @Mike for their answers. I found the problem: there is what looks like a bug in BigQueryCursor. As part of run_with_configuration, the job id is returned as running_job_id but never assigned to job_id, which the next method uses to pull the results. A workaround (not elegant, but fine if you do not want to re-implement everything) is to assign job_id from running_job_id on the hook's cursor, like this:

big_query_count.execute(context=kwargs)
#workaround
big_query_count.bq_cursor.job_id = big_query_count.bq_cursor.running_job_id
logging.info(big_query_count.bq_cursor.next())
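If the workaround is needed in several tasks, it can be wrapped in a small helper. A minimal sketch (the function name is my own; any object exposing running_job_id works):

```python
def patch_bq_cursor(cursor):
    """Work around the Airflow 1.10 BigQueryCursor issue: run_with_configuration
    stores the job id in running_job_id but never copies it to job_id, which
    next() reads when pulling results. Copying it over lets next() fetch rows."""
    cursor.job_id = cursor.running_job_id
    return cursor
```

Call patch_bq_cursor(big_query_count.bq_cursor) after execute() and before reading rows with next().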

Once run_with_configuration is fixed to assign the correct job_id at the end of the process, the line after the workaround comment can be removed.
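An alternative that avoids instantiating the operator (and the patch) entirely is to query BigQuery from the PythonOperator callable through BigQueryHook (in airflow.contrib.hooks in 1.10) and read rows from its DB-API style cursor. A sketch under that assumption — the hook import is left commented out, and the count logic is factored into a plain function so it can be exercised without a live Airflow/GCP setup:

```python
import logging

# from airflow.contrib.hooks.bigquery_hook import BigQueryHook

def fetch_count(cursor, sql):
    """Execute sql on a DB-API style cursor and return the first column
    of the first row (the count), or None if there are no rows."""
    cursor.execute(sql)
    row = cursor.fetchone()
    return row[0] if row else None

def my_chequer(**kwargs):
    # In a real DAG this would be:
    #   hook = BigQueryHook(bigquery_conn_id='bigquery_default',
    #                       use_legacy_sql=True)
    #   cursor = hook.get_conn().cursor()
    cursor = kwargs['cursor']  # injected here so the sketch stays self-contained
    count = fetch_count(cursor, 'select count(*) from mydataset.mytable')
    logging.info('row count: %s', count)
    return count
```

The callable can then act on the count (e.g. compare it against a threshold) without ever touching bq_cursor internals.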
