How to read data from the Internet using multi-threading while connecting only once?


Question

I'm building a tiny multi-threading download program using boost::asio::ip::tcp. I need each thread to deal with a part of the data. I know I can solve the problem by adding "Range: bytes=xx-xx" to the request header, but I don't want the program to connect to the server so many times. Is there any solution?

Answer

Just read it and dispatch to the worker threads when appropriate.

Having no clue what kind of chunks you want to handle separately, let's assume you read all of the prime numbers from https://www.mathsisfun.com/includes/primes-to-100k.zip, read them in chunks, then do some work on all the primes on separate threads.

Here's a lazy prime job:

void handle_batch(std::vector<size_t> params) {
    if (!params.empty()) {
        std::cout
            << "Batch n:" << params.size()
            << "\tRange [" << params.front() << ".." << params.back() << "]"
            << "\tSum:" << std::accumulate(begin(params), end(params), 0ull)
            << std::endl;
    }
}

Yeah, we just print a description of the job params and their sum. We can doodle a bit on it to make it more lifelike, e.g. make it take some time, and be aware that we are on worker threads, so we want to synchronize access to the console.

void handle_batch(std::vector<size_t> params) {
    static std::mutex s_mx; // static: all worker threads must lock the same mutex

    if (!params.empty()) {
        // emulate some work, because I'm lazy
        auto sum = std::accumulate(begin(params), end(params), 0ull);
        // then wait some 100..200ms
        {
            using namespace std::chrono_literals;
            std::mt19937 prng(std::random_device{}());
            std::this_thread::sleep_for(
                std::uniform_real_distribution<>(100,200)(prng)*1ms);
        }

        // simple thread id (thread::id displays ugly)
        auto tid = std::hash<std::thread::id>{}(std::this_thread::get_id()) % 100;

        // report results to stdout
        std::lock_guard lk(s_mx); // make sure the output doesn't intermix
        std::cout
            << "Thread #" << std::setw(2) << std::setfill('0') << tid
            << " Batch n:" << params.size()
            << "\tRange [" << params.front() << ".." << params.back() << "]"
            << "\tSum:" << sum
            << std::endl;
    }
}

Okay, that's enough gore for the unimportant bits.

Well, there's a slight complication with my chosen approach, because not only is that site using https (ugh), it is also serving up ZIP files (ugh). And we're using C++ (ugh?).

At least we can do the whole SSL-connect business synchronously in not too much code. We want the reading to be asynchronous, though, because that way we can demonstrate that

  • you can do a lot of intermixed IO on just the main thread using Boost Asio
  • same goes for Boost Process to launch zcat as a child process to unzip the primes content (we'll assume UNIX-like system with zcat installed)
  • which means we'll be asynchronously writing to that child process stdin
  • and also asynchronously reading from its stdout
  • spawning off batch jobs along the way as soon as they're ready

This should be a pretty good model for your workload, because the workers take more time than the IO; still, we do all the IO tasks on a single thread without blocking.

As said, we will use a single thread for IO and a thread pool for the batch workers:

int main() {
    net::io_context io; // main thread does all io
    net::thread_pool pool(6); // worker threads

There. That's a start. Now we want to have an SSL connection and request that ZIP. Here it is:

http::response_parser<http::buffer_body> response_reader;
beast::flat_buffer lookahead; // for the response_reader
std::array<char,512> buf{0}; // for download content
auto ctx = ssl_context();
ssl::stream<tcp::socket> s(io, ctx);

{   // synchronously write request
    std::string host = "www.mathsisfun.com";
    connect_https(s, host, tcp::resolver{io}.resolve(host, "https"));
    http::write(s, get_request(host, "/includes/primes-to-100k.zip"));

    http::read_header(s, lookahead, response_reader);
    //std::cerr << "Headers: " << response_reader.get().base() << std::endl;
}

Yup, that already did the reading of the response headers¹. Of course we cheated, because we need three helpers:

  1. making an SSL context

auto ssl_context() {
    ssl::context ctx{ssl::context::sslv23};
    ctx.set_default_verify_paths();
    ctx.set_verify_mode(ssl::verify_peer);
    return ctx;
}

  • connecting over SSL

    void connect_https(stream& s, std::string const& host, tcp::resolver::iterator eps) {
        net::connect(s.lowest_layer(), eps);
        s.lowest_layer().set_option(tcp::no_delay(true));
    
        if (!SSL_set_tlsext_host_name(s.native_handle(), host.c_str())) {
            throw system_error{ { (int)::ERR_get_error(), net::error::get_ssl_category() } };
        }
        s.handshake(stream::handshake_type::client);
    }
    

  • making the HTTP request

    auto get_request(std::string const& host, std::string const& path) {
        using namespace http;
        request<string_body> req;
        req.version(11);
        req.method(verb::get);
        req.target("https://" + host + path);
        req.set(field::user_agent, "test");
        req.set(field::host, host);
    
        std::cerr << req << std::endl;
        return req;
    }
    

  • Not bad, for C++.

Now we start with the asynchrony: let's have a "pump" or "loop" that sends all the response data into a pipe:

    // now, asynchronously read contents
    process::async_pipe pipe_to_zcat(io);
    
    std::function<void(error_code, size_t)> receive_zip;
    

receive_zip is what we call our loop. It's a self-chaining asynchronous operation: every time it is called, it pumps some data into the pipe and calls one more async_read for the HTTP response:

    receive_zip = [&s, &response_reader, &pipe_to_zcat, &buf, &lookahead, &receive_zip]
        (error_code ec, size_t /*ignore_this*/)
    {
        auto& res = response_reader.get();
        auto& body = res.body();
        if (body.data) {
            auto n = sizeof(buf) - body.size;
            net::write(pipe_to_zcat, net::buffer(buf, n));
        }
    
        bool done = ec && !(ec == http::error::need_buffer);
        done += response_reader.is_done();
    
        if (done) {
            std::cerr << "receive_zip: " << ec.message() << std::endl;
            pipe_to_zcat.close();
        } else {
            body.data = buf.data();
            body.size = buf.size();
    
            http::async_read(s, lookahead, response_reader, receive_zip);
        }
    };
    

This slightly complicated-looking read of a buffered response comes almost literally from the documentation. Now, all we have to do is prime the pump:

    // kick off receive loop
    receive_zip(error_code{}, 0);
    

Intermezzo, Unzip

This is not the interesting part, so let's go: we launch a zcat subprocess and want a second pipe to read its output from:

    process::async_pipe zcat_output(io);
    process::child zcat(
       process::search_path("zcat"),
       process::std_in < pipe_to_zcat,
       process::std_out > zcat_output,
       process::on_exit([](int exitcode, std::error_code ec) {
            std::cerr << "Child process exited with " << exitcode << " (" << ec.message() << ")\n";
       }), io);
    

End of intermezzo :)

(We even threw in exit-code reporting because, why not?)

Now we have another async read loop, this time to read back the uncompressed primes. This is where we will assemble batch jobs to be handled on the worker pool.

    std::function<void(error_code, size_t)> receive_primes;
    net::streambuf sb;
    

Like receive_zip before, receive_primes is our loop driver; the sb buffer is just a buffer that makes it easy to read using std::istream as you'd normally do from std::cin.

    receive_primes = [&zcat_output, &sb, &receive_primes, &pool](error_code ec, size_t /*transferred*/) {
        {
            std::istream is(&sb);
    
            size_t n = std::count(net::buffers_begin(sb.data()), net::buffers_end(sb.data()), '\n');
            std::vector<size_t> batch(n);
            std::copy_n(std::istream_iterator<size_t>(is), n, batch.begin());
            is.ignore(1, '\n'); // we know a newline is pending, eat it to keep invariant
    
            post(pool, std::bind(handle_batch, std::move(batch)));
        }
    
        if (ec) {
            std::cerr << "receive_primes: " << ec.message() << std::endl;
            zcat_output.close();
        } else {
            net::async_read_until(zcat_output, sb, "\n", receive_primes);
        }
    };
    

Because async_read_until may read partial lines, we count the number (n) of full lines in the buffer and pack them into a vector. After making sure we eat the pending newline, we ... finally post the batch job:

     post(pool, std::bind(handle_batch, std::move(batch)));
    

We move ownership to the task because it will run on a separate thread, and the best way to handle concurrency is to minimize sharing.

Prime the pump again:

    // kick off handler loop as well:
    receive_primes(error_code{}, 0);
    

PUTTING IT ALL TOGETHER

Well. Prepare for the anticlimax. With all the async chains set up, all we need to do is... wait.

        io.run();
        pool.join();
    } // end of main
    

The io.run() keeps both pumps running and awaits the child process, all on the main thread, as we like.

The pool.join() waits for all batch jobs to be completed before stopping the thread pool. If you leave out that line, you might not run all the tasks, because the destructor of thread_pool calls stop() before it calls join().

Play with the buffer size (512 bytes in my example) to see how large the batches get. Note that the 512 bytes are compressed bytes.

"UNLIVE" DEMO

Sadly, no online compiler that I know of supports external network access, so you'll have to run this one yourself. For convenience, here's a full listing, and sample output from a run on my computer:

Live On Coliru

    #include <boost/asio.hpp>
    #include <boost/asio/ssl.hpp>
    #include <boost/beast.hpp>
    #include <boost/beast/http.hpp>
    #include <boost/process.hpp>
    #include <boost/process/async.hpp>
    #include <iomanip>
    #include <iostream>
    #include <mutex>
    #include <numeric>
    #include <random>
    #include <thread>
    
    void handle_batch(std::vector<size_t> params) {
        static std::mutex s_mx; // static: all worker threads must lock the same mutex
    
        if (!params.empty()) {
            // emulate some work, because I'm lazy
            auto sum = std::accumulate(begin(params), end(params), 0ull);
            // then wait some 100..200ms
            {
                using namespace std::chrono_literals;
                std::mt19937 prng(std::random_device{}());
                std::this_thread::sleep_for(
                    std::uniform_real_distribution<>(100,200)(prng)*1ms);
            }
    
            // simple thread id (thread::id displays ugly)
            auto tid = std::hash<std::thread::id>{}(std::this_thread::get_id()) % 100;
    
            // report results to stdout
            std::lock_guard lk(s_mx); // make sure the output doesn't intermix
            std::cout
                << "Thread #" << std::setw(2) << std::setfill('0') << tid
                << " Batch n:" << params.size()
                << "\tRange [" << params.front() << ".." << params.back() << "]"
                << "\tSum:" << sum
                << std::endl;
        }
    }
    
    namespace net     = boost::asio;
    namespace ssl     = net::ssl;
    namespace beast   = boost::beast;
    namespace http    = beast::http;
    namespace process = boost::process;
    
    using boost::system::error_code;
    using boost::system::system_error;
    using net::ip::tcp;
    using stream = ssl::stream<tcp::socket>;
    
    auto ssl_context() {
        ssl::context ctx{ssl::context::sslv23};
        ctx.set_default_verify_paths();
        ctx.set_verify_mode(ssl::verify_peer);
        return ctx;
    }
    
    void connect_https(stream& s, std::string const& host, tcp::resolver::iterator eps) {
        net::connect(s.lowest_layer(), eps);
        s.lowest_layer().set_option(tcp::no_delay(true));
    
        if (!SSL_set_tlsext_host_name(s.native_handle(), host.c_str())) {
            throw system_error{ { (int)::ERR_get_error(), net::error::get_ssl_category() } };
        }
        s.handshake(stream::handshake_type::client);
    }
    
    auto get_request(std::string const& host, std::string const& path) {
        using namespace http;
        request<string_body> req;
        req.version(11);
        req.method(verb::get);
        req.target("https://" + host + path);
        req.set(field::user_agent, "test");
        req.set(field::host, host);
    
        std::cerr << req << std::endl;
        return req;
    }
    
    int main() {
        net::io_context io; // main thread does all io
        net::thread_pool pool(6); // worker threads
    
        // outside for lifetime
        http::response_parser<http::buffer_body> response_reader;
        beast::flat_buffer lookahead; // for the response_reader
        std::array<char,512> buf{0}; // for download content
        auto ctx = ssl_context();
        ssl::stream<tcp::socket> s(io, ctx);
    
        {   // synchronously write request
            std::string host = "www.mathsisfun.com";
            connect_https(s, host, tcp::resolver{io}.resolve(host, "https"));
            http::write(s, get_request(host, "/includes/primes-to-100k.zip"));
    
            http::read_header(s, lookahead, response_reader);
            //std::cerr << "Headers: " << response_reader.get().base() << std::endl;
        }
    
        // now, asynchronously read contents
        process::async_pipe pipe_to_zcat(io);
    
        std::function<void(error_code, size_t)> receive_zip;
        receive_zip = [&s, &response_reader, &pipe_to_zcat, &buf, &lookahead, &receive_zip](error_code ec, size_t /*ignore_this*/) {
            auto& res = response_reader.get();
            auto& body = res.body();
            if (body.data) {
                auto n = sizeof(buf) - body.size;
                net::write(pipe_to_zcat, net::buffer(buf, n));
            }
    
            bool done = ec && !(ec == http::error::need_buffer);
            done += response_reader.is_done();
    
            if (done) {
                std::cerr << "receive_zip: " << ec.message() << std::endl;
                pipe_to_zcat.close();
            } else {
                body.data = buf.data();
                body.size = buf.size();
    
                http::async_read(s, lookahead, response_reader, receive_zip);
            }
        };
    
        // kick off receive loop
        receive_zip(error_code{}, 0);
    
        process::async_pipe zcat_output(io);
        process::child zcat(
           process::search_path("zcat"),
           process::std_in < pipe_to_zcat,
           process::std_out > zcat_output,
           process::on_exit([](int exitcode, std::error_code ec) {
                std::cerr << "Child process exited with " << exitcode << " (" << ec.message() << ")\n";
           }), io);
    
        std::function<void(error_code, size_t)> receive_primes;
        net::streambuf sb;
        receive_primes = [&zcat_output, &sb, &receive_primes, &pool](error_code ec, size_t /*transferred*/) {
            {
                std::istream is(&sb);
    
                size_t n = std::count(net::buffers_begin(sb.data()), net::buffers_end(sb.data()), '\n');
                std::vector<size_t> batch(n);
                std::copy_n(std::istream_iterator<size_t>(is), n, batch.begin());
                is.ignore(1, '\n'); // we know a newline is pending, eat it to keep invariant
    
                post(pool, std::bind(handle_batch, std::move(batch)));
            }
    
            if (ec) {
                std::cerr << "receive_primes: " << ec.message() << std::endl;
                zcat_output.close();
            } else {
                net::async_read_until(zcat_output, sb, "\n", receive_primes);
            }
        };
        // kick off handler loop as well:
        receive_primes(error_code{}, 0);
    
        io.run();
        pool.join();
    }
    

Output:

    GET https://www.mathsisfun.com/includes/primes-to-100k.zip HTTP/1.1
    User-Agent: test
    Host: www.mathsisfun.com
    
    
    receive_zip: Success
    Child process exited with 0 (Success)
    receive_primes: End of file
    Thread #11 Batch n:95   Range [599..1237]   Sum:86587
    Thread #58 Batch n:170  Range [1249..2549]  Sum:320714
    Thread #34 Batch n:170  Range [2551..3919]  Sum:549880
    Thread #54 Batch n:170  Range [3923..5407]  Sum:790922
    Thread #30 Batch n:170  Range [5413..6863]  Sum:1040712
    Thread #60 Batch n:108  Range [2..593]  Sum:28697
    Thread #58 Batch n:170  Range [8429..9923]  Sum:1560462
    Thread #11 Batch n:170  Range [6869..8423]  Sum:1298732
    Thread #30 Batch n:146  Range [12703..14087]    Sum:1956410
    Thread #34 Batch n:147  Range [9929..11329] Sum:1563023
    Thread #54 Batch n:146  Range [11351..12697]    Sum:1758964
    Thread #60 Batch n:146  Range [14107..15473]    Sum:2164462
    Thread #11 Batch n:146  Range [16943..18313]    Sum:2576764
    Thread #34 Batch n:146  Range [19861..21313]    Sum:3003048
    Thread #30 Batch n:146  Range [18329..19853]    Sum:2790654
    Thread #58 Batch n:146  Range [15493..16937]    Sum:2365198
    Thread #60 Batch n:146  Range [22721..24109]    Sum:3422310
    Thread #54 Batch n:146  Range [21317..22717]    Sum:3212180
    Thread #30 Batch n:146  Range [27179..28661]    Sum:4081540
    Thread #11 Batch n:146  Range [24113..25693]    Sum:3640476
    Thread #34 Batch n:146  Range [25703..27143]    Sum:3859484
    Thread #60 Batch n:146  Range [30223..31741]    Sum:4525378
    Thread #54 Batch n:146  Range [31751..33211]    Sum:4746372
    Thread #58 Batch n:146  Range [28663..30211]    Sum:4297314
    Thread #30 Batch n:146  Range [33223..34693]    Sum:4958972
    Thread #34 Batch n:146  Range [36307..37799]    Sum:5408028
    Thread #11 Batch n:146  Range [34703..36299]    Sum:5184000
    Thread #54 Batch n:146  Range [39371..40973]    Sum:5865356
    Thread #60 Batch n:146  Range [37811..39367]    Sum:5637612
    Thread #58 Batch n:146  Range [40993..42433]    Sum:6091022
    Thread #34 Batch n:146  Range [44029..45613]    Sum:6541984
    Thread #54 Batch n:146  Range [47287..48817]    Sum:7013764
    Thread #30 Batch n:146  Range [42437..44027]    Sum:6308156
    Thread #11 Batch n:146  Range [45631..47279]    Sum:6780582
    Thread #58 Batch n:146  Range [50341..51913]    Sum:7470486
    Thread #34 Batch n:146  Range [51929..53569]    Sum:7701048
    Thread #60 Batch n:146  Range [48821..50333]    Sum:7239008
    Thread #54 Batch n:146  Range [53591..55147]    Sum:7934798
    Thread #11 Batch n:146  Range [56713..58211]    Sum:8388956
    Thread #58 Batch n:146  Range [58217..59771]    Sum:8617316
    Thread #30 Batch n:146  Range [55163..56711]    Sum:8169020
    Thread #60 Batch n:146  Range [61519..63197]    Sum:9100594
    Thread #34 Batch n:146  Range [59779..61511]    Sum:8856806
    Thread #54 Batch n:146  Range [63199..64849]    Sum:9339328
    Thread #11 Batch n:146  Range [64853..66457]    Sum:9580694
    Thread #58 Batch n:146  Range [66463..67979]    Sum:9816826
    Thread #30 Batch n:146  Range [67987..69779]    Sum:10057662
    Thread #54 Batch n:146  Range [72931..74573]    Sum:10770902
    Thread #34 Batch n:146  Range [71347..72923]    Sum:10529702
    Thread #60 Batch n:146  Range [69809..71341]    Sum:10304156
    Thread #11 Batch n:146  Range [74587..76231]    Sum:11008056
    Thread #58 Batch n:146  Range [76243..77801]    Sum:11251048
    Thread #30 Batch n:146  Range [77813..79561]    Sum:11491034
    Thread #34 Batch n:146  Range [81119..82729]    Sum:11963076
    Thread #60 Batch n:146  Range [82757..84449]    Sum:12207776
    Thread #58 Batch n:146  Range [86183..87767]    Sum:12700772
    Thread #54 Batch n:146  Range [79579..81101]    Sum:11732042
    Thread #11 Batch n:146  Range [84457..86179]    Sum:12455242
    Thread #30 Batch n:146  Range [87793..89527]    Sum:12951322
    Thread #34 Batch n:146  Range [89533..91153]    Sum:13187046
    Thread #54 Batch n:146  Range [94441..96013]    Sum:13904802
    Thread #30 Batch n:146  Range [97829..99487]    Sum:14403556
    Thread #58 Batch n:146  Range [92779..94439]    Sum:13665032
    Thread #60 Batch n:146  Range [91159..92767]    Sum:13431876
    Thread #11 Batch n:146  Range [96017..97813]    Sum:14148718
    Thread #34 Batch n:46   Range [99497..99991]    Sum:4588078
    


¹ Which you could print by uncommenting that line. Note that Boost 1.70 doesn't have the streaming implemented, and Boost 1.72 has a bug regarding boost::process::async_pipe, so you need 1.73 to actually print the headers like that.
