Fetch results from BigQueryOperator in Airflow


Question

I am trying to fetch results from a BigQueryOperator in Airflow, but I could not find a way to do it. I tried calling the next() method on the bq_cursor member (available in 1.10), but it returns None. This is how I tried to do it:

import datetime
import logging

from airflow import models
from airflow.contrib.operators import bigquery_operator
from airflow.operators import python_operator


yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time()
)

def MyChequer(**kwargs):
    big_query_count = bigquery_operator.BigQueryOperator(
        task_id='my_bq_query',
        sql='select count(*) from mydataset.mytable'
    )

    big_query_count.execute(context=kwargs)

    logging.info(big_query_count)
    logging.info(big_query_count.__dict__)
    logging.info(big_query_count.bq_cursor.next())

default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'project_id': 'myproject'
}

with models.DAG(
        'bigquery_results_execution',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    myoperator = python_operator.PythonOperator(
        task_id='threshold_operator',
        provide_context=True,
        python_callable=MyChequer
    )

    # Define DAG
    myoperator

Looking at bigquery_hook.py and bigquery_operator.py, this seems to be the only available way to fetch the results.
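As an alternative to instantiating an operator inside a PythonOperator callable, the query can be issued directly through BigQueryHook from the callable. A minimal sketch, assuming Airflow 1.10's contrib API and a GCP connection named `bigquery_default` (both assumptions; the threshold logic is kept as a separate pure function so it can be tested without BigQuery):

```python
def fetch_count(sql='SELECT COUNT(*) FROM mydataset.mytable'):
    # Import inside the callable so the module parses even where the
    # contrib package is unavailable.
    from airflow.contrib.hooks.bigquery_hook import BigQueryHook

    hook = BigQueryHook(bigquery_conn_id='bigquery_default',
                        use_legacy_sql=False)
    conn = hook.get_conn()
    cursor = conn.cursor()
    cursor.execute(sql)
    row = cursor.fetchone()
    return row[0]

def exceeds_threshold(count, threshold):
    # Pure comparison: the decision logic, separated from the BigQuery call.
    return count > threshold
```

In the DAG, `fetch_count` would be wrapped in a PythonOperator; keeping the comparison in `exceeds_threshold` also makes it easy to branch on the result later.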

Answer

Thanks to @kaxil and @Mike for their answers. I found the problem: there is a bug (in my view) in BigQueryCursor. As part of run_with_configuration, the running_job_id is returned but never assigned to job_id, which is what the next method uses to pull the results. A workaround (not very elegant, but fine if you do not want to re-implement everything) is to assign job_id from running_job_id in the hook, like this:

big_query_count.execute(context=kwargs)
#workaround
big_query_count.bq_cursor.job_id = big_query_count.bq_cursor.running_job_id
logging.info(big_query_count.bq_cursor.next())
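Putting the workaround in context, the callable from the question could look like the sketch below (the Airflow import is moved inside the function purely so the module parses without Airflow installed; returning the row is an added suggestion, since PythonOperator pushes the return value to XCom for downstream tasks):

```python
import logging

def my_chequer(**kwargs):
    # Imported inside the callable so this module can be loaded anywhere.
    from airflow.contrib.operators import bigquery_operator

    big_query_count = bigquery_operator.BigQueryOperator(
        task_id='my_bq_query',
        sql='select count(*) from mydataset.mytable'
    )
    big_query_count.execute(context=kwargs)

    # Workaround: run_with_configuration stores the id only in
    # running_job_id, so copy it to job_id before calling next().
    cursor = big_query_count.bq_cursor
    cursor.job_id = cursor.running_job_id

    row = cursor.next()
    logging.info(row)
    return row  # pushed to XCom by PythonOperator
```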

