Python asyncio:在辅助线程上运行subprocess_exec [英] Python asyncio: Running subprocess_exec on a worker thread

查看:96
本文介绍了Python asyncio:在辅助线程上运行subprocess_exec的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

因此,我正在使用Python asyncio 模块(在Linux上)启动子进程,然后对其进行异步监视.当在主线程上运行时,我的代码工作正常.但是,当我在辅助线程上运行它时,它将挂起,并且永远不会调用 process_exited 回调.

So I'm using the Python asyncio module (on Linux) to launch a child process and then asynchronously monitor it. My code works fine... when run on the main thread. But when I run it on a worker thread, it hangs, and the process_exited callback is never invoked.

我怀疑这实际上可能是某种未记录的缺陷,或者是在工作线程上运行 subprocess_exec 的问题,可能与实现如何处理后台线程中的信号有关.但这也可能只是我搞砸了.

I suspect this may actually be some kind of undocumented defect or issue with running subprocess_exec on a worker thread, likely having to do with how the implementation handles signals in a background thread. But it could also just be me screwing things up.

一个简单的,可复制的示例如下:

A simple, reproducible example is as follows:

class MyProtocol(asyncio.SubprocessProtocol):
    def __init__(self, done_future):
        super().__init__()
        self._done_future = done_future

    def pipe_data_received(self, fd, data):
        print("Received:", len(data))

    def process_exited(self):
        print("PROCESS EXITED!")
        self._done_future.set_result(None)

def run(loop):
    done_future = asyncio.Future(loop = loop)
    transport = None
    try:
        transport, protocol = yield from loop.subprocess_exec(
            lambda : MyProtocol(done_future),
            "ls",
            "-lh",
            stdin = None
        )
        yield from done_future
    finally:
        if transport: transport.close()

    return done_future.result()

def run_loop():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop) # bind event loop to current thread

    try:
        return loop.run_until_complete(run(loop))
    finally:
        loop.close()

因此,在这里,我设置了一个 asyncio 事件循环以执行shell命令 ls -lh ,然后在从子进程接收到数据时触发回调.子进程退出时的另一个回调.

So here, I setup an asyncio event loop to execute the shell command ls -lh, and then trigger a callback for when data is received from the subprocess, and another callback for when the subprocess exits.

如果我直接在Python程序的主线程中调用 run_loop(),一切都会很好.但是,如果我说:

If I just call run_loop() directly in the main thread of a Python program, everything goes fine. But if I say:

t = threading.Thread(target = run_loop)
t.start()
t.join()

然后发生的是,成功调用了 pipe_data_received()回调,但是从未调用 process_exited(),并且程序仅挂起.

Then what happens is that the pipe_data_received() callback is invoked successfully, but process_exited() is never invoked, and the program just hangs.

在浏览并查看了 unix_events.py 的实现的 asyncio 源代码之后,我发现可能有必要将事件循环手动附加到全局儿童观察者"对象,如下所示:

After Googling around and looking at the asyncio source code for the implementation of unix_events.py, I discovered it might be necessary to manually attach my event loop to the global "child watcher" object, as follows:

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) # bind event loop to current thread
asyncio.get_child_watcher().attach_loop(loop)

显然,儿童观察者是一个(未记录的)对象,负责在内部(或类似的东西)调用 waitpid .但是,当我尝试此操作并在后台线程中运行 run_event_loop()时,出现了错误:

Apparently, the child watcher is an (undocumented) object that is responsible for calling waitpid under the hood (or something like that). But when I tried this, and ran run_event_loop() in a background thread, I got the error:

  File "/usr/lib/python3.4/asyncio/unix_events.py", line 77, in add_signal_handler
    raise RuntimeError(str(exc))
RuntimeError: set_wakeup_fd only works in main thread

因此,这里看起来该实现实际上进行了检查以确保只能在主线程上使用信号处理程序,这使我相信在当前实现中,在一个线程上使用 subprocess_exec 实际上,如果不更改Python源代码本身,后台线程是根本不可能的.

So here it looks like the implementation actually does a check to make sure that signal handlers can only be used on the main thread, leading me to believe that in the current implementation, using subprocess_exec on a background thread is in fact, simply impossible without changing the Python source code itself.

我对此是否正确?遗憾的是, asyncio 模块的文档不足,因此我很难对此处的结论充满信心.我可能做错了什么.

Am I correct about this? Sadly, the asyncio module is very under-documented, so it's hard for me to be confident about my conclusion here. I may simply be doing something wrong.

推荐答案

只要在主线程中运行asyncio循环并使其子观察者实例化,就可以在工作线程中处理子进程:

Handling subprocesses in a worker thread is fine as long as an asyncio loop is running in the main thread with its child watcher instanciated:

asyncio.get_child_watcher()
loop = asyncio.get_event_loop()
coro = loop.run_in_executor(None, run_loop)
loop.run_until_complete(coro)

请参见这篇文章 查看全文

登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆