Python - How to - Big Query asynchronous tasks
Problem Description
This may be a dummy question, but I cannot seem to be able to run python google-cloud-bigquery asynchronously.
My goal is to run multiple queries concurrently and wait for all of them to finish in an asyncio.wait() query gatherer. I'm using asyncio.create_task() to launch the queries. The problem is that each query waits for the previous one to complete before starting.
Here is my query function (quite simple):
async def exec_query(self, query, **kwargs) -> bigquery.table.RowIterator:
    job = self.api.query(query, **kwargs)
    return job.result()
Since I cannot await job.result(), should I await something else?
Recommended Answer
If you are working inside a coroutine and want to run different queries without blocking the event loop, then you can use the run_in_executor function, which basically runs your queries in background threads without blocking the loop. Here's a good example of how to use that.
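As a rough sketch of that approach (blocking_query below is a hypothetical stand-in for a blocking BigQuery call such as client.query(q).result(), so the snippet runs without any credentials):

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_query(query):
    """Hypothetical stand-in for client.query(query).result()."""
    time.sleep(0.1)  # simulate waiting for the job to finish
    return f'rows for {query}'

async def exec_query(loop, executor, query):
    # The blocking call runs in a pool thread; the event loop stays free.
    return await loop.run_in_executor(executor, blocking_query, query)

async def main():
    loop = asyncio.get_running_loop()
    executor = ThreadPoolExecutor(max_workers=5)
    # gather() runs both coroutines concurrently and preserves order.
    return await asyncio.gather(
        exec_query(loop, executor, 'SELECT 1'),
        exec_query(loop, executor, 'SELECT 2'),
    )

print(asyncio.run(main()))
```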
Make sure, though, that this is exactly what you need; jobs created to run queries in the Python API are already asynchronous, and they only block when you call job.result(). This means that you don't need to use asyncio unless you are inside a coroutine.
Here's a quick possible example of retrieving results as soon as the jobs are finished:
from concurrent.futures import ThreadPoolExecutor, as_completed
import google.cloud.bigquery as bq

client = bq.Client.from_service_account_json('path/to/key.json')

query1 = 'SELECT 1'
query2 = 'SELECT 2'

threads = []
results = []
executor = ThreadPoolExecutor(5)

for job in [client.query(query1), client.query(query2)]:
    threads.append(executor.submit(job.result))

# Here you can run any code you like. The interpreter is free

for future in as_completed(threads):
    results.append(list(future.result()))
results will be:

[[Row((2,), {'f0_': 0})], [Row((1,), {'f0_': 0})]]
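To match the asker's original goal of using asyncio.create_task() together with asyncio.wait(), the same idea can be sketched as follows; again, blocking_query is a hypothetical placeholder for the real job.result() call:

```python
import asyncio
import time

def blocking_query(query):
    """Hypothetical stand-in for client.query(query).result()."""
    time.sleep(0.1)
    return f'rows for {query}'

async def exec_query(query):
    loop = asyncio.get_running_loop()
    # None -> use the loop's default ThreadPoolExecutor
    return await loop.run_in_executor(None, blocking_query, query)

async def main():
    tasks = [asyncio.create_task(exec_query(q))
             for q in ('SELECT 1', 'SELECT 2')]
    done, _pending = await asyncio.wait(tasks)  # waits for all tasks
    # done is an unordered set, so sort the results for a stable output
    return sorted(task.result() for task in done)

print(asyncio.run(main()))
```

Because the queries run in separate pool threads, they no longer wait on one another.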