使用主线程输入将运行异步a协程的对象与Python队列链接 [英] Python queue linking object running asyncio coroutines with main thread input

查看:159
本文介绍了使用主线程输入将运行异步a协程的对象与Python队列链接的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我运行了一个脚本,其中主线程从stdin接收输入,然后使用队列将其传递给子线程.在子线程中,我正在使用asyncio协程在套接字上启动侦听器并等待连接.建立连接后,我现在可以从主线程通过侦听器发送数据了.

I have a script running where the main thread takes input from stdin and then passes it to a child thread using a queue. In the child thread I'm using asyncio coroutines to spin up a listener on a socket and wait for connections. Once a connection is made I can now send data through the listener from the main thread.

这一切似乎都可以很好地工作,但是由于asyncio.BaseEventLoop不是线程安全的,我会遇到问题吗?

It all seems to work well enough, but since asyncio.BaseEventLoop is not thread safe am I going to run into problems?

这是我的尝试,用于解决将asyncio之类的阻塞库(如python的cmd模块)使用的问题.

This is my attempt to solve the problem of using a blocking library like python's cmd module with asyncio.

我的代码在下面.

import sys
import asyncio
from time import sleep
from threading import Thread
from queue import Queue

stdin_q = Queue()

clients = {} # task -> (reader, writer)

def client_connected_handler(client_reader, client_writer):
    # Start a new asyncio.Task to handle this specific client connection
    task = asyncio.Task(handle_client(client_reader, client_writer))
    clients[task] = (client_reader, client_writer)

    def client_done(task):
        # When the tasks that handles the specific client connection is done
        del clients[task]

    # Add the client_done callback to be run when the future becomes done
    task.add_done_callback(client_done)

@asyncio.coroutine
def handle_client(client_reader, client_writer):
    # Handle the requests for a specific client with a line oriented protocol
    while True:

        cmd = yield from get_input()
        client_writer.write(cmd.encode())

        data = yield from client_reader.read(1024)

        print(data.decode(),end="",flush=True)

@asyncio.coroutine
def get_input():
  while True:
    try:
      return stdin_q.get()
    except:
      pass



class Control:

    def start(self):
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        self.loop = asyncio.get_event_loop()

        server = self.loop.run_until_complete(asyncio.start_server(client_connected_handler, '0.0.0.0', 2222))
        self.loop.run_forever()
        self.stop()

    def stop(self):    
        self.loop.stop()
        self.loop.close()

def fire_control():
    con = Control()
    con.start()

if __name__ == "__main__":

    stdin_q.put("\n")
    t = Thread(target=fire_control)
    t.start()
    sleep(2)
    _cmd = ""
    while _cmd.lower() != "exit":
        _cmd = input("")
        if _cmd == "":
          _cmd = "\r\n"

        stdin_q.put(_cmd)      

推荐答案

这行不通,因为对stdin_q.get()的调用将阻止事件循环.这意味着,如果您的服务器有多个客户端,则首先碰到stdin_q.get()的任何一个客户端都会完全阻止所有客户端,直到将数据发送到队列中为止.解决此问题的最简单方法是使用 BaseEvent.loop.run_in_executor 在后台ThreadPoolExecutor中运行stdin_q.get,这样您就可以等待它而不会阻塞事件循环:

This isn't going to work quite right, because the call to stdin_q.get() is going to block your event loop. This means that if your server has multiple clients, all of them will be completely blocked by whichever one happens to get to stdin_q.get() first, until you send data into the queue. The simplest way to get around this is use BaseEvent.loop.run_in_executor to run the stdin_q.get in a background ThreadPoolExecutor, which allows you to wait for it without blocking the event loop:

@asyncio.coroutine
def get_input():
    loop = asyncio.get_event_loop()
    return (yield from loop.run_in_executor(None, stdin_q.get))  # None == use default executor.

修改(16/1/27):

有一个名为 janus 的库,该库提供了异步友好的线程-安全队列的实现.

There is a library called janus, which provides an asyncio-friendly, thread-safe queue implementation.

使用该库,您的代码将如下所示(我省略了未更改的部分):

Using that library, your code would look like this (I left out unchanged parts):

...
import janus

loop = asyncio.new_event_loop()
stdin_q = janus.Queue(loop=loop)
...

@asyncio.coroutine
def get_input():
  loop = asyncio.get_event_loop()
  return (yield from stdin_q.async_q.get())

class Control:

    def start(self):
        asyncio.set_event_loop(loop)
        self.loop = asyncio.get_event_loop()

        server = self.loop.run_until_complete(asyncio.start_server(client_connected_handler, '0.0.0.0', 2222))
        self.loop.run_forever()
        self.stop()

    def stop(self):    
        self.loop.stop()
        self.loop.close()

...

if __name__ == "__main__":

    stdin_q.sync_q.put("\n")
    t = Thread(target=runner)
    t.start()
    sleep(2)
    _cmd = ""
    while _cmd.lower() != "exit":
        _cmd = input("")
        if _cmd == "":
          _cmd = "\r\n"

        stdin_q.sync_q.put(_cmd)

这篇关于使用主线程输入将运行异步a协程的对象与Python队列链接的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆