使用boost.process同时读取和写入孩子的stdio [英] simultaneous read and write to child's stdio using boost.process
问题描述
我正在尝试使用boost.process使用类似这样的东西来读写儿童的stdio:
i am trying to write and read to child's stdio using boost.process using something like this:
boost::asio::io_service writeService, readService;
bp::async_pipe in{writeService};
bp::async_pipe out{readService};
bp::child process(CompressCmd.c_str(), bp::std_in < in, bp::std_out > out);
Buffer src;
src.reserve(4 * 1024 * 1024);
integer_type read = 0;
//std::atomic_int64_t totalWrite{0};
integer_type totalWrite = 0;
while (callback(CallbackActions::NeedMoreInput, src, read)) {
in.async_write_some(
boost::asio::buffer(src.data(), read),
[](const boost::system::error_code &e, std::size_t) { });
// written data is not important, that's why using same buffer
out.async_read_some(boost::asio::buffer(src.data(), src.capacity()),
[&](const boost::system::error_code &e,
std::size_t byte_transferred) { totalWrite += byte_transferred; });
}
writeService.run();
in.close();
readService.run();
所有读取和写入操作均被提名为成功,但是totalWrite的值完全不正确,例如报告为29356032,实际值应为50000000左右
我注意到程序在中途终止,
在readService.run()冻结子级之后使用process.wait()终止,
使用atomic int产生相同的行为
现在我实际上只需要知道实际写入了多少数据,这就是为什么我使用相同的缓冲区
all read and write operations are noitified as success but value of totalWrite is totally incorrect f.e reported 29356032, real value should be around 50000000
i noticed the program is terminating half way ,
using process.wait() after readService.run() freezes child,
using atomic int produces same behaviour
for now i actually only need to know how much data is actually wriiten, that's why i am using same buffer
推荐答案
-
此模式:
This pattern:
while (callback(CallbackActions::NeedMoreInput, src, read)) {
in.async_write_some(...);
out.async_read_some(...);
}
很可能被误导了(异步操作总是立即返回,因此您只需继续添加更多的异步操作而不会给它们运行的机会).
is most likely misguided (async operations always immediately return, so you'd simply keep adding more async operations without giving them a chance to run).
误导的事实是,您为管道提供了单独的服务,但是您完全排除了它们的运行,因此在writeService完成之前,将不会运行任何读取操作.
Also misguided is the fact that you have separate services for the pipes, but you're running them in total exclusion, so no read operation will ever run until the writeService completes.
atomic
类型被误导,因为无法从多个线程进行访问
atomic
types are misguided since there's no access from multiple threads
您要做什么?您保留了一个大缓冲区,但从未将任何数据放入其中(reserve
!= resize
).因此,您只能希望什么都不写.
What are you trying to do? You reserve a large buffer but never put any data into it (reserve
!= resize
). Therefore you can only hope to write nothing.
更具有讽刺意味的是,您正在完全相同的位置读取完全相同的缓冲区.但是,这立即是未定义的行为¹,因为当您知道
Even more ironically, you are reading into the exact same buffer, at the exact same spot. However, that's immediately Undefined Behaviour¹ because you pass it src.capacity()
when you know that src.size()==0
.
即使没有该错误,您如何也可以同时"从内存中完全相同的字节进行读取和写入,仍然知道预期的结果是什么?
Even without that error how can you "simultaneously" read and write from exactly the same bytes in memory and still know what the expected outcome would be?
您没有将自己的io_service
传递给Boost Process
you are not passing your own io_service
to Boost Process
工作演示
这是一个工作样本.当然,我不得不猜测你实际上想做什么.
A Working Demo
Here's a working sample. Of course I had to guess what you actually want to do.
我选择使程序将其自身的源(main.cpp)发送到stdin,并迭代读取stdout,记录total_received
字节.然后打印退出代码和总数.
I opted to make the program send its own source (main.cpp) to stdin, and read stdout iteratively, recording the total_received
bytes. It then prints the exit code and that total.
作为临时压缩机,我使用了'/usr/bin/xxd'
,因为它可用并且甚至可以有用地打印到std::cout
进行调试.
As a make-shift compressor I used '/usr/bin/xxd'
because it's available and could even be usefully printed to std::cout
for debugging.
在Coliru上直播 //在Coliru上遇到麻烦
Live On Coliru // trouble on Coliru
#include <boost/asio.hpp>
#include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <iostream>
std::vector<char> read_file(std::string const&);
namespace bp = boost::process;
using boost::system::error_code;
using Loop = boost::function<void()>;
using Buffer = std::array<char, 4*1024>;
int main() {
boost::asio::io_service svc;
std::string const CompressCmd = "/usr/bin/xxd";
bp::async_pipe in{svc}, out{svc};
bp::child process(CompressCmd, bp::std_in < in, bp::std_out > out, svc);
auto data = read_file("main.cpp");
Loop read_loop, write_loop;
Buffer recv_buffer;
std::size_t total_received = 0;
read_loop = [&read_loop, &out, &recv_buffer, &total_received] {
out.async_read_some(boost::asio::buffer(recv_buffer),
[&](error_code ec, size_t transferred) {
std::cout << "ReadLoop: " << ec.message() << " got " << transferred << " bytes\n";
total_received += transferred;
if (!ec)
read_loop(); // continue reading
});
};
boost::asio::async_write(in, boost::asio::buffer(data),
[&](error_code ec, size_t transferred) {
std::cout << "WriteLoop: " << ec.message() << " done, " << transferred << " bytes\n";
in.close(ec);
std::cout << "WriteLoop: closed pipe (" << ec.message() << ")\n";
}); // async
read_loop(); // async
svc.run(); // Await all async operations
std::cout << "Process exitcode " << process.exit_code() << ", total_received=" << total_received << "\n";
}
#include <fstream>
#include <iterator>
std::vector<char> read_file(std::string const& fname) {
std::ifstream ifs(fname);
return {std::istreambuf_iterator<char>(ifs), {}};
}
打印
WriteLoop: Success done, 1787 bytes
WriteLoop: closed pipe (Success)
ReadLoop: Success got 4096 bytes
ReadLoop: Success got 3515 bytes
ReadLoop: End of file got 0 bytes
Process exitcode 0, total_received=7611
说明,简化
请注意,我们所做的所有工作都没有循环.这是因为boost::asio::async_write
是组成的操作(它隐藏了循环).
Explanations, Simplifications
Note that we do all of the writing without a loop. That's because boost::asio::async_write
is a composed operation (it hides the loop).
同样,如果您可以负担得起"将接收到的全部数据存储在内存中,则可以使用boost::asio::streambuf
并使用类似的组合操作来简化操作:
Likewise, if you can "afford" to store the whole received data in memory, you can simplify by using boost::asio::streambuf
and using a similar composed operation:
在Coliru上直播 //在Coliru上遇到麻烦
Live On Coliru // trouble on Coliru
#include <boost/asio.hpp>
#include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <iostream>
std::vector<char> read_file(std::string const&);
namespace bp = boost::process;
using boost::system::error_code;
int main() {
boost::asio::io_service svc;
std::string const CompressCmd = "/usr/bin/xxd";
bp::async_pipe in{svc}, out{svc};
bp::child process(CompressCmd, bp::std_in < in, bp::std_out > out, svc);
auto data = read_file("main.cpp");
boost::asio::streambuf recv_buffer;
boost::asio::async_read(out, recv_buffer,
[&](error_code ec, size_t transferred) {
std::cout << "ReadLoop: " << ec.message() << " got " << transferred << " bytes\n";
});
boost::asio::async_write(in, boost::asio::buffer(data),
[&](error_code ec, size_t transferred) {
std::cout << "WriteLoop: " << ec.message() << " done, " << transferred << " bytes\n";
in.close(ec);
std::cout << "WriteLoop: closed pipe (" << ec.message() << ")\n";
}); // async
svc.run(); // Await all async operations
std::cout << "Process exitcode " << process.exit_code() << ", total_received=" << recv_buffer.size() << "\n";
}
#include <fstream>
#include <iterator>
std::vector<char> read_file(std::string const& fname) {
std::ifstream ifs(fname);
return {std::istreambuf_iterator<char>(ifs), {}};
}
反之,如果您不能在发送之前将所有数据都存储在内存中,则可以创建一个循环以逐块方式发送输入
Conversely, if you cannot afford to have all the data in memory before sending, you can create a loop to send input block-wise
两个带延迟的异步循环
让我们这样做,并通过在写入每个块之前延迟一秒钟来使其更具娱乐性.您期望看到的是由于延迟而交替进行读/写:
Two Asynchronous Loops, With Delays
Let's do that, and make it more entertaining by delaying a second before writing each block. What you'd expect to see is alternating reads/writes happening because of the delays:
在Coliru上直播 //在Coliru上运行
Live On Coliru // yay running on Coliru
#include <boost/asio.hpp>
#include <boost/asio/high_resolution_timer.hpp>
#include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <iostream>
#include <fstream>
namespace bp = boost::process;
using boost::system::error_code;
using namespace std::chrono_literals;
using Loop = boost::function<void()>;
using Buffer = std::array<char, 500>;
int main() {
boost::asio::io_service svc;
auto on_exit = [](int code, std::error_code ec) {
std::cout << "Exited " << code << " (" << ec.message() << ")\n";
};
std::string const CompressCmd = "/usr/bin/xxd";
bp::async_pipe in{svc}, out{svc};
bp::child process(CompressCmd, bp::std_in < in, bp::std_out > out, svc, bp::on_exit(on_exit));
Loop read_loop, write_loop;
Buffer recv_buffer;
std::size_t total_received = 0;
read_loop = [&read_loop, &out, &recv_buffer, &total_received] {
out.async_read_some(boost::asio::buffer(recv_buffer),
[&](error_code ec, size_t transferred) {
std::cout << "ReadLoop: " << ec.message() << " got " << transferred << " bytes\n";
total_received += transferred;
if (!ec)
read_loop(); // continue reading
});
};
std::ifstream ifs("main.cpp");
std::size_t total_written = 0;
Buffer send_buffer;
boost::asio::high_resolution_timer send_delay(svc);
write_loop = [&write_loop, &in, &ifs, &send_buffer, &total_written, &send_delay] {
if (!ifs.good())
{
error_code ec;
in.close(ec);
std::cout << "WriteLoop: closed stdin (" << ec.message() << ")\n";
return;
}
ifs.read(send_buffer.data(), send_buffer.size());
boost::asio::async_write(in, boost::asio::buffer(send_buffer.data(), ifs.gcount()),
[&](error_code ec, size_t transferred) {
std::cout << "WriteLoop: " << ec.message() << " sent " << transferred << " bytes\n";
total_written += transferred;
if (!ec) {
send_delay.expires_from_now(1s);
send_delay.async_wait([&write_loop](error_code ec) {
std::cout << "WriteLoop: send delay " << ec.message() << "\n";
if (!ec) write_loop(); // continue writing
});
}
});
};
read_loop(); // async
write_loop(); // async
svc.run(); // Await all async operations
std::cout << "Process exitcode " << process.exit_code() << ", total_received=" << total_received << "\n";
}
打印
WriteLoop: Success sent 500 bytes
WriteLoop: send delay Success
WriteLoop: Success sent 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 96 bytes
WriteLoop: send delay Success
WriteLoop: Success sent 500 bytes
WriteLoop: send delay Success
WriteLoop: Success sent 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 96 bytes
WriteLoop: send delay Success
WriteLoop: Success sent 500 bytes
WriteLoop: send delay Success
WriteLoop: Success sent 134 bytes
WriteLoop: send delay Success
WriteLoop: closed stdin (Success)
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 22 bytes
Exited 0 (Success)
ReadLoop: End of file got 0 bytes
Process exitcode 0, total_received=11214
¹也许只是未指定,我现在不倾向于找出区别
¹ perhaps just unspecified, I'm not inclined to find out the difference right now
这篇关于使用boost.process同时读取和写入孩子的stdio的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!