How to read data from the Internet using multi-threading with connecting only once?
Question
I'm building a tiny multi-threaded download program using boost::asio::ip::tcp. I need each thread to deal with a part of the data. I know I could solve the problem by adding a "Range: bytes=xx-xx" header to each request. But I don't want the program to connect to the server that many times. Is there any solution?
Answer
Just read it and dispatch to the worker threads when appropriate.
Having no clue what kind of chunks you want to handle separately, let's assume you read all of the prime numbers from https://www.mathsisfun.com/includes/primes-to-100k.zip, read them in chunks, and then do some work on all the primes on separate threads.
Here's a lazy prime job:
void handle_batch(std::vector<size_t> params) {
    if (!params.empty()) {
        std::cout
            << "Batch n:" << params.size()
            << "\tRange [" << params.front() << ".." << params.back() << "]"
            << "\tSum:" << std::accumulate(begin(params), end(params), 0ull)
            << std::endl;
    }
}
Yeah, we just print a description of the job params and their sum. We can doodle on it a bit to make it more lifelike: make it take some time, and be aware that we are on worker threads, so we want to synchronize access to the console.
void handle_batch(std::vector<size_t> params) {
    static std::mutex s_mx; // static: shared across calls, so the lock actually synchronizes
    if (!params.empty()) {
        // emulate some work, because I'm lazy
        auto sum = std::accumulate(begin(params), end(params), 0ull);

        // then wait some 100..200ms
        {
            using namespace std::chrono_literals;
            std::mt19937 prng(std::random_device{}());
            std::this_thread::sleep_for(
                std::uniform_real_distribution<>(100,200)(prng)*1ms);
        }

        // simple thread id (thread::id displays ugly)
        auto tid = std::hash<std::thread::id>{}(std::this_thread::get_id()) % 100;

        // report results to stdout
        std::lock_guard lk(s_mx); // make sure the output doesn't intermix
        std::cout
            << "Thread #" << std::setw(2) << std::setfill('0') << tid
            << " Batch n:" << params.size()
            << "\tRange [" << params.front() << ".." << params.back() << "]"
            << "\tSum:" << sum
            << std::endl;
    }
}
Okay, that's enough gore for the unimportant bits.
Well, there's a slight complication with my chosen approach: not only is that site using https (ugh), it is also serving up ZIP files (ugh). And we're using C++ (ugh?).
At least, we can do the whole SSL-connect business synchronously in not too much code. We want the reading to be asynchronous, though, because that way we can demonstrate that
- you can do a lot of intermixed IO on just the main thread using Boost Asio
- the same goes for Boost Process, which launches zcat as a child process to unzip the primes content (we'll assume a UNIX-like system with zcat installed)
- which means we'll be asynchronously writing to that child process's stdin
- and also asynchronously reading from its stdout
- spawning off batch jobs along the way as soon as they're ready
This should be a pretty good model for your workload, because the workers take more time than the IO; still, we run many IO tasks on a single thread without blocking.
As said, we will use a single thread for IO and a thread pool for the batch workers:
int main() {
    net::io_context io;       // main thread does all io
    net::thread_pool pool(6); // worker threads
There. That's a start. Now we want to establish an SSL connection and request that ZIP. Here it is:
    http::response_parser<http::buffer_body> response_reader;
    beast::flat_buffer lookahead; // for the response_reader
    std::array<char,512> buf{0};  // for download content
    auto ctx = ssl_context();
    ssl::stream<tcp::socket> s(io, ctx);

    {   // synchronously write request
        std::string host = "www.mathsisfun.com";
        connect_https(s, host, tcp::resolver{io}.resolve(host, "https"));
        http::write(s, get_request(host, "/includes/primes-to-100k.zip"));

        http::read_header(s, lookahead, response_reader);
        //std::cerr << "Headers: " << response_reader.get().base() << std::endl;
    }
Yup, that already did the reading of the response headers¹. Of course we cheated, because we need three helpers:
- making an SSL context
auto ssl_context() {
    ssl::context ctx{ssl::context::sslv23};
    ctx.set_default_verify_paths();
    ctx.set_verify_mode(ssl::verify_peer);
    return ctx;
}
- connecting over SSL
void connect_https(stream& s, std::string const& host, tcp::resolver::iterator eps) {
    net::connect(s.lowest_layer(), eps);
    s.lowest_layer().set_option(tcp::no_delay(true));

    if (!SSL_set_tlsext_host_name(s.native_handle(), host.c_str())) {
        throw system_error{ { (int)::ERR_get_error(), net::error::get_ssl_category() } };
    }
    s.handshake(stream::handshake_type::client);
}
- making the HTTP request
auto get_request(std::string const& host, std::string const& path) {
    using namespace http;
    request<string_body> req;
    req.version(11);
    req.method(verb::get);
    req.target("https://" + host + path);
    req.set(field::user_agent, "test");
    req.set(field::host, host);

    std::cerr << req << std::endl;
    return req;
}
Not bad, for C++.

Now we start on the asynchrony: let's have a "pump" or "loop" that sends all the response data into a pipe:
    // now, asynchronously read contents
    process::async_pipe pipe_to_zcat(io);
    std::function<void(error_code, size_t)> receive_zip;
receive_zip is what we call our loop. It's a self-chaining asynchronous operation: every time it is called, it pumps some data into the pipe and schedules one more async_read of the HTTP response:
    receive_zip = [&s, &response_reader, &pipe_to_zcat, &buf, &lookahead, &receive_zip]
        (error_code ec, size_t /*ignore_this*/)
    {
        auto& res = response_reader.get();
        auto& body = res.body();
        if (body.data) {
            auto n = sizeof(buf) - body.size;
            net::write(pipe_to_zcat, net::buffer(buf, n));
        }

        bool done = ec && !(ec == http::error::need_buffer);
        done += response_reader.is_done();

        if (done) {
            std::cerr << "receive_zip: " << ec.message() << std::endl;
            pipe_to_zcat.close();
        } else {
            body.data = buf.data();
            body.size = buf.size();
            http::async_read(s, lookahead, response_reader, receive_zip);
        }
    };
This slightly complicated-looking read of the buffered response comes almost straight from the library examples. Now, all we have to do is prime the pump:
    // kick off receive loop
    receive_zip(error_code{}, 0);
Intermezzo, Unzip
This is not the interesting part, so let's go: we are launching a subprocess zcat and want a second pipe to read the output from:
    process::async_pipe zcat_output(io);
    process::child zcat(
        process::search_path("zcat"),
        process::std_in < pipe_to_zcat,
        process::std_out > zcat_output,
        process::on_exit([](int exitcode, std::error_code ec) {
            std::cerr << "Child process exited with " << exitcode << " (" << ec.message() << ")\n";
        }),
        io);
End of intermezzo :)
(We even threw in error reporting, because why not?)
Now, we have another async read loop, this time to read back the uncompressed primes. This is where we will assemble batch jobs to be handled on the worker pool.
    std::function<void(error_code, size_t)> receive_primes;
    net::streambuf sb;
Like receive_zip before, receive_primes is our loop driver; the sb buffer is just a buffer that makes it easy to read using std::istream, as you'd normally do from std::cin.
    receive_primes = [&zcat_output, &sb, &receive_primes, &pool]
        (error_code ec, size_t /*transferred*/)
    {
        {
            std::istream is(&sb);

            size_t n = std::count(net::buffers_begin(sb.data()), net::buffers_end(sb.data()), '\n');
            std::vector<size_t> batch(n);
            std::copy_n(std::istream_iterator<size_t>(is), n, batch.begin());
            is.ignore(1, '\n'); // we know a newline is pending, eat it to keep invariant

            post(pool, std::bind(handle_batch, std::move(batch)));
        }

        if (ec) {
            std::cerr << "receive_primes: " << ec.message() << std::endl;
            zcat_output.close();
        } else {
            net::async_read_until(zcat_output, sb, "\n", receive_primes);
        }
    };
Because async_read_until may read partial lines, we count the number (n) of full lines in the buffer and pack them into a vector. After we make sure we eat the pending newline, we finally post the batch job:
post(pool, std::bind(handle_batch, std::move(batch)));
We move ownership to the task because it will run on a separate thread, and the best way to handle concurrency is to minimize sharing.
And prime this pump as well:

    // kick off handler loop as well:
    receive_primes(error_code{}, 0);
PUTTING IT ALL TOGETHER
Well. Prepare for the anticlimax. With all the async chains set up, all we need to do is... wait.
    io.run();
    pool.join();
} // end of main
The io.run() keeps both pumps running and awaits the child process, all on the main thread, as we like.

The pool.join() waits for all batch jobs to be completed before stopping the thread pool. If you leave out that line, you might not run all the tasks, because the destructor of thread_pool calls stop() before it calls join().

Play with the buffer size (512 bytes in my sample) to see how large the batches get. Note that the 512 bytes are compressed bytes.
"UNLIVE"演示
遗憾的是,据我所知没有在线编译器支持外部网络访问,因此您必须自己运行该编译器.为了方便起见,这是完整的清单,以及在我的计算机上运行的示例输出:
"UNLIVE" DEMO
Sadly no online compiler that I know of supports external network access, so you'll have to run this one yourself. For convenience, here's a full listing, and sample output from a run on my computer:
Live On Coliru
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/beast.hpp>
#include <boost/beast/http.hpp>
#include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <numeric>
#include <random>
#include <thread>
#include <vector>

void handle_batch(std::vector<size_t> params) {
    static std::mutex s_mx; // static: shared across calls, so the lock actually synchronizes
    if (!params.empty()) {
        // emulate some work, because I'm lazy
        auto sum = std::accumulate(begin(params), end(params), 0ull);

        // then wait some 100..200ms
        {
            using namespace std::chrono_literals;
            std::mt19937 prng(std::random_device{}());
            std::this_thread::sleep_for(
                std::uniform_real_distribution<>(100,200)(prng)*1ms);
        }

        // simple thread id (thread::id displays ugly)
        auto tid = std::hash<std::thread::id>{}(std::this_thread::get_id()) % 100;

        // report results to stdout
        std::lock_guard lk(s_mx); // make sure the output doesn't intermix
        std::cout
            << "Thread #" << std::setw(2) << std::setfill('0') << tid
            << " Batch n:" << params.size()
            << "\tRange [" << params.front() << ".." << params.back() << "]"
            << "\tSum:" << sum
            << std::endl;
    }
}

namespace net     = boost::asio;
namespace ssl     = net::ssl;
namespace beast   = boost::beast;
namespace http    = beast::http;
namespace process = boost::process;

using boost::system::error_code;
using boost::system::system_error;
using net::ip::tcp;
using stream = ssl::stream<tcp::socket>;

auto ssl_context() {
    ssl::context ctx{ssl::context::sslv23};
    ctx.set_default_verify_paths();
    ctx.set_verify_mode(ssl::verify_peer);
    return ctx;
}

void connect_https(stream& s, std::string const& host, tcp::resolver::iterator eps) {
    net::connect(s.lowest_layer(), eps);
    s.lowest_layer().set_option(tcp::no_delay(true));

    if (!SSL_set_tlsext_host_name(s.native_handle(), host.c_str())) {
        throw system_error{ { (int)::ERR_get_error(), net::error::get_ssl_category() } };
    }
    s.handshake(stream::handshake_type::client);
}

auto get_request(std::string const& host, std::string const& path) {
    using namespace http;
    request<string_body> req;
    req.version(11);
    req.method(verb::get);
    req.target("https://" + host + path);
    req.set(field::user_agent, "test");
    req.set(field::host, host);

    std::cerr << req << std::endl;
    return req;
}

int main() {
    net::io_context io;       // main thread does all io
    net::thread_pool pool(6); // worker threads

    // outside for lifetime
    http::response_parser<http::buffer_body> response_reader;
    beast::flat_buffer lookahead; // for the response_reader
    std::array<char,512> buf{0};  // for download content
    auto ctx = ssl_context();
    ssl::stream<tcp::socket> s(io, ctx);

    {   // synchronously write request
        std::string host = "www.mathsisfun.com";
        connect_https(s, host, tcp::resolver{io}.resolve(host, "https"));
        http::write(s, get_request(host, "/includes/primes-to-100k.zip"));

        http::read_header(s, lookahead, response_reader);
        //std::cerr << "Headers: " << response_reader.get().base() << std::endl;
    }

    // now, asynchronously read contents
    process::async_pipe pipe_to_zcat(io);

    std::function<void(error_code, size_t)> receive_zip;
    receive_zip = [&s, &response_reader, &pipe_to_zcat, &buf, &lookahead, &receive_zip]
        (error_code ec, size_t /*ignore_this*/)
    {
        auto& res = response_reader.get();
        auto& body = res.body();
        if (body.data) {
            auto n = sizeof(buf) - body.size;
            net::write(pipe_to_zcat, net::buffer(buf, n));
        }

        bool done = ec && !(ec == http::error::need_buffer);
        done += response_reader.is_done();

        if (done) {
            std::cerr << "receive_zip: " << ec.message() << std::endl;
            pipe_to_zcat.close();
        } else {
            body.data = buf.data();
            body.size = buf.size();
            http::async_read(s, lookahead, response_reader, receive_zip);
        }
    };

    // kick off receive loop
    receive_zip(error_code{}, 0);

    process::async_pipe zcat_output(io);
    process::child zcat(
        process::search_path("zcat"),
        process::std_in < pipe_to_zcat,
        process::std_out > zcat_output,
        process::on_exit([](int exitcode, std::error_code ec) {
            std::cerr << "Child process exited with " << exitcode << " (" << ec.message() << ")\n";
        }),
        io);

    std::function<void(error_code, size_t)> receive_primes;
    net::streambuf sb;
    receive_primes = [&zcat_output, &sb, &receive_primes, &pool]
        (error_code ec, size_t /*transferred*/)
    {
        {
            std::istream is(&sb);

            size_t n = std::count(net::buffers_begin(sb.data()), net::buffers_end(sb.data()), '\n');
            std::vector<size_t> batch(n);
            std::copy_n(std::istream_iterator<size_t>(is), n, batch.begin());
            is.ignore(1, '\n'); // we know a newline is pending, eat it to keep invariant

            post(pool, std::bind(handle_batch, std::move(batch)));
        }

        if (ec) {
            std::cerr << "receive_primes: " << ec.message() << std::endl;
            zcat_output.close();
        } else {
            net::async_read_until(zcat_output, sb, "\n", receive_primes);
        }
    };
    // kick off handler loop as well:
    receive_primes(error_code{}, 0);

    io.run();
    pool.join();
}
Output:
GET https://www.mathsisfun.com/includes/primes-to-100k.zip HTTP/1.1
User-Agent: test
Host: www.mathsisfun.com

receive_zip: Success
Child process exited with 0 (Success)
receive_primes: End of file
Thread #11 Batch n:95	Range [599..1237]	Sum:86587
Thread #58 Batch n:170	Range [1249..2549]	Sum:320714
Thread #34 Batch n:170	Range [2551..3919]	Sum:549880
Thread #54 Batch n:170	Range [3923..5407]	Sum:790922
Thread #30 Batch n:170	Range [5413..6863]	Sum:1040712
Thread #60 Batch n:108	Range [2..593]	Sum:28697
Thread #58 Batch n:170	Range [8429..9923]	Sum:1560462
Thread #11 Batch n:170	Range [6869..8423]	Sum:1298732
Thread #30 Batch n:146	Range [12703..14087]	Sum:1956410
Thread #34 Batch n:147	Range [9929..11329]	Sum:1563023
Thread #54 Batch n:146	Range [11351..12697]	Sum:1758964
Thread #60 Batch n:146	Range [14107..15473]	Sum:2164462
Thread #11 Batch n:146	Range [16943..18313]	Sum:2576764
Thread #34 Batch n:146	Range [19861..21313]	Sum:3003048
Thread #30 Batch n:146	Range [18329..19853]	Sum:2790654
Thread #58 Batch n:146	Range [15493..16937]	Sum:2365198
Thread #60 Batch n:146	Range [22721..24109]	Sum:3422310
Thread #54 Batch n:146	Range [21317..22717]	Sum:3212180
Thread #30 Batch n:146	Range [27179..28661]	Sum:4081540
Thread #11 Batch n:146	Range [24113..25693]	Sum:3640476
Thread #34 Batch n:146	Range [25703..27143]	Sum:3859484
Thread #60 Batch n:146	Range [30223..31741]	Sum:4525378
Thread #54 Batch n:146	Range [31751..33211]	Sum:4746372
Thread #58 Batch n:146	Range [28663..30211]	Sum:4297314
Thread #30 Batch n:146	Range [33223..34693]	Sum:4958972
Thread #34 Batch n:146	Range [36307..37799]	Sum:5408028
Thread #11 Batch n:146	Range [34703..36299]	Sum:5184000
Thread #54 Batch n:146	Range [39371..40973]	Sum:5865356
Thread #60 Batch n:146	Range [37811..39367]	Sum:5637612
Thread #58 Batch n:146	Range [40993..42433]	Sum:6091022
Thread #34 Batch n:146	Range [44029..45613]	Sum:6541984
Thread #54 Batch n:146	Range [47287..48817]	Sum:7013764
Thread #30 Batch n:146	Range [42437..44027]	Sum:6308156
Thread #11 Batch n:146	Range [45631..47279]	Sum:6780582
Thread #58 Batch n:146	Range [50341..51913]	Sum:7470486
Thread #34 Batch n:146	Range [51929..53569]	Sum:7701048
Thread #60 Batch n:146	Range [48821..50333]	Sum:7239008
Thread #54 Batch n:146	Range [53591..55147]	Sum:7934798
Thread #11 Batch n:146	Range [56713..58211]	Sum:8388956
Thread #58 Batch n:146	Range [58217..59771]	Sum:8617316
Thread #30 Batch n:146	Range [55163..56711]	Sum:8169020
Thread #60 Batch n:146	Range [61519..63197]	Sum:9100594
Thread #34 Batch n:146	Range [59779..61511]	Sum:8856806
Thread #54 Batch n:146	Range [63199..64849]	Sum:9339328
Thread #11 Batch n:146	Range [64853..66457]	Sum:9580694
Thread #58 Batch n:146	Range [66463..67979]	Sum:9816826
Thread #30 Batch n:146	Range [67987..69779]	Sum:10057662
Thread #54 Batch n:146	Range [72931..74573]	Sum:10770902
Thread #34 Batch n:146	Range [71347..72923]	Sum:10529702
Thread #60 Batch n:146	Range [69809..71341]	Sum:10304156
Thread #11 Batch n:146	Range [74587..76231]	Sum:11008056
Thread #58 Batch n:146	Range [76243..77801]	Sum:11251048
Thread #30 Batch n:146	Range [77813..79561]	Sum:11491034
Thread #34 Batch n:146	Range [81119..82729]	Sum:11963076
Thread #60 Batch n:146	Range [82757..84449]	Sum:12207776
Thread #58 Batch n:146	Range [86183..87767]	Sum:12700772
Thread #54 Batch n:146	Range [79579..81101]	Sum:11732042
Thread #11 Batch n:146	Range [84457..86179]	Sum:12455242
Thread #30 Batch n:146	Range [87793..89527]	Sum:12951322
Thread #34 Batch n:146	Range [89533..91153]	Sum:13187046
Thread #54 Batch n:146	Range [94441..96013]	Sum:13904802
Thread #30 Batch n:146	Range [97829..99487]	Sum:14403556
Thread #58 Batch n:146	Range [92779..94439]	Sum:13665032
Thread #60 Batch n:146	Range [91159..92767]	Sum:13431876
Thread #11 Batch n:146	Range [96017..97813]	Sum:14148718
Thread #34 Batch n:46	Range [99497..99991]	Sum:4588078
¹ Which you could print by uncommenting that line. Note that Boost 1.70 doesn't have the streaming implemented, and Boost 1.72 has a bug regarding boost::process::async_pipe, so you need 1.73 to actually print the headers like that.