如何将asyncio与其他OS线程同步? [英] How can I synchronize asyncio with other OS threads?

查看:96
本文介绍了如何将asyncio与其他OS线程同步?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个具有一个主线程的程序,在该程序中产生了另一个使用asyncio的线程.是否提供任何工具来同步这两个线程?如果一切都是异步的,我可以使用其同步原语来实现,例如:

I have a program with one main thread where I spawn a second thread that uses asyncio. Are there any tools provided to synchronize these two threads? If everything was asyncio, I could do it with its synchronization primitives, eg:

import asyncio

async def taskA(lst, evt):
    print(f'Appending 1')
    lst.append(1)
    evt.set()

async def taskB(lst, evt):
    await evt.wait()
    print('Retrieved:', lst.pop())

lst = []
evt = asyncio.Event()
asyncio.get_event_loop().run_until_complete(asyncio.gather(
    taskA(lst, evt),
    taskB(lst, evt),
))

但是,这不适用于多个线程.如果我只使用threading.Event,它将阻塞asyncio线程.我发现我可以将等待时间推迟到执行者身上:

However, this does not work with multiple threads. If I just use a threading.Event then it will block the asyncio thread. I figured out I could defer the wait to an executor:

import asyncio
import threading

def taskA(lst, evt):
    print(f'Appending 1')
    lst.append(1)
    evt.set()

async def taskB(lst, evt):
    asyncio.get_event_loop().run_in_executor(None, evt.wait)
    print('Retrieved:', lst.pop())

def targetA(lst, evt):
    taskA(lst, evt)

def targetB(lst, evt):
    asyncio.set_event_loop(asyncio.new_event_loop())
    asyncio.get_event_loop().run_until_complete(taskB(lst, evt))

lst = []
evt = threading.Event()
threadA = threading.Thread(target=targetA, args=(lst, evt))
threadB = threading.Thread(target=targetB, args=(lst, evt))
threadA.start()
threadB.start()
threadA.join()
threadB.join()

但是,仅让执行程序线程等待互斥锁似乎是不自然的.这是应该完成的方式吗?还是有其他方法可以异步等待OS线程之间的同步?

However, having an executor thread only to wait for a mutex seems unnatural. Is this the way this is supposed to be done? Or is there any other way to wait for synchronization between OS threads asynchronously?

推荐答案

将异步协程与来自另一个线程的事件同步的一种简单方法是等待taskB中的asyncio.Event,并使用.

A simple way to synchronize an asyncio coroutine with an event coming from another thread is to await an asyncio.Event in taskB, and set it from taskA using loop.call_soon_threadsafe.

要能够在两者之间传递值和异常,可以使用期货;但是,随后您发明了run_in_executor的大部分内容.如果taskA的唯一工作是将任务从队列中取出,您最好将单个工作人员池"用作其工作线程.然后,您可以按预期使用run_in_executor:

To be able to pass values and exceptions between the two, you can use futures; however then you are inventing much of run_in_executor. If the only job of taskA is to take tasks off a queue, you might as well make a single-worker "pool" and use it as your worker thread. Then you can use run_in_executor as intended:

worker = concurrent.futures.ThreadPoolExecutor(max_workers=1)

async def taskB(lst):
    loop = asyncio.get_event_loop()
    # or result = await ..., if taskA has a useful return value
    # This will also propagate exceptions raised by taskA
    await loop.run_in_executor(worker, taskA, lst)
    print('Retrieved:', lst.pop())

语义与您的版本中带有显式队列的语义相同-队列仍然存在,只是在ThreadPoolExecutor内部.

The semantics are the same as in your version with an explicit queue - the queue is still there, it's just inside the ThreadPoolExecutor.

这篇关于如何将asyncio与其他OS线程同步?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆