Python - How to - BigQuery asynchronous tasks


Problem description

This may be a dummy question but I cannot seem to be able to run python google-cloud-bigquery asynchronously.

My goal is to run multiple queries concurrently and wait for all to finish in an asyncio.wait() query gatherer. I'm using asyncio.create_task() to launch the queries. The problem is that each query waits for the preceding one to complete before starting.

Here is my query function (quite simple):

async def exec_query(self, query, **kwargs) -> bigquery.table.RowIterator:
  job = self.api.query(query, **kwargs)
  return job.result()

Since I cannot await job.result(), should I await something else?

Recommended answer

If you are working inside of a coroutine and want to run different queries without blocking the event_loop then you can use the run_in_executor function which basically runs your queries in background threads without blocking the loop. Here's a good example of how to use that.
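A minimal sketch of that run_in_executor pattern. Here a hypothetical blocking_result() function stands in for a real job.result() call, since the executor mechanics are the same and no BigQuery client is needed to see them:

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_result(query):
    # Stand-in for job.result(): blocks the calling thread until the
    # (simulated) job finishes.
    time.sleep(0.1)
    return f"rows for {query}"

async def exec_query(query, executor):
    loop = asyncio.get_running_loop()
    # run_in_executor hands the blocking call to a worker thread,
    # so the event loop stays free to start the other queries.
    return await loop.run_in_executor(executor, blocking_result, query)

async def main():
    executor = ThreadPoolExecutor(max_workers=5)
    tasks = [exec_query(q, executor) for q in ("SELECT 1", "SELECT 2")]
    # gather awaits all tasks concurrently and preserves their order.
    return await asyncio.gather(*tasks)

results = asyncio.run(main())
print(results)
```

Because both blocking calls run on separate threads, the two queries overlap instead of running back to back, which is exactly what was missing from the original exec_query coroutine.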

Make sure though that that's exactly what you need; jobs created to run queries in the Python API are already asynchronous and they only block when you call job.result(). This means that you don't need to use asyncio unless you are inside of a coroutine.

Here's a quick possible example of retrieving results as soon as the jobs are finished:

from concurrent.futures import ThreadPoolExecutor, as_completed
import google.cloud.bigquery as bq


client = bq.Client.from_service_account_json('path/to/key.json')
query1 = 'SELECT 1'
query2 = 'SELECT 2'

threads = []
results = []

executor = ThreadPoolExecutor(5)

for job in [client.query(query1), client.query(query2)]:
    threads.append(executor.submit(job.result))

# Here you can run any code you like. The interpreter is free

for future in as_completed(threads):
    results.append(list(future.result()))

results will be:

[[Row((2,), {'f0_': 0})], [Row((1,), {'f0_': 0})]]

