Python 3.4 Comms流委托-非阻塞接收和发送-数据从asyncio输出 [英] Python 3.4 Comms Stream Delegate - Non-blocking recv and send - data out from asyncio

查看:119
本文介绍了Python 3.4 Comms流委托-非阻塞接收和发送-数据从asyncio输出的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我要在RPi上集成一个客户端服务器应用程序.它有一个主线程,该主线程创建了一个与iOS设备对话的通讯线程. 主线程创建一个异步事件循环以及sendQ和recvQ并将它们作为args传递到comms线程中的commsDelegate main方法.

I'm putting together a client server app on RPi. It has a main thread which creates a comms thread to talk to an iOS device. The main thread creates an asyncio event loop and a sendQ and a recvQ and passes them as args to the commsDelegate main method in the comms thread.

我遇到的麻烦是,iOS设备连接时,一旦数据可用,它需要从该Python应用程序接收未经请求的数据,并且需要能够将数据发送到Python应用程序.因此,发送和接收都必须是非阻塞的.

The trouble I'm having is when iOS device connects, it needs to receive unsolicited data from this Python app as soon as the data becomes available and it needs to be able to send data up to the Python app. So send and receive need to be non-blocking.

那里有很棒的回声服务器教程.但是就服务器而言,对数据做些有用的事情就很少.

There are great echo server tutorials out there. But little in terms of the server doing something useful with the data.

有人可以协助我让asyncio读取我的发送队列并在主线程排队后立即转发数据吗?我的工作很棒.

Can anyone assist me in getting asyncio to read my send queue and forward data as soon as the main thread has queued it? I have receive working great.

主线程创建一个循环并启动comms线程:

  commsLoop = asyncio.new_event_loop()
  commsMainThread = threading.Thread(target=CommsDelegate.commsDelegate, args=(commsInQ,commsOutQ,commsLoop,commsPort,), daemon=True)
  commsMainThread.start()

然后,CommsDelegate模块中的asyncio应该作为loop.run_forever()服务器任务从套接字流读取和写入,并使用队列发送接收消息,并返回主线程,从而运行循环.

Then asyncio in module CommsDelegate should run the loop as loop.run_forever() server task reading and writing from a socket stream and sending receiving messages using queues back up to the main thread.

到目前为止,这是我的代码.我发现,如果我为协议生成器创建工厂,则可以将队列名称传递给它,这样消息的接收现在就很好了.当它们从客户端到达时,它们被排队_nowait并且主线程很好地接收了它们.

Here's my code so far. I found that if I create a factory for the protocol generator, I can pass it the queue names so the receipt of messages is all good now. When they arrive from the client they are queued _nowait and the main thread receives them just fine.

我只需要asyncio来处理来自Main线程的出站消息队列,当它们到达sendQ时,便可以将它们发送到连接的客户端.

I just need asyncio to handle the queue of outbound messages from the Main thread as they arrive on sendQ, so it can send them on to the connected client.

#!/usr/bin/env python3.6
import asyncio

class ServerProtocol(asyncio.Protocol):

  def __init__(self, loop, recvQ, sendQ):
    self.loop = loop
    self.recvQ = recvQ
    self.sendQ = sendQ

  def connection_made(self, transport):
    peername = transport.get_extra_info('peername')
    print('Connection from {}'.format(peername))
    self.transport = transport

  def data_received(self, data):
    message = data.decode()
    print('Data received: {!r}'.format(message))
    self.recvQ.put_nowait(message.rstrip())

  # Needs work... I think the queue.get_nowait should be a co-ro maybe? 
  def unknownAtTheMo():
    dataToSend = sendQ.get_nowait()
    print('Send: {!r}'.format(message))
    self.transport.write(dataToSend)

  # Needs work to close on request from client or server or exc...
  def handleCloseSocket(self):
    print('Close the client socket')
    self.transport.close()

# async co-routine to consume the send message Q from Main Thread
async def consume(sendQ):
  print("In consume coro")
  while True:
    outboundData = await self.sendQ.get()
    print("Consumed", outboundData)
    self.transport.write(outboundData.encode('ascii'))

def commsDelegate(recvQ, sendQ, loop, port):
  asyncio.set_event_loop(loop)

  # Connection coroutine - Create a factory to assist the protocol in receipt of the queues as args
  factory = lambda: ProveItServerProtocol(loop, recvQ, sendQ)
  # Each client connection will create a new protocol instance
  connection = loop.run_until_complete(loop.create_server(factory, host='192.168.1.199', port=port))

  # Outgoing message queue handler
  consumer = asyncio.ensure_future(consume(sendQ))

  # Set up connection
  loop.run_until_complete(connection)

  # Wait until the connection is closed
  loop.run_forever()

  # Wait until the queue is empty
  loop.run_until_complete(queue.join())

  # Cancel the consumer
  consumer.cancel()

  # Let the consumer terminate
  loop.run_until_complete(consumer)

  # Close the connection
  connection.close()

  # Close the loop
  loop.close()

我将所有数据消息都以json格式发送,而CommsDelegate执行编码和解码,然后将其中继.

I send all data messages as json and CommsDelegate performs encode and decode then relays them asis.

更新:asyncio线程似乎对于传入流量运行良好.服务器接收json并通过队列中继-非阻塞.

Update: asyncio thread seems to be working well for incoming traffic. Server receives json and relays it via a queue - non-blocking.

一旦发送成功,我将在线程上拥有一个可重用的黑盒服务器.

Once the send is working, I'll have a reusable blackbox server on a thread.

推荐答案

我可以看到您的方法有两个问题.首先,您的所有客户都使用相同的recvsend队列,因此consume协程无法知道要回复谁.

I can see two problems with your approach. First, all your clients are using the same recv and send queues, so there is no way the consume coroutine can know who to reply to.

第二个问题与您将队列用作同步世界和异步世界之间的桥梁有关.参见代码的这一部分:

The second issue has to do with your use of queues as a bridge between the synchronous and the asynchronous worlds. See this part of your code:

await self.sendQ.get()

如果sendQ是常规队列(来自queue模块),则此行将失败,因为sendQ不是协程.另一方面,如果sendQasyncio.Queue,则主线程将不能使用sendQ.put,因为它是协程.可以使用put_nowait,但是在asyncio中不能保证线程安全.相反,您必须使用 loop.call_soon_threadsafe :

If sendQ is a regular queue (from the queue module), this line will fail because sendQ is not a coroutine. On the other hand, if sendQ is an asyncio.Queue, the main thread won't be able to use sendQ.put because it is a coroutine. It would be possible to use put_nowait, but thread-safety is not guaranteed in asyncio. Instead, you'd have to use loop.call_soon_threadsafe:

loop.call_soon_threadsafe(sendQ.put_nowait, message)

通常,请记住asyncio设计为可以作为主应用程序运行.它应该在主线程中运行,并通过ThreadPoolExecutor与同步代码进行通信(请参见

In general, remember that asyncio is designed to run as the main application. It's supposed to run in the main thread, and communicate with synchronous code through a ThreadPoolExecutor (see loop.run_in_executor).

asyncio文档中的有关多线程的更多信息.您可能还想看看 asyncio流API ,提供了一个更好的接口来使用TCP.

More information about multithreading in the asyncio documentation. You might also want to have a look at the asyncio stream API that provides a much nicer interface to work with TCP.

这篇关于Python 3.4 Comms流委托-非阻塞接收和发送-数据从asyncio输出的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆