同时观察子进程的 stdout 和 stderr [英] Watch stdout and stderr of a subprocess simultaneously

查看:35
本文介绍了同时观察子进程的 stdout 和 stderr的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

如何同时观察长时间运行的子进程的标准输出和标准错误,并在子进程生成后立即处理每一行?

How can I watch standard output and standard error of a long-running subprocess simultaneously, processing each line as soon as it is generated by the subprocess?

我不介意使用 Python3.6 的异步工具在两个流中的每一个上创建我期望的非阻塞异步循环,但这似乎并没有解决问题.下面的代码:

I don't mind using Python3.6's async tools to make what I expect to be non-blocking async loops over each of the two streams, but that doesn't seem to solve the problem. The below code:

import asyncio
from asyncio.subprocess import PIPE
from datetime import datetime


async def run(cmd):
    p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
    async for f in p.stdout:
        print(datetime.now(), f.decode().strip())
    async for f in p.stderr:
        print(datetime.now(), "E:", f.decode().strip())

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run('''
         echo "Out 1";
         sleep 1;
         echo "Err 1" >&2;
         sleep 1;
         echo "Out 2"
    '''))
    loop.close()

输出:

2018-06-18 00:06:35.766948 Out 1
2018-06-18 00:06:37.770187 Out 2
2018-06-18 00:06:37.770882 E: Err 1

虽然我希望它输出如下内容:

While I expect it to output something like:

2018-06-18 00:06:35.766948 Out 1
2018-06-18 00:06:36.770882 E: Err 1
2018-06-18 00:06:37.770187 Out 2

推荐答案

要实现这一点,您需要一个函数,该函数将采用两个异步序列并将它们合并,从一个或其他,当它们可用时.有了这样的函数,run 可能看起来像这样:

To accomplish this, you need a function that will take two async sequences and merge them, producing the results from either one or the other, as they become available. With such a function in stock, run could look like this:

async def run(cmd):
    p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
    async for f in merge(p.stdout, p.stderr):
        print(datetime.now(), f.decode().strip())

merge 这样的函数(还)不存在于标准库中,但是 aiostream 外部库 提供了一个.您还可以使用异步生成器和 asyncio.wait() 编写自己的代码:

A function like merge does not (yet) exist in the standard library, but the aiostream external library provides one. You can also write your own using an async generator and asyncio.wait():

async def merge(*iterables):
    iter_next = {it.__aiter__(): None for it in iterables}
    while iter_next:
        for it, it_next in iter_next.items():
            if it_next is None:
                fut = asyncio.ensure_future(it.__anext__())
                fut._orig_iter = it
                iter_next[it] = fut
        done, _ = await asyncio.wait(iter_next.values(),
                                     return_when=asyncio.FIRST_COMPLETED)
        for fut in done:
            iter_next[fut._orig_iter] = None
            try:
                ret = fut.result()
            except StopAsyncIteration:
                del iter_next[fut._orig_iter]
                continue
            yield ret

上面的 run 仍然会在一个细节上与您想要的输出有所不同:它不会区分输出和错误行.但这可以通过用指示器装饰线条来轻松实现:

The above run will still differ from your desired output in one detail: it will not distinguish between output and error lines. But this can be easily accomplished by decorating the lines with an indicator:

async def decorate_with(it, prefix):
    async for item in it:
        yield prefix, item

async def run(cmd):
    p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
    async for is_out, line in merge(decorate_with(p.stdout, True),
                                    decorate_with(p.stderr, False)):
        if is_out:
            print(datetime.now(), line.decode().strip())
        else:
            print(datetime.now(), "E:", line.decode().strip())

这篇关于同时观察子进程的 stdout 和 stderr的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆