异步队列消费者协程 [英] asyncio queue consumer coroutine

查看:96
本文介绍了异步队列消费者协程的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个 asyncio.Protocol 子类,它从服务器接收数据。
我将此数据(每行,因为数据是文本)存储在 asyncio.Queue 中。

I have a asyncio.Protocol subclass receiving data from a server. I am storing this data (each line, because the data is text) in a asyncio.Queue.

import asyncio

q = asyncio.Queue()

class StreamProtocol(asyncio.Protocol):
    def __init__(self, loop):
        self.loop = loop
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        for message in data.decode().splitlines():
            yield q.put(message.rstrip())

    def connection_lost(self, exc):
        self.loop.stop()

loop = asyncio.get_event_loop()
coro = loop.create_connection(lambda: StreamProtocol(loop),
                              '127.0.0.1', '42')
loop.run_until_complete(coro)
loop.run_forever()
loop.close()

我想让另一个协程负责消耗队列中的数据,

I want to have another coroutine responsible for consuming the data in the queue and processing it.


  • 这应该是 asyncio.Task 吗?

  • 如果由于几秒钟没有收到数据而导致队列变空怎么办?我如何确保我的使用者不会停止( run_until_complete )?

  • 有没有一种比使用全局变量更干净的方法我的队列?

  • Should this be a asyncio.Task?
  • What if the queue becomes empty because for a few seconds no data is received? How can I make sure my consumer doesn't stop (run_until_complete)?
  • Is there a cleaner way than using a global variable for my queue?

推荐答案


这应该是asyncio.Task ?

Should this be a asyncio.Task?

是的,使用 asyncio.ensure_future loop.create_task


如果队列由于几秒钟而变空怎么办

What if the queue becomes empty because for a few seconds no data is received?

只需使用 queue.get 等到某个商品可用:

Simply use queue.get to wait until an item is available:

async def consume(queue):
    while True:
        item = await queue.get()
        print(item)




是否有比在队列中使用全局变量更清洁的方法?

Is there a cleaner way than using a global variable for my queue?

是的,只需将其作为参数传递给消费者协程,然后流协议:

Yes, simply pass it as argument to the consumer coroutine and stream protocol:

class StreamProtocol(asyncio.Protocol):
    def __init__(self, loop, queue):
        self.loop = loop
        self.queue = queue

    def data_received(self, data):
        for message in data.decode().splitlines():
            self.queue.put_nowait(message.rstrip())

    def connection_lost(self, exc):
        self.loop.stop()




如何确保我的消费者不会停止(run_until_complete)?

How can I make sure my consumer doesn't stop (run_until_complete)?

一旦关闭连接,请使用 queue.join

Once the connection is closed, use queue.join to wait until the queue is empty.

完整示例:

loop = asyncio.get_event_loop()
queue = asyncio.Queue()
# Connection coroutine
factory = lambda: StreamProtocol(loop, queue)
connection = loop.create_connection(factory, '127.0.0.1', '42')
# Consumer task
consumer = asyncio.ensure_future(consume(queue))
# Set up connection
loop.run_until_complete(connection)
# Wait until the connection is closed
loop.run_forever()
# Wait until the queue is empty
loop.run_until_complete(queue.join())
# Cancel the consumer
consumer.cancel()
# Let the consumer terminate
loop.run_until_complete(consumer)
# Close the loop
loop.close()

或者,您也可以使用

async def tcp_client(host, port, loop=None):
    reader, writer = await asyncio.open_connection(host, port, loop=loop)
    async for line in reader:
        print(line.rstrip())
    writer.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(tcp_client('127.0.0.1', 42, loop))
loop.close()

这篇关于异步队列消费者协程的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆