Boost :: process输出空白行 [英] Boost::process output blank lines

查看:164
本文介绍了Boost :: process输出空白行的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在开发一个应用程序,需要根据用户输入启动和停止各种不同的可执行文件.我希望我的核心"程序在这些可执行文件运行时能正常运行,即不要等待它们在理论上是无限的终止.除此之外,我还需要能够接收std_out并将std_in发送到这些可执行文件.

I am developing an application where I need to launch and stop a variety of different executables depending on user input. I would like my "core" program to run as normal whilst these executables run, i.e not wait for their termination which could theoretically be infinte. As well as this I need to be able to receive std_out and send std_in to these executables.

目前,我已经建立了一个具有流程管理器类的地方:

At the moment I have a set up where I have a process manager class:

class ProcessManager {
private:
    std::vector<patchProcess> processList;
    boost::process::group processGroup;
public:
    ProcessManager();
    void addNew(std::string name,std::string command, std::string args);
    void killAll();
    void printAllIn();
};

修补程序在哪里:

struct patchProcess {
    std::string name;
    boost::process::child *process;
    std::shared_ptr<boost::process::ipstream> procOutStream;
};

在哪里可以启动/添加具有该功能的新进程

Where I can launch / add a new process with the function

void bbefxProcessManager::addNew(std::string name, std::string command, std::string args) {
    LOG(info) << "Creating process for patch " << name;
    patchProcess pp;
    pp.name = name;
    pp.procOutStream = std::shared_ptr<boost::process::ipstream>(new boost::process::ipstream);
    boost::process::child newProc(command,args,processGroup,boost::process::std_out > *pp.procOutStream);
    pp.process = &newProc;
    processList.push_back(pp);
}

我的打印尝试:

void bbefxProcessManager::printAllIn() {
    std::string line;
        for (auto &proc : processList) {
            std::getline(*proc.procOutStream, line);
            std::cout << line << std::endl;
        }
}

此代码成功启动了该过程,但是readAllIn给了我空白的输出.我感觉到我在std::shared_ptr<boost::process::ipstream> procOutStream;上做错了什么.我这样做的基本原理是我正在将push_back用于我的processList(结构的向量)中,因此它应该是可复制的.我可以不使用patchProcess结构和这些共享的指针而获得测试执行程序的输出,但这会使管理变得困难/混乱.我还可以确认是否尝试读取addNew函数中的输出,例如:

This code sucessfully launches the process, however readAllIn gives me a blank output. I have a feeling that I am doing something horribly wrong with std::shared_ptr<boost::process::ipstream> procOutStream;. My rationale behind this is that I am using push_back into my processList (vector of struct), so it should be copyable. I can get the output of a test exec without using the patchProcess struct and these shared pointers but that makes mangement hard / messy. I can also confirm that if I attempt to read the output in the addNew function with something like:

while(true) {
        *pp.procOutStream >> line;
        std::cout << line << std::endl;

    }

我得到可执行文件的输出.那么这是否暗示复制构造函数出了什么问题?

I get the output of my executable. So does this hint something is going wrong with copy constructors?

推荐答案

在您进行编辑之前,我开始就一种真正的异步方法进行一些工作:

Before your edits, I started doing some work on a truly async approach:

让我们摆脱繁琐的手续:

Let's get the formalities out of the way:

#include <boost/asio.hpp>
#include <boost/process.hpp>
#include <boost/process/async.hpp>

namespace ba = boost::asio;
namespace bp = boost::process;

#include <iostream>
#define LOG(x) std::clog

现在,我们可以创建一个ProcessManager,它在析构函数中关闭的单个io_service上运行所有进程.

Now lets create a ProcessManager that runs all processes on a single io_service that is shutdown in the destructor.

IO服务用于安排所有工作(例如异步IO).我已经

The IO service is used to schedule all the work (like asynchronous IO). I've

  • 随机决定专注于面向行的IO操作
  • 确定可能没有理由使用超过1个IO线程,但是如果您曾经使用过,则strand可以正确地同步针对孩子的操作.
  • randomly decided to focus on line-oriented IO operations
  • decided that there's likely no reason to use more than 1 IO thread, but in case you would ever, there is the strand that correctly synchronizes operations with respect to a child.
#include <map>
#include <list>
#include <thread>
class ProcessManager { // ugh naming smell
    using error_code = boost::system::error_code;
  private:
    ba::io_service _service;
    boost::optional<ba::io_service::work> _keep{_service};
    boost::process::group _group;
    std::thread io_thread;

    struct patchProcess : std::enable_shared_from_this<patchProcess> {
        using ptr = std::shared_ptr<patchProcess>;
        static ptr start(std::string command, std::vector<std::string> args, ProcessManager& mgr) {
            ptr p(new patchProcess(std::move(command), std::move(args), mgr));
            p->output_read_loop();
            return p;
        }

        boost::optional<std::string> getline() {
            std::lock_guard<std::mutex> lk(_mx);
            std::string s;
            if (has_newline(_output.data()) && std::getline(std::istream(&_output), s))
                return s;
            return boost::none;
        }

        void write(std::string message) {
            std::lock_guard<std::mutex> lk(_mx);
            _input_bufs.push_back({false, std::move(message)});

            if (_input_bufs.size() == 1)
                input_write_loop();
        }

        void close_stdin() {
            std::lock_guard<std::mutex> lk(_mx);
            if (_input_bufs.empty()) {
                _strand.post([this, self=shared_from_this()] { _stdin.close(); });
            } else {
                _input_bufs.push_back({true, {}});
            }
        }

        bool is_running() { return _process.running(); }

      private:
        patchProcess(std::string command, std::vector<std::string> args, ProcessManager& mgr)
            : _strand(mgr._service),
              _stdout(mgr._service), _stdin(mgr._service),
              _process(command, args, mgr._group, bp::std_out > _stdout, bp::std_in < _stdin, mgr._service)
        { }

        void output_read_loop() {
            ba::async_read_until(_stdout, pending_output, "\n", _strand.wrap([this, self=shared_from_this()](error_code ec, size_t /*transferred*/) {
                if (!ec) {
                    std::lock_guard<std::mutex> lk(_mx);
                    std::ostream(&_output) << &pending_output;
                    output_read_loop();
                }
            }));
        }

        void input_write_loop() { // assumes _mx locked
            if (!_input_bufs.empty()) {
                auto& msg = _input_bufs.front();
                if (msg.eof) {
                    _strand.post([this, self=shared_from_this()] { _stdin.close(); });
                } else {
                    ba::async_write(_stdin, ba::buffer(_input_bufs.front().pay_load), 
                        _strand.wrap([this, self=shared_from_this()](error_code ec, size_t /*transferred*/) {
                            std::lock_guard<std::mutex> lk(_mx);
                            _input_bufs.pop_front();
                            if (!ec)
                                input_write_loop();
                        }));
                }
            }
        }

        ba::io_service::strand _strand; // thread-safe

        // strand-local
        bp::async_pipe _stdout, _stdin;
        bp::child _process;
        ba::streambuf pending_output;

        // mutex protected
        std::mutex mutable _mx;
        struct out_message { bool eof; std::string pay_load; };
        std::list<out_message> _input_bufs; // iterator stability again!
        ba::streambuf _output;

        // static helpers
        template <typename T>
        static bool has_newline(T buffer) {
            return std::find(buffers_begin(buffer), buffers_end(buffer), '\n') != buffers_end(buffer);
        }
    };

    using Map = std::map<std::string, patchProcess::ptr>; // iterator stability required!
    Map processList;

    void eventloop() {
        for(;;) try {
            if (!_service.run()) break;
        } catch(std::exception const& e) {
            LOG(error) << "Exception in handler: " << e.what() << "\n";
        }
    }
  public:
    ProcessManager() : io_thread([this] { eventloop(); }) { }

    ~ProcessManager() {
        status(__FUNCTION__);
        _keep.reset();
        io_thread.join();
        status(__FUNCTION__);
    }

    void status(std::string const& caption = "Status") const {
        for (auto& p : processList) {
            LOG(info) << caption << ": '" << p.first << "' is " << (p.second->is_running()? "still running":"done") << "\n";
        }
    }

    patchProcess::ptr addNew(std::string name, std::string command, std::vector<std::string> args) {
        auto pit = processList.find(name);
        if (pit != processList.end()) {
            if (pit->second->is_running()) {
                LOG(error) << "Process already running ('" << name << "')\n";
                return {};
            }
            // TODO make sure process cleaned up etc.
        }
        LOG(info) << "Creating process for patch " << name << "\n";
        return processList[name] = patchProcess::start(std::move(command), std::move(args), *this);
    }
};

Demos

最幼稚的奔跑是:

Demos

The most naive run would be:

int main() {
    ProcessManager pm;
}

在无所事事之后,可以预期地返回.接下来,我们尝试

Which, predictably returns after doing nothing. Next, we try

int main() {
    ProcessManager pm;
    pm.addNew("sleeper", "/bin/bash", {"-c", "sleep 3" });
}

可以预料的是,这需要等待3秒才能退出.它打印:

Which predictably waits 3 seconds before exiting. It prints:

Creating process for patch sleeper
~ProcessManager: 'sleeper' is still running
~ProcessManager: 'sleeper' is done

但是等待!,您不是明确地说您不想等待吗?好吧,没有!在此期间,您可以随心所欲地做任何事情.只是ProcessManager的析构函数将-默认情况下-等待子进程完成.

But WAIT! Didn't you specifically say you didn't want waiting? Well, there is none! You can do whatever you please in the mean time. It's just that ProcessManager's destructor will - by default - wait for the child to finish.

让我们做一些IO:

在Coliru上直播

int main() {
    ProcessManager pm;

    auto ls  = pm.addNew("listing", "/bin/ls", {"-ltr" });

    boost::optional<std::string> l;

    while ((l = ls->getline()) || ls->is_running()) {
        if (l.is_initialized()) {
            std::cout << "ls: " << std::quoted(*l) << std::endl;
            l.reset();
        }
    }
}

打印

Creating process for patch listing
ls: "total 172"
ls: "-rw-rw-rw- 1 2001 2000   5645 Feb 11 00:10 main.cpp"
ls: "-rwxr-xr-x 1 2001 2000 162784 Feb 11 00:10 a.out"
~ProcessManager: 'listing' is done
~ProcessManager: 'listing' is done

要真正弄清进程的IO是同步的,我们可以替换

To really drive the point home that the processes and their IO are synchronous, we can replace

auto ls = pm.addNew("listing", "/bin/ls", {"-ltr" });

更多时变的东西:

auto ls = pm.addNew("listing", "/bin/bash", {"-c", "ls -ltr | while read line; do sleep 1; echo \"$line\"; done" });

现在,要使其真正具有挑战性,让我们添加另一个子进程并将ls的输出发送到其他child:

Now, to make it really challenging, let's add another child process and send the output of the ls to the other child:

在Coliru上直播

int main() {
    ProcessManager pm;

    auto ls  = pm.addNew("listing", "/bin/bash", {"-c", "ls -ltr | while read line; do sleep 1; echo \"$line\"; done" });
    auto xxd = pm.addNew("hex encoding", "/usr/bin/xxd", {});

    boost::optional<std::string> l, x;

    bool closed = false;
    while ((l || (l = ls->getline())) || (x || (x = xxd->getline())) || ls->is_running() || xxd->is_running()) {
        if (l.is_initialized()) {
            xxd->write(std::move(*l) + '\n');
            l.reset();
            std::cout << "[forwarded from ls to xxd]" << std::endl;
        } else {
            if (!closed && !ls->is_running()) {
                std::cout << "[closing input to xxd]" << std::endl;
                xxd->close_stdin();
                closed = true;
            }
        }

        if (x.is_initialized()) {
            std::cout << std::quoted(*x) << std::endl;
            x.reset();
        }
    }
}

现在,在Coliru上,列表太小了,没意思,但是在我的系统上,您得到的输出如下:

Now, on Coliru the listing is too small to be interesting, but on my system you get output like:

Creating process for patch listing
Creating process for patch hex encoding
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
"00000000: 746f 7461 6c20 3733 3635 0a2d 7277 2d72  total 7365.-rw-r"
"00000010: 772d 722d 2d20 2031 2073 6568 6520 7365  w-r--  1 sehe se"
"00000020: 6865 2020 2020 3133 3737 206d 6569 2031  he    1377 mei 1"
"00000030: 3020 2032 3031 3720 636d 616b 655f 696e  0  2017 cmake_in"
"00000040: 7374 616c 6c2e 636d 616b 650a 6c72 7778  stall.cmake.lrwx"
"00000050: 7277 7872 7778 2020 3120 7365 6865 2073  rwxrwx  1 sehe s"
"00000060: 6568 6520 2020 2020 2020 3820 6d65 6920  ehe       8 mei "
"00000070: 3234 2020 3230 3137 206d 6169 6e2e 6370  24  2017 main.cp"
"00000080: 7020 2d3e 2074 6573 742e 6370 700a 2d72  p -> test.cpp.-r"
"00000090: 772d 7277 2d72 2d2d 2020 3120 7365 6865  w-rw-r--  1 sehe"
"000000a0: 2073 6568 6520 2020 2020 3531 3420 7365   sehe     514 se"
"000000b0: 7020 3133 2030 383a 3336 2063 6f6d 7069  p 13 08:36 compi"
"000000c0: 6c65 5f63 6f6d 6d61 6e64 732e 6a73 6f6e  le_commands.json"
"000000d0: 0a2d 7277 2d72 772d 722d 2d20 2031 2073  .-rw-rw-r--  1 s"
"000000e0: 6568 6520 7365 6865 2020 2020 3135 3834  ehe sehe    1584"
"000000f0: 2073 6570 2032 3020 3232 3a30 3320 576f   sep 20 22:03 Wo"
"00000100: 7264 436f 756e 7465 722e 680a 2d72 772d  rdCounter.h.-rw-"
"00000110: 7277 2d72 2d2d 2020 3120 7365 6865 2073  rw-r--  1 sehe s"
"00000120: 6568 6520 2020 2020 3336 3920 7365 7020  ehe     369 sep "
"00000130: 3233 2030 333a 3131 2063 6f6d 6d6f 6e2e  23 03:11 common."
"00000140: 680a 2d72 772d 7277 2d72 2d2d 2020 3120  h.-rw-rw-r--  1 "
"00000150: 7365 6865 2073 6568 6520 2020 2020 3533  sehe sehe     53"
"00000160: 3920 7365 7020 3233 2030 333a 3131 2073  9 sep 23 03:11 s"
"00000170: 7472 7563 7473 616d 706c 652e 6870 700a  tructsample.hpp."
"00000180: 2d72 772d 7277 2d72 2d2d 2020 3120 7365  -rw-rw-r--  1 se"
"00000190: 6865 2073 6568 6520 2020 2032 3335 3220  he sehe    2352 "
"000001a0: 7365 7020 3238 2032 333a 3230 2061 6461  sep 28 23:20 ada"
"000001b0: 7074 6976 655f 7061 7273 6572 2e68 0a2d  ptive_parser.h.-"
"000001c0: 7277 2d72 772d 722d 2d20 2031 2073 6568  rw-rw-r--  1 seh"
"000001d0: 6520 7365 6865 2020 2020 3538 3738 2073  e sehe    5878 s"
"000001e0: 6570 2032 3820 3233 3a32 3120 6164 6170  ep 28 23:21 adap"
"000001f0: 7469 7665 5f70 6172 7365 722e 6370 700a  tive_parser.cpp."
"00000200: 2d72 772d 7277 2d72 2d2d 2020 3120 7365  -rw-rw-r--  1 se"
"00000210: 6865 2073 6568 6520 2020 2034 3232 3720  he sehe    4227 "
"00000220: 6f6b 7420 2034 2032 333a 3137 2070 686f  okt  4 23:17 pho"
"00000230: 656e 695f 7833 2e68 7070 0a2d 7277 2d72  eni_x3.hpp.-rw-r"
"00000240: 772d 722d 2d20 2031 2073 6568 6520 7365  w-r--  1 sehe se"
"00000250: 6865 2020 2031 3432 3035 2064 6563 2020  he   14205 dec  "
"00000260: 3620 3231 3a30 3820 434d 616b 6543 6163  6 21:08 CMakeCac"
"00000270: 6865 2e74 7874 0a2d 7277 2d72 772d 722d  he.txt.-rw-rw-r-"
"00000280: 2d20 2031 2073 6568 6520 7365 6865 2020  -  1 sehe sehe  "
"00000290: 2020 3630 3738 2064 6563 2031 3420 3032    6078 dec 14 02"
"000002a0: 3a35 3320 636f 6e6e 6563 7469 6f6e 2e68  :53 connection.h"
"000002b0: 7070 0a2d 7277 7872 7778 722d 7820 2031  pp.-rwxrwxr-x  1"
"000002c0: 2073 6568 6520 7365 6865 2020 2020 3136   sehe sehe    16"
"000002d0: 3736 206a 616e 2031 3220 3032 3a34 3420  76 jan 12 02:44 "
"000002e0: 636f 6d70 696c 655f 6266 2e70 790a 2d72  compile_bf.py.-r"
"000002f0: 772d 722d 2d72 2d2d 2020 3120 7365 6865  w-r--r--  1 sehe"
"00000300: 2073 6568 6520 2020 2038 3738 3020 6a61   sehe    8780 ja"
"00000310: 6e20 3132 2031 373a 3131 2074 6573 742e  n 12 17:11 test."
"00000320: 6269 6e0a 2d72 7778 7277 7872 2d78 2020  bin.-rwxrwxr-x  "
"00000330: 3120 7365 6865 2073 6568 6520 2020 2020  1 sehe sehe     "
"00000340: 3131 3920 6a61 6e20 3235 2031 333a 3537  119 jan 25 13:57"
"00000350: 2074 6573 742e 7079 0a2d 7277 7872 7778   test.py.-rwxrwx"
"00000360: 722d 7820 2031 2073 6568 6520 7365 6865  r-x  1 sehe sehe"
"00000370: 2020 2020 2020 3736 2066 6562 2020 3820        76 feb  8 "
"00000380: 3130 3a33 3920 7465 7374 2e73 680a 2d72  10:39 test.sh.-r"
"00000390: 772d 7277 2d72 2d2d 2020 3120 7365 6865  w-rw-r--  1 sehe"
"000003a0: 2073 6568 6520 2020 3236 3536 3920 6665   sehe   26569 fe"
"000003b0: 6220 2039 2031 313a 3533 2064 7261 6674  b  9 11:53 draft"
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[closing input to xxd]
"000003c0: 2e6d 640a 2d72 772d 7277 2d72 2d2d 2020  .md.-rw-rw-r--  "
"000003d0: 3120 7365 6865 2073 6568 6520 2020 2020  1 sehe sehe     "
"000003e0: 3131 3620 6665 6220 2039 2031 313a 3534  116 feb  9 11:54"
"000003f0: 2069 6e70 7574 2e74 7874 0a2d 7277 2d72   input.txt.-rw-r"
"00000400: 772d 722d 2d20 2031 2073 6568 6520 7365  w-r--  1 sehe se"
"00000410: 6865 2020 2020 2020 3739 2066 6562 2031  he      79 feb 1"
"00000420: 3020 3136 3a32 3420 6172 7869 760a 2d72  0 16:24 arxiv.-r"
"00000430: 772d 7277 2d72 2d2d 2020 3120 7365 6865  w-rw-r--  1 sehe"
"00000440: 2073 6568 6520 2020 2032 3933 3520 6665   sehe    2935 fe"
"00000450: 6220 3130 2031 363a 3238 2043 4d61 6b65  b 10 16:28 CMake"
"00000460: 4c69 7374 732e 7478 740a 2d72 772d 7277  Lists.txt.-rw-rw"
"00000470: 2d72 2d2d 2020 3120 7365 6865 2073 6568  -r--  1 sehe seh"
"00000480: 6520 2020 2035 3134 3520 6665 6220 3130  e    5145 feb 10"
"00000490: 2031 363a 3238 204d 616b 6566 696c 650a   16:28 Makefile."
"000004a0: 2d72 772d 7277 2d72 2d2d 2020 3120 7365  -rw-rw-r--  1 se"
"000004b0: 6865 2073 6568 6520 2020 2033 3937 3620  he sehe    3976 "
"000004c0: 6665 6220 3130 2031 363a 3430 2074 6573  feb 10 16:40 tes"
"000004d0: 7431 2e63 7070 0a2d 7277 2d72 772d 722d  t1.cpp.-rw-rw-r-"
"000004e0: 2d20 2031 2073 6568 6520 7365 6865 2020  -  1 sehe sehe  "
"000004f0: 2020 3632 3434 2066 6562 2031 3120 3031    6244 feb 11 01"
"00000500: 3a31 3320 7465 7374 2e63 7070 0a2d 7277  :13 test.cpp.-rw"
"00000510: 7872 7778 722d 7820 2031 2073 6568 6520  xrwxr-x  1 sehe "
"00000520: 7365 6865 2037 3139 3336 3838 2066 6562  sehe 7193688 feb"
"00000530: 2031 3120 3031 3a31 3320 736f 7465 7374   11 01:13 sotest"
"00000540: 0a2d 7277 2d72 772d 722d 2d20 2031 2073  .-rw-rw-r--  1 s"
"00000550: 6568 6520 7365 6865 2020 2020 3535 3132  ehe sehe    5512"
"00000560: 2066 6562 2031 3120 3031 3a31 3620 5365   feb 11 01:16 Se"
"00000570: 7373 696f 6e2e 7669 6d0a 6472 7778 7277  ssion.vim.drwxrw"
"00000580: 7872 2d78 2031 3120 7365 6865 2073 6568  xr-x 11 sehe seh"
"00000590: 6520 2020 2020 2032 3320 6665 6220 3131  e      23 feb 11"
"000005a0: 2030 313a 3137 2043 4d61 6b65 4669 6c65   01:17 CMakeFile"
"000005b0: 730a 2d72 772d 7277 2d72 2d2d 2020 3120  s.-rw-rw-r--  1 "
"000005c0: 7365 6865 2073 6568 6520 2020 2020 2037  sehe sehe      7"
"000005d0: 3520 6665 6220 3131 2030 313a 3137 206f  5 feb 11 01:17 o"
"000005e0: 7574 7075 742e 7478 740a                 utput.txt."
~ProcessManager: 'hex encoding' is done
~ProcessManager: 'listing' is done
~ProcessManager: 'hex encoding' is done
~ProcessManager: 'listing' is done

这篇关于Boost :: process输出空白行的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆