使用boost.process同时读取和写入孩子的stdio [英] simultaneous read and write to child's stdio using boost.process

查看:136
本文介绍了使用boost.process同时读取和写入孩子的stdio的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用boost.process使用类似这样的东西来读写儿童的stdio:

i am trying to write and read to child's stdio using boost.process using something like this:

boost::asio::io_service writeService, readService;
bp::async_pipe in{writeService};
bp::async_pipe out{readService};

bp::child process(CompressCmd.c_str(), bp::std_in < in, bp::std_out > out);
Buffer src;
src.reserve(4 * 1024 * 1024);
integer_type read = 0;
//std::atomic_int64_t totalWrite{0};
integer_type totalWrite = 0;
while (callback(CallbackActions::NeedMoreInput, src, read)) {
    in.async_write_some(
        boost::asio::buffer(src.data(), read),
        [](const boost::system::error_code &e, std::size_t) { });
    // written data is not important, that's why using same buffer
    out.async_read_some(boost::asio::buffer(src.data(), src.capacity()),
                        [&](const boost::system::error_code &e,
                           std::size_t byte_transferred) { totalWrite += byte_transferred; });
}
writeService.run();
in.close();
readService.run();

所有读取和写入操作均被提名为成功,但是totalWrite的值完全不正确,例如报告为29356032,实际值应为50000000左右
我注意到程序在中途终止,
在readService.run()冻结子级之后使用process.wait()终止,
使用atomic int产生相同的行为

现在我实际上只需要知道实际写入了多少数据,这就是为什么我使用相同的缓冲区

all read and write operations are noitified as success but value of totalWrite is totally incorrect f.e reported 29356032, real value should be around 50000000
i noticed the program is terminating half way ,
using process.wait() after readService.run() freezes child,
using atomic int produces same behaviour

for now i actually only need to know how much data is actually wriiten, that's why i am using same buffer

推荐答案

  1. 此模式:

  1. This pattern:

while (callback(CallbackActions::NeedMoreInput, src, read)) {
    in.async_write_some(...);
    out.async_read_some(...);
}

很可能被误导了(异步操作总是立即返回,因此您只需继续添加更多的异步操作而不会给它们运行的​​机会).

is most likely misguided (async operations always immediately return, so you'd simply keep adding more async operations without giving them a chance to run).

误导的事实是,您为管道提供了单独的服务,但是您完全排除了它们的运行,因此在writeService完成之前,将不会运行任何读取操作.

Also misguided is the fact that you have separate services for the pipes, but you're running them in total exclusion, so no read operation will ever run until the writeService completes.

atomic类型被误导,因为无法从多个线程进行访问

atomic types are misguided since there's no access from multiple threads

您要做什么?您保留了一个大缓冲区,但从未将任何数据放入其中(reserve!= resize).因此,您只能希望什么都不写.

What are you trying to do? You reserve a large buffer but never put any data into it (reserve != resize). Therefore you can only hope to write nothing.

更具有讽刺意味的是,您正在完全相同的位置读取完全相同的缓冲区.但是,这立即是未定义的行为¹,因为当您知道.

Even more ironically, you are reading into the exact same buffer, at the exact same spot. However, that's immediately Undefined Behaviour¹ because you pass it src.capacity() when you know that src.size()==0.

即使没有该错误,您如何也可以同时"从内存中完全相同的字节进行读取和写入,仍然知道预期的结果是什么?

Even without that error how can you "simultaneously" read and write from exactly the same bytes in memory and still know what the expected outcome would be?

您没有将自己的io_service传递给Boost Process

you are not passing your own io_service to Boost Process

工作演示

这是一个工作样本.当然,我不得不猜测你实际上想做什么.

A Working Demo

Here's a working sample. Of course I had to guess what you actually want to do.

我选择使程序将其自身的源(main.cpp)发送到stdin,并迭代读取stdout,记录total_received字节.然后打印退出代码和总数.

I opted to make the program send its own source (main.cpp) to stdin, and read stdout iteratively, recording the total_received bytes. It then prints the exit code and that total.

作为临时压缩机,我使用了'/usr/bin/xxd',因为它可用并且甚至可以有用地打印到std::cout进行调试.

As a make-shift compressor I used '/usr/bin/xxd' because it's available and could even be usefully printed to std::cout for debugging.

在Coliru上直播 //在Coliru上遇到麻烦

Live On Coliru // trouble on Coliru

#include <boost/asio.hpp>
#include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <iostream>
std::vector<char> read_file(std::string const&);

namespace bp = boost::process;
using boost::system::error_code;

using Loop = boost::function<void()>;
using Buffer = std::array<char, 4*1024>;

int main() {
    boost::asio::io_service svc;

    std::string const CompressCmd = "/usr/bin/xxd";

    bp::async_pipe in{svc}, out{svc};
    bp::child process(CompressCmd, bp::std_in < in, bp::std_out > out, svc);

    auto data = read_file("main.cpp");

    Loop read_loop, write_loop;

    Buffer recv_buffer;
    std::size_t total_received = 0;
    read_loop = [&read_loop, &out, &recv_buffer, &total_received] {
        out.async_read_some(boost::asio::buffer(recv_buffer),
            [&](error_code ec, size_t transferred) {
                std::cout << "ReadLoop: " << ec.message() << " got " << transferred << " bytes\n";
                total_received += transferred; 
                if (!ec)
                    read_loop(); // continue reading
            });
    };

    boost::asio::async_write(in, boost::asio::buffer(data),
        [&](error_code ec, size_t transferred) {
            std::cout << "WriteLoop: " << ec.message() << " done, " << transferred << " bytes\n";
            in.close(ec);
            std::cout << "WriteLoop: closed pipe (" << ec.message() << ")\n";
        }); // async

    read_loop(); // async

    svc.run(); // Await all async operations

    std::cout << "Process exitcode " << process.exit_code() << ", total_received=" << total_received << "\n";
}

#include <fstream>
#include <iterator>
std::vector<char> read_file(std::string const& fname) {
    std::ifstream ifs(fname);
    return {std::istreambuf_iterator<char>(ifs), {}};
}

打印

WriteLoop: Success done, 1787 bytes
WriteLoop: closed pipe (Success)
ReadLoop: Success got 4096 bytes
ReadLoop: Success got 3515 bytes
ReadLoop: End of file got 0 bytes
Process exitcode 0, total_received=7611

说明,简化

请注意,我们所做的所有工作都没有循环.这是因为boost::asio::async_write组成的操作(它隐藏了循环).

Explanations, Simplifications

Note that we do all of the writing without a loop. That's because boost::asio::async_write is a composed operation (it hides the loop).

同样,如果您可以负担得起"将接收到的全部数据存储在内存中,则可以使用boost::asio::streambuf并使用类似的组合操作来简化操作:

Likewise, if you can "afford" to store the whole received data in memory, you can simplify by using boost::asio::streambuf and using a similar composed operation:

在Coliru上直播 //在Coliru上遇到麻烦

Live On Coliru // trouble on Coliru

#include <boost/asio.hpp>
#include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <iostream>
std::vector<char> read_file(std::string const&);

namespace bp = boost::process;
using boost::system::error_code;

int main() {
    boost::asio::io_service svc;

    std::string const CompressCmd = "/usr/bin/xxd";

    bp::async_pipe in{svc}, out{svc};
    bp::child process(CompressCmd, bp::std_in < in, bp::std_out > out, svc);

    auto data = read_file("main.cpp");

    boost::asio::streambuf recv_buffer;
    boost::asio::async_read(out, recv_buffer,
            [&](error_code ec, size_t transferred) {
                std::cout << "ReadLoop: " << ec.message() << " got " << transferred << " bytes\n";
            });

    boost::asio::async_write(in, boost::asio::buffer(data),
        [&](error_code ec, size_t transferred) {
            std::cout << "WriteLoop: " << ec.message() << " done, " << transferred << " bytes\n";
            in.close(ec);
            std::cout << "WriteLoop: closed pipe (" << ec.message() << ")\n";
        }); // async

    svc.run(); // Await all async operations

    std::cout << "Process exitcode " << process.exit_code() << ", total_received=" << recv_buffer.size() << "\n";
}

#include <fstream>
#include <iterator>
std::vector<char> read_file(std::string const& fname) {
    std::ifstream ifs(fname);
    return {std::istreambuf_iterator<char>(ifs), {}};
}

反之,如果您不能在发送之前将所有数据都存储在内存中,则可以创建一个循环以逐块方式发送输入

Conversely, if you cannot afford to have all the data in memory before sending, you can create a loop to send input block-wise

两个带延迟的异步循环

让我们这样做,并通过在写入每个块之前延迟一秒钟来使其更具娱乐性.您期望看到的是由于延迟而交替进行读/写:

Two Asynchronous Loops, With Delays

Let's do that, and make it more entertaining by delaying a second before writing each block. What you'd expect to see is alternating reads/writes happening because of the delays:

在Coliru上直播 //在Coliru上运行

Live On Coliru // yay running on Coliru

#include <boost/asio.hpp>
#include <boost/asio/high_resolution_timer.hpp>
#include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <iostream>
#include <fstream>

namespace bp = boost::process;
using boost::system::error_code;
using namespace std::chrono_literals;

using Loop = boost::function<void()>;
using Buffer = std::array<char, 500>;

int main() {
    boost::asio::io_service svc;
    auto on_exit = [](int code, std::error_code ec) {
            std::cout << "Exited " << code << " (" << ec.message() << ")\n";
        };

    std::string const CompressCmd = "/usr/bin/xxd";

    bp::async_pipe in{svc}, out{svc};
    bp::child process(CompressCmd, bp::std_in < in, bp::std_out > out, svc, bp::on_exit(on_exit));

    Loop read_loop, write_loop;

    Buffer recv_buffer;
    std::size_t total_received = 0;
    read_loop = [&read_loop, &out, &recv_buffer, &total_received] {
        out.async_read_some(boost::asio::buffer(recv_buffer),
            [&](error_code ec, size_t transferred) {
                std::cout << "ReadLoop: " << ec.message() << " got " << transferred << " bytes\n";
                total_received += transferred; 
                if (!ec)
                    read_loop(); // continue reading
            });
    };

    std::ifstream ifs("main.cpp");
    std::size_t total_written = 0;
    Buffer send_buffer;
    boost::asio::high_resolution_timer send_delay(svc);
    write_loop = [&write_loop, &in, &ifs, &send_buffer, &total_written, &send_delay] {
        if (!ifs.good())
        {
            error_code ec;
            in.close(ec);
            std::cout << "WriteLoop: closed stdin (" << ec.message() << ")\n";
            return;
        }
        ifs.read(send_buffer.data(), send_buffer.size());

        boost::asio::async_write(in, boost::asio::buffer(send_buffer.data(), ifs.gcount()),
            [&](error_code ec, size_t transferred) {
                std::cout << "WriteLoop: " << ec.message() << " sent " << transferred << " bytes\n";
                total_written += transferred; 
                if (!ec) {
                    send_delay.expires_from_now(1s);
                    send_delay.async_wait([&write_loop](error_code ec) {
                        std::cout << "WriteLoop: send delay " << ec.message() << "\n";
                        if (!ec) write_loop(); // continue writing
                    });
                }
            });
    };

    read_loop(); // async
    write_loop(); // async

    svc.run(); // Await all async operations

    std::cout << "Process exitcode " << process.exit_code() << ", total_received=" << total_received << "\n";
}

打印

WriteLoop: Success sent 500 bytes
WriteLoop: send delay Success
WriteLoop: Success sent 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 96 bytes
WriteLoop: send delay Success
WriteLoop: Success sent 500 bytes
WriteLoop: send delay Success
WriteLoop: Success sent 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 96 bytes
WriteLoop: send delay Success
WriteLoop: Success sent 500 bytes
WriteLoop: send delay Success
WriteLoop: Success sent 134 bytes
WriteLoop: send delay Success
WriteLoop: closed stdin (Success)
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 22 bytes
Exited 0 (Success)
ReadLoop: End of file got 0 bytes
Process exitcode 0, total_received=11214


¹也许只是未指定,我现在不倾向于找出区别


¹ perhaps just unspecified, I'm not inclined to find out the difference right now

这篇关于使用boost.process同时读取和写入孩子的stdio的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆