How do I read the output of a child process without blocking in Rust?


Problem Description

I'm making a small ncurses application in Rust that needs to communicate with a child process. I already have a prototype written in Common Lisp. I'm trying to rewrite it because CL uses a huge amount of memory for such a small tool.

I'm having some trouble figuring out how to interact with the sub-process.

What I'm currently doing is roughly this:

  1. Create the process:

    let mut program = match Command::new(command)
        .args(arguments)
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
    {
        Ok(child) => child,
        Err(_) => {
            println!("Cannot run program '{}'.", command);
            return;
        }
    };
    

  2. Pass it to an infinite (until user exits) loop, which reads and handles input and listens for output like this (and writes it to the screen):

    fn listen_for_output(program: &mut Child, output_viewer: &TextViewer) {
        match program.stdout {
            Some(ref mut out) => {
                let mut buf_string = String::new();
                match out.read_to_string(&mut buf_string) {
                    Ok(_) => output_viewer.append_string(buf_string),
                    Err(_) => return,
                };
            }
            None => return,
        };
    }
    

The call to read_to_string, however, blocks the program until the process exits. From what I can see, read_to_end and read also seem to block. If I try running something like ls, which exits right away, it works, but with something that doesn't exit, like python or sbcl, it only continues once I kill the subprocess manually.

Based on this answer, I changed the code to use BufReader:

    fn listen_for_output(program: &mut Child, output_viewer: &TextViewer) {
        match program.stdout.as_mut() {
            Some(out) => {
                let buf_reader = BufReader::new(out);
                for line in buf_reader.lines() {
                    match line {
                        Ok(l) => {
                            output_viewer.append_string(l);
                        }
                        Err(_) => return,
                    };
                }
            }
            None => return,
        }
    }

However, the problem remains the same. It will read all the lines that are available and then block. Since the tool is supposed to work with any program, there is no way to guess when the output will end before trying to read. There doesn't appear to be a way to set a timeout for BufReader either.

Solution

Streams are blocking by default. TCP/IP streams, filesystem streams, pipe streams, they are all blocking. When you tell a stream to give you a chunk of bytes it will stop and wait till it has the given amount of bytes or till something else happens (an interrupt, an end of stream, an error).

Operating systems are eager to return data to the reading process, so if all you want is to wait for the next line and handle it as soon as it comes in, then the method suggested by Shepmaster in Unable to pipe to or from spawned child process more than once (and also in his answer here) works.
In theory it doesn't have to work, because an operating system is allowed to make BufReader wait for more data in read, but in practice operating systems prefer early "short reads" to waiting.

This simple BufReader-based approach becomes even more dangerous when you need to handle multiple streams (like the stdout and stderr of a child process) or multiple processes. For example, the BufReader-based approach might deadlock when a child process waits for you to drain its stderr pipe while your process is blocked waiting on its empty stdout.
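
To make the stdout/stderr case concrete, here is a minimal sketch of the usual workaround (the shell command and variable names are illustrative, and it assumes a Unix sh): drain stderr on its own thread so the child can never stall on a full stderr pipe while the parent is reading stdout.

use std::io::{BufRead, BufReader, Read};
use std::process::{Command, Stdio};
use std::thread;

fn main() -> std::io::Result<()> {
    let mut child = Command::new("sh")
        .arg("-c")
        .arg("echo out; echo err >&2")
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()?;

    // Drain stderr on its own thread so the child can't block on a full stderr pipe
    // while we sit here reading its stdout.
    let stderr = child.stderr.take().expect("stderr was piped");
    let stderr_thread = thread::spawn(move || {
        let mut text = String::new();
        let _ = BufReader::new(stderr).read_to_string(&mut text); // errors ignored for brevity
        text
    });

    // Meanwhile, read stdout line by line on the main thread.
    let stdout = child.stdout.take().expect("stdout was piped");
    for line in BufReader::new(stdout).lines() {
        println!("stdout: {}", line?);
    }

    let captured_err = stderr_thread.join().expect("stderr thread panicked");
    eprint!("stderr: {}", captured_err);

    child.wait()?;
    Ok(())
}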

Similarly, you can't use BufReader when you don't want your program to wait on the child process indefinitely. Maybe you want to display a progress bar or a timer while the child is still working and gives you no output.

You can't use the BufReader-based approach if your operating system happens not to be eager in returning data to the process (prefers "full reads" to "short reads"), because in that case the last few lines printed by the child process might end up in a gray zone: the operating system got them, but they're not large enough to fill the BufReader's buffer.

BufReader is limited to what the Read interface allows it to do with the stream; it's no less blocking than the underlying stream is. In order to be efficient it will read the input in chunks, telling the operating system to fill as much of its buffer as it has available.

You might be wondering why reading data in chunks is so important here, and why BufReader can't just read the data byte by byte. The problem is that to read data from a stream we need the operating system's help. On the other hand, we are not the operating system; we work isolated from it, so as not to mess it up if something goes wrong with our process. So in order to call the operating system there needs to be a transition to "kernel mode", which might also incur a "context switch". That is why calling the operating system to read every single byte is expensive. We want as few OS calls as possible, and so we get the stream data in batches.

To wait on a stream without blocking you'd need a non-blocking stream. MIO promises to have the required non-blocking stream support for pipes, most probably with PipeReader, but I haven't checked it out so far.

The non-blocking nature of a stream makes it possible to read data in chunks regardless of whether the operating system prefers "short reads" or not, because a non-blocking stream never blocks: if there is no data in the stream, it simply tells you so.
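
As a sketch of what that looks like in code (a hypothetical helper; it assumes the pipe has already been switched into non-blocking mode by some other means, e.g. fcntl on Unix or a library such as MIO, which std::process does not do for you):

use std::io::{self, Read};

/// Pull whatever bytes are currently available from a non-blocking reader.
/// Returning without any bytes simply means "no data right now", not end of stream.
fn drain_available<R: Read>(stream: &mut R, out: &mut Vec<u8>) -> io::Result<bool> {
    let mut buf = [0u8; 4096];
    loop {
        match stream.read(&mut buf) {
            Ok(0) => return Ok(false), // end of stream: the child closed its end
            Ok(n) => out.extend_from_slice(&buf[..n]),
            // On a non-blocking descriptor "no data yet" surfaces as WouldBlock.
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(true),
            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
}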

In the absence of a non-blocking stream you'll have to resort to spawning threads, so that the blocking reads are performed in a separate thread and thus don't block your primary thread. You might also want to read the stream byte by byte in order to react to a line separator immediately, in case the operating system does not prefer the "short reads". Here's a working example: https://gist.github.com/ArtemGr/db40ae04b431a95f2b78.

P.S. Here's an example of a function that allows you to monitor the standard output of a program via a shared vector of bytes:

use std::io::Read;
use std::process::{Command, Stdio};
use std::sync::{Arc, Mutex};
use std::thread;

/// Pipe streams are blocking, we need separate threads to monitor them without blocking the primary thread.
fn child_stream_to_vec<R>(mut stream: R) -> Arc<Mutex<Vec<u8>>>
where
    R: Read + Send + 'static,
{
    let out = Arc::new(Mutex::new(Vec::new()));
    let vec = out.clone();
    thread::Builder::new()
        .name("child_stream_to_vec".into())
        .spawn(move || loop {
            let mut buf = [0];
            match stream.read(&mut buf) {
                Err(err) => {
                    println!("{}] Error reading from stream: {}", line!(), err);
                    break;
                }
                Ok(got) => {
                    if got == 0 {
                        break;
                    } else if got == 1 {
                        vec.lock().expect("!lock").push(buf[0])
                    } else {
                        println!("{}] Unexpected number of bytes: {}", line!(), got);
                        break;
                    }
                }
            }
        })
        .expect("!thread");
    out
}

fn main() {
    let mut cat = Command::new("cat")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
        .expect("!cat");

    let out = child_stream_to_vec(cat.stdout.take().expect("!stdout"));
    let err = child_stream_to_vec(cat.stderr.take().expect("!stderr"));
    let mut stdin = match cat.stdin.take() {
        Some(stdin) => stdin,
        None => panic!("!stdin"),
    };
}
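
As a usage note, here is a hypothetical sketch of how the tool's main loop could then consume those shared vectors between handling key presses, never blocking on the child (the printing stands in for the question's output_viewer.append_string, and from_utf8_lossy glosses over multi-byte characters split across reads):

use std::sync::{Arc, Mutex};
use std::{thread, time::Duration};

/// Take whatever bytes the reader threads have collected so far; never blocks on the child.
fn drain(shared: &Arc<Mutex<Vec<u8>>>) -> Vec<u8> {
    std::mem::take(&mut *shared.lock().expect("!lock"))
}

/// Hypothetical UI loop polling the vectors returned by child_stream_to_vec above.
fn ui_loop(out: Arc<Mutex<Vec<u8>>>, err: Arc<Mutex<Vec<u8>>>) {
    loop {
        for (label, shared) in [("stdout", &out), ("stderr", &err)] {
            let chunk = drain(shared);
            if !chunk.is_empty() {
                // A real tool would append this to its TextViewer instead of printing.
                print!("[{}] {}", label, String::from_utf8_lossy(&chunk));
            }
        }
        // ... handle keyboard input here ...
        thread::sleep(Duration::from_millis(50)); // poll instead of blocking
    }
}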

With a couple of helpers I'm using it to control an SSH session:

try_s! (stdin.write_all (b"echo hello world\n"));
try_s! (wait_forˢ (&out, 0.1, 9., |s| s == "hello world\n"));

P.S. Note that await on a read call in async-std is blocking as well. It's just that instead of blocking a system thread it only blocks a chain of futures (essentially a stackless green thread). poll_read is the non-blocking interface. In async-std#499 I've asked the developers whether there's a short read guarantee from these APIs.
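
For comparison, here is a minimal sketch of the same line-by-line reading written against tokio rather than async-std (a different runtime than the one discussed above; it assumes tokio with the process, io-util, macros and rt-multi-thread features enabled):

use std::process::Stdio;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::Command;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut child = Command::new("cat")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .spawn()?;

    let mut stdin = child.stdin.take().expect("stdin was piped");
    stdin.write_all(b"hello from the parent\n").await?;
    drop(stdin); // closing stdin lets `cat` see EOF and exit

    // Each next_line() suspends this future (not an OS thread) until a line arrives.
    let stdout = child.stdout.take().expect("stdout was piped");
    let mut lines = BufReader::new(stdout).lines();
    while let Some(line) = lines.next_line().await? {
        println!("child said: {}", line);
    }

    child.wait().await?;
    Ok(())
}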

P.S. There might be a similar concern in Nom: "we would want to tell the IO side to refill according to the parser's result (Incomplete or not)"

P.S. It might be interesting to see how stream reading is implemented in crossterm. For Windows, in poll.rs, they use the native WaitForMultipleObjects. In unix.rs they use the mio poll.
