有没有办法在多个线程中使用 asyncio.Queue? [英] Is there a way to use asyncio.Queue in multiple threads?

查看:48
本文介绍了有没有办法在多个线程中使用 asyncio.Queue?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

假设我有以下代码:

import asyncio
import threading

queue = asyncio.Queue()

def threaded():
    import time
    while True:
        time.sleep(2)
        queue.put_nowait(time.time())
        print(queue.qsize())

@asyncio.coroutine
def async():
    while True:
        time = yield from queue.get()
        print(time)

loop = asyncio.get_event_loop()
asyncio.Task(async())
threading.Thread(target=threaded).start()
loop.run_forever()

这段代码的问题是 async 协程内部的循环永远不会完成第一次迭代,而 queue 大小正在增加.

The problem with this code is that the loop inside async coroutine is never finishing the first iteration, while queue size is increasing.

为什么会出现这种情况,我该如何解决?

我无法摆脱单独的线程,因为在我的实际代码中,我使用单独的线程与串行设备进行通信,而我还没有找到使用 asyncio 做到这一点的方法.

I can't get rid of separate thread, because in my real code I use a separate thread to communicate with a serial device, and I haven't find a way to do that using asyncio.

推荐答案

asyncio.Queue 不是线程安全的,因此您不能直接从多个线程使用它.相反,您可以使用 janus,这是第三方提供线程感知asyncio 队列的库:

asyncio.Queue is not thread-safe, so you can't use it directly from more than one thread. Instead, you can use janus, which is a third-party library that provides a thread-aware asyncio queue:

import asyncio
import threading
import janus

def threaded(squeue):
    import time
    while True:
        time.sleep(2)
        squeue.put_nowait(time.time())
        print(squeue.qsize())

@asyncio.coroutine
def async(aqueue):
    while True:
        time = yield from aqueue.get()
        print(time)

loop = asyncio.get_event_loop()
queue = janus.Queue(loop=loop)
asyncio.Task(asyncio.ensure_future(queue.async_q))
threading.Thread(target=threaded, args=(queue.sync_q,)).start()
loop.run_forever()

还有 aioprocessing(完全公开:我写的),它也提供进程安全(作为副作用,线程安全)队列,但如果您不尝试使用 multiprocessing,那这太过分了.

There is also aioprocessing (full-disclosure: I wrote it), which provides process-safe (and as a side-effect, thread-safe) queues as well, but that's overkill if you're not trying to use multiprocessing.

编辑

正如其他答案中指出的那样,对于简单的用例,您可以使用 loop.call_soon_threadsafe 也添加到队列中.

As pointed it out in other answers, for simple use-cases you can use loop.call_soon_threadsafe to add to the queue, as well.

这篇关于有没有办法在多个线程中使用 asyncio.Queue?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆