在boost :: asio中使用相同的istream和可调整大小的streambuf [英] Using the same istream with a resizeable streambuf in boost::asio

查看:132
本文介绍了在boost :: asio中使用相同的istream和可调整大小的streambuf的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用boost asio async_read()从一个套接字读取数据。我有以下持续到程序生命周期的读者类成员:

boost::asio::streambuf recv_data;
std::istream stream_is(&recv_data);

async_read调用如下所示:

boost::asio::async_read(ep_ptr->get_sock(), recv_data, boost::asio::transfer_exactly(n),
                        boost::bind(&IProtocol::handle_data_recv, &protocol,
                                    boost::asio::placeholders::error));

我的问题是:如果我要从套接字读取 'n' 个字节,而 streambuf 的大小小于 'n',于是它自行调整了大小,这时会发生什么?既然 std::istream 所持有的 streambuf 内部缓冲区现在可能已经被释放,我是否需要重新创建 std::istream?

解决方案

没有,幸运的是,绑定不关心recv_data的内部可能会重新分配,而是绑定到recv_data对象本身。下面是我写的下载器的工作示例,您可以看到我不在读取之间重新分配缓冲区。



同样,您可以安全地共享引用一个向量,而不关心向量的内部是否被重新分配(除非你直接开始指向向量元素的内存地址,或者在它们失效后使用迭代器)。向量的句柄保持有效,在这个同样的方式,streambuf的句柄仍然对istream有效,它的工作正常)。



download.h

  #ifndef _MV_DOWNLOAD_H_ 
#define _MV_DOWNLOAD_H_

#include< string&
#include< iostream>
#include< istream>
#include< ostream>
#include< fstream>
#include< algorithm>
#includeNetwork / url.h
#includeUtility / generalUtility.h
#include< boost / asio.hpp>
#include< boost / bind.hpp>

命名空间MV {
struct HttpHeader {
std :: string version;
int status = 0;
std :: string message;
std :: map< std :: string,std :: string>值;

std :: vector< std :: string>反弹

bool success = false;
std :: string errorMessage;

size_t contentLength;

HttpHeader(){
}

HttpHeader(std :: istream& response_stream){
read(response_stream);
}

void read(std :: istream& response_stream);
};


inline std :: ostream& operator<<<(std :: ostream& os,const HttpHeader& obj){
os< \\ / ______ HTTP_HEADER ______ \\ / \\\
Version [< obj.version<< ] Status [<< obj.status<< ]消息[< obj.message< ] \\\
;
os<< || ----------------------- || \\\

for(auto&& kvp:obj.values){
os<< [< kvp.first<< ]:< kvp.second<< \\\
;
}
os<< \\\
|| -------- Bounces -------- || \\\
;
for(size_t i = 0; i os< i<< :<< obj.bounces [i]< \\\
;
}
os<< / \\ _______________________ / \\< std :: endl;
return os;
}
inline std :: istream& operator>>(std :: istream& a_is,HttpHeader& a_obj){
a_obj.read(a_is);
return a_is;
}

class DownloadRequest:public std :: enable_shared_from_this< DownloadRequest> {
public:
static std :: shared_ptr< DownloadRequest> make(const MV :: Url& a_url,const std :: shared_ptr< std :: ostream>& a_streamOutput){
auto result = std :: shared_ptr< DownloadRequest>(new DownloadRequest(a_streamOutput));
result-> perform(a_url);
return result;
}

// onComplete在下载结束时调用成功或错误。
static std :: shared_ptr< DownloadRequest> make(const std :: shared_ptr< boost :: asio :: io_service>& a_ioService,const MV :: Url& a_url,const std :: shared_ptr< std :: ostream>& a_streamOutput,std :: function< void std :: shared_ptr< DownloadRequest>)> a_onComplete){
auto result = std :: shared_ptr< DownloadRequest>(new DownloadRequest(a_streamOutput));
result-> onComplete = a_onComplete;
result-> ioService = a_ioService;
result-> perform(a_url);
return result;
}

HttpHeader& header(){
return headerData;
}

MV :: Url finalUrl(){
return currentUrl;
}

MV :: Url inputUrl(){
return originalUrl;
}

private:
DownloadRequest(const std :: shared_ptr< std :: ostream>& a_streamOutput):
streamOutput(a_streamOutput){
}

void perform(const MV :: Url& a_url);

bool initializeSocket();

void initiateRequest(const MV :: Url& a_url);

void handleResolve(const boost :: system :: error_code& err,boost :: asio :: ip :: tcp :: resolver :: iterator endpoint_iterator);
void handleConnect(const boost :: system :: error_code& err,boost :: asio :: ip :: tcp :: resolver :: iterator endpoint_iterator);
void handleWriteRequest(const boost :: system :: error_code& err);
void handleReadHeaders(const boost :: system :: error_code& err);
void handleReadContent(const boost :: system :: error_code& err);

void readResponseToStream(){
(* streamOutput)< &(* response);
}

std :: shared_ptr< boost :: asio :: io_service> ioService;
std :: unique_ptr& boost :: asio :: ip :: tcp :: resolver>解析器
std :: unique_ptr& boost :: asio :: ip :: tcp :: socket>插座;

std :: unique_ptr< std :: istream> responseStream;

std :: unique_ptr< boost :: asio :: streambuf>请求;
std :: unique_ptr< boost :: asio :: streambuf>响应;

std :: shared_ptr< std :: ostream> streamOutput;

HttpHeader headerData;

MV :: Url currentUrl;
MV :: Url originalUrl;

std :: function< void(std :: shared_ptr< DownloadRequest>)> onComplete;
};

std :: string DownloadString(const MV :: Url& a_url);

HttpHeader DownloadFile(const MV :: Url& a_url,const std :: string& a_path);
void DownloadFile(const std :: shared_ptr< boost :: asio :: io_service>& a_ioService,const MV :: Url& a_url,const std :: string& a_path,std :: function< void :: shared_ptr< DownloadRequest>)> a_onComplete = std :: function< void(std :: shared_ptr< DownloadRequest>)>());

void DownloadFiles(const std :: vector< MV :: Url>& a_url,const std :: string& a_path,std :: function< void(std :: shared_ptr< DownloadRequest>) > a_onComplete = std :: function< void(std :: shared_ptr< DownloadRequest>)>());
void DownloadFiles(const std :: shared_ptr< boost :: asio :: io_service>& a_ioService,const std :: vector< MV :: Url>& a_url,const std :: string& a_path,std :: function< void(std :: shared_ptr< DownloadRequest>)> a_onComplete = std :: function< void(std :: shared_ptr< DownloadRequest>)>(),std :: function< void()> a_onAllComplete = std :: function< void()>());
}
#endif

download.cpp

  #includedownload.h
#include< boost / filesystem.hpp>
#include< atomic>

命名空间MV {
void HttpHeader :: read(std :: istream& response_stream){
values.clear();

response_stream>>版;
std :: string status_code;
response_stream>> status_code;
try {
status = std :: stoi(status_code);
} catch(...){
status = 0;
}

getline_platform_agnostic(response_stream,message);

if(!message.empty()&& message [0] ==''){message = message.substr(1); }

std :: string header;
while(getline_platform_agnostic(response_stream,header)&&!header.empty()){
auto index = header.find_first_of(':');
if(index!= std :: string :: npos&& index> 0){
auto key = header.substr(0,index);
auto value =(index + 2> = header.size())? :header.substr(index + 2);
std :: transform(key.begin(),key.end(),key.begin(),[](char c){return std :: tolower(c);});
values [key] = value;
if(toLower(key)==content-length){
try {
contentLength = static_cast< size_t>(stol(value));
} catch(std :: exception& e){
std :: cerr<< e.what()< std :: endl;
contentLength = 0;
}
}
}
}
}

std :: string DownloadString(const Url& a_url){
auto result = std :: make_shared< std :: stringstream>();
if(DownloadRequest :: make(a_url,result) - > header()。success){
return result-> str
} else {
return;
}
}

MV :: HttpHeader DownloadFile(const Url& a_url,const std :: string& a_path){
HttpHeader header;
{
boost :: filesystem :: create_directories(boost :: fileystem :: path(a_path).parent_path());
auto outFile = std :: make_shared< std :: ofstream>(a_path,std :: ofstream :: out | std :: ofstream :: binary);
auto request = DownloadRequest :: make(a_url,outFile);
header = request-> header();
}
if(!header.success){
std :: remove(a_path.c_str());
}
return header;
}

void DownloadFile(const std :: shared_ptr< boost :: asio :: io_service>& a_ioService,const MV :: Url& a_url,const std :: string& a_path ,std :: function< void(std :: shared_ptr< DownloadRequest>)> a_onComplete){
boost :: filesystem :: create_directories(boost :: filesystem :: path(a_path).parent_path());
auto outFile = std :: make_shared< std :: ofstream>(a_path,std :: ofstream :: out | std :: ofstream :: binary);
auto request = DownloadRequest :: make(a_ioService,a_url,outFile,[a_path,a_onComplete](std :: shared_ptr< DownloadRequest> a_result){
if(!a_result-> header ){
std :: remove(a_path.c_str());
}
if(a_onComplete){a_onComplete(a_result);}
}
}

void DownloadFiles(const std :: vector< MV :: Url>& a_urls,const std :: string& a_path,std :: function< void shared_ptr< DownloadRequest>)> a_onComplete){
auto service = std :: make_shared< boost :: asio :: io_service>();
for(auto& url:a_urls){
DownloadFile(service,url,a_path + boost :: filesystem :: path(url.path())。filename()。string a_onComplete);
}
service-> run();
}
void DownloadFiles(const std :: shared_ptr< boost :: asio :: io_service>& a_ioService,const std :: vector< MV :: Url>& a_urls,const std :: string & a_path,std :: function< void(std :: shared_ptr< DownloadRequest>)> a_onComplete,std :: function< void()> a_onAllComplete){
size_t totalFiles = a_urls.size();
for(auto&& url:a_urls){
auto counter = std :: make_shared< std :: atomic< size_t>>(0)
DownloadFile(a_ioService,url,a_path + boost :: filesystem :: path(url.path())。filename()。string(),[=](std :: shared_ptr< DownloadRequest> a_request){
a_onComplete(a_request);
if(++(* counter)== totalFiles){
a_onAllComplete();
}
}
}
}

void DownloadRequest :: handleReadContent(const boost :: system :: error_code& err){
if(!err){
readResponseToStream();
if(onComplete){onComplete(shared_from_this()); }
} else if(err!= boost :: asio :: error :: eof){
headerData.success = false;
headerData.errorMessage =下载读取内容失败:+ err.message();
std :: cerr<< headerData.errorMessage<< std :: endl;
if(onComplete){onComplete(shared_from_this()); }
}
}

void DownloadRequest :: handleReadHeaders(const boost :: system :: error_code& err){
if(!err){
responseStream = std :: make_unique< std :: istream>(&(* response));

headerData.read(* responseStream);
headerData.success = true;
headerData.errorMessage =;
if(headerData.status> = 300&& headerData.status< 400&& headerData.bounces.size()< 32&& headerData.values.find(location )!= headerData.values.end()){
headerData.bounces.push_back(currentUrl.toString());
initiateRequest(headerData.values [location]);
} else {
auto amountLeftToRead = headerData.contentLength - response-> size();
if(response-> size()> 0){
readResponseToStream();
}
if(amountLeftToRead> 0){
boost :: asio :: async_read(* socket,* response,boost :: asio :: transfer_at_least(amountLeftToRead),boost :: bind (& DownloadRequest :: handleReadContent,shared_from_this(),boost :: asio :: placeholders :: error));
} else {
if(onComplete){onComplete(shared_from_this()); }
}
}
} else {
headerData.success = false;
headerData.errorMessage =下载读头失败:+ err.message();
std :: cerr<< headerData.errorMessage<< std :: endl;
if(onComplete){onComplete(shared_from_this()); }
}
}

void DownloadRequest :: handleWriteRequest(const boost :: system :: error_code& err){
if(!err){
boost :: asio :: async_read_until(* socket,* response,\r\\\
\r\\\
,boost :: bind(& DownloadRequest :: handleReadHeaders,shared_from_this(),boost :: asio :: placeholder :: error));
} else {
headerData.success = false;
headerData.errorMessage =下载写入失败:+ err.message();
std :: cerr<< headerData.errorMessage<< std :: endl;
if(onComplete){onComplete(shared_from_this()); }
}
}

void DownloadRequest :: handleConnect(const boost :: system :: error_code& err,boost :: asio :: ip :: tcp :: resolver: :iterator endpoint_iterator){
if(!err){
//连接成功。发送请求。
boost :: asio :: async_write(* socket,* request,boost :: bind(& DownloadRequest :: handleWriteRequest,shared_from_this(),boost :: asio :: placeholders :: error)
} else if(endpoint_iterator!= boost :: asio :: ip :: tcp :: resolver :: iterator()){
//连接失败。尝试列表中的下一个端点。
socket-> close();
boost :: asio :: ip :: tcp :: endpoint endpoint = * endpoint_iterator;
socket-> async_connect(endpoint,boost :: bind(& DownloadRequest :: handleConnect,shared_from_this(),boost :: asio :: placeholders :: error,++ endpoint_iterator)
} else {
headerData.success = false;
headerData.errorMessage =下载连接失败:+ err.message();
std :: cerr<< headerData.errorMessage<< std :: endl;
if(onComplete){onComplete(shared_from_this()); }
}
}

void DownloadRequest :: handleResolve(const boost :: system :: error_code& err,boost :: asio :: ip :: tcp :: resolver: :iterator endpoint_iterator){
if(!err){
//尝试连接到列表中的第一个端点。每个端点
//将被尝试,直到我们成功建立连接。
boost :: asio :: ip :: tcp :: endpoint endpoint = * endpoint_iterator;
socket-> async_connect(endpoint,boost :: bind(& DownloadRequest :: handleConnect,shared_from_this(),boost :: asio :: placeholders :: error,++ endpoint_iterator)
} else {
headerData.success = false;
headerData.errorMessage =下载Resolve Failure:+ err.message();
std :: cerr<< headerData.errorMessage<< std :: endl;
if(onComplete){onComplete(shared_from_this()); }
}
}

void DownloadRequest :: initiateRequest(const MV :: Url& a_url){
socket-> close();
currentUrl = a_url;
request = std :: make_unique< boost :: asio :: streambuf>();
response = std :: make_unique< boost :: asio :: streambuf>();
使用boost :: asio :: ip :: tcp;

std :: ostream requestStream(&(* request));
requestStream<< GET< a_url.pathAndQuery()<< HTTP / 1.1\r\\\
;
requestStream<< Host:<< a_url.host()<< \r\\\
;
requestStream<< Accept:* / * \r\\\
;
requestStream<< Connection:close\r\\\
\r\\\
;

tcp :: resolver :: query query(a_url.host(),http);
resolver-> async_resolve(query,boost :: bind(& DownloadRequest :: handleResolve,shared_from_this(),boost :: asio :: placeholders :: error,boost :: asio :: placeholders :: iterator) );
}

bool下载Request :: initializeSocket(){
bool created = false;
if(!ioService){
ioService = std :: make_shared& boost :: asio :: io_service>();
created = true;
}

resolver = std :: make_unique< boost :: asio :: ip :: tcp :: resolver>(* ioService);
socket = std :: make_unique< boost :: asio :: ip :: tcp :: socket>(* ioService);

return created;
}

void DownloadRequest :: perform(const MV :: Url& a_url){
originalUrl = a_url;
try {
bool needToCallRun = initializeSocket();
initiateRequest(a_url);
if(needToCallRun){
ioService-> run();
}
} catch(...){
headerData.success = false;
headerData.errorMessage =异常抛出到顶层。
std :: cerr<< headerData.errorMessage<< std :: endl;
onComplete(shared_from_this());
}
}

}

> generalUtility.h (它是其中的一部分,仅供参考)

  inline std :: istream& getline_platform_agnostic(std :: istream& is,std :: string& t){
t.clear();

//使用std :: streambuf一个一个地读取流中的字符。
//这比使用std :: istream逐个读取它们要快。
//以这种方式使用streambuf的代码必须由sentry对象保护。
// sentry对象执行各种任务,
//如线程同步和更新流状态。

std :: istream :: sentry se(is,true);
std :: streambuf * sb = is.rdbuf();

for(;;){
int c = sb-> sbumpc();
switch(c){
case'\\\
':
return is;
case'\r':
if(sb-> sgetc()=='\\\
')
sb-> sbumpc();
return is;
case EOF:
//也处理最后一行没有行结束的情况
if(t.empty())
is.setstate(std :: ios: :eofbit);
return is;
默认值:
t + =(char)c;
}
}
}

inline std :: string toLower(std :: string s){
std :: transform(s.begin ),s.end(),s.begin(),[](char c){return std :: tolower(c);}
return s;
}

url.h
$ b

修改(稍微改变一些命名方案)



https://github.com/keyz182/Poco-1.4.3/blob/master/Foundation/include/ Poco / URI.h



https://github.com/keyz182/Poco-1.4.3/blob/master/Foundation/src/URI.cpp


I am reading data off a socket using boost asio async_read(). I have the following members of the reader class which persist through the lifetime of the program:

boost::asio::streambuf recv_data;
std::istream stream_is(&recv_data);

The async_read call looks like this:

boost::asio::async_read(ep_ptr->get_sock(), recv_data, boost::asio::transfer_exactly(n),
                              boost::bind(&IProtocol::handle_data_recv, &protocol,
                                          boost::asio::placeholders::error));

My question is, what would happen if I am reading 'n' bytes off the socket and the size of the streambuf is less than 'n' so it resizes itself. Do I need to re-create the std::istream since the internal streambuf buffer that the std::istream is holding might now be freed/deallocated?

解决方案

No, thankfully, the binding doesn't care that recv_data's internals may get reallocated and it instead binds to the recv_data object itself. Here is a working example of a downloader I wrote which you can see I do not re-allocate the buffer in between reads.

In the same way, you can safely share a reference to a vector and not care whether the internals of the vector are re-allocated (unless you start pointing at the memory addresses of the vector's elements directly, or use iterators after they have been invalidated). The handle to the vector remains valid, and in the same way the handle to the streambuf remains valid for the istream, so it works just fine.

download.h

#ifndef _MV_DOWNLOAD_H_
#define _MV_DOWNLOAD_H_

#include <string>
#include <iostream>
#include <istream>
#include <ostream>
#include <fstream>
#include <algorithm>
#include "Network/url.h"
#include "Utility/generalUtility.h"
#include <boost/asio.hpp>
#include <boost/bind.hpp>

namespace MV {
    // Parsed HTTP response status line and header fields, plus the bookkeeping
    // the downloader attaches to a request (redirect history and outcome).
    struct HttpHeader {
        std::string version;    // first whitespace-delimited token of the status line
        int status = 0;         // numeric status code; 0 when it could not be parsed
        std::string message;    // remainder of the status line after the code
        std::map<std::string, std::string> values;  // header fields, keyed by lower-cased name

        std::vector<std::string> bounces;           // URLs visited before each redirect

        bool success = false;
        std::string errorMessage;

        // Zero-initialised so that a header which was default-constructed, or
        // parsed from a response without a Content-Length field, never exposes
        // an indeterminate value to code that does arithmetic on it.
        size_t contentLength = 0;

        HttpHeader() = default;

        // Constructs the header by parsing response_stream (see read()).
        HttpHeader(std::istream& response_stream) {
            read(response_stream);
        }

        // Parses a status line and header fields from response_stream into this object.
        void read(std::istream& response_stream);
    };


    // Writes a human-readable dump of an HttpHeader: the status line, every
    // header field, then the numbered list of redirect bounces. The final line
    // is terminated with std::endl, so the stream is flushed.
    inline std::ostream& operator<<(std::ostream& os, const HttpHeader& obj) {
        os << "\\/______HTTP_HEADER______\\/\nVersion [" << obj.version << "] Status [" << obj.status << "] Message [" << obj.message << "]\n";
        os << "||-----------------------||\n";
        for (const auto& field : obj.values) {
            os << "[" << field.first << "]: " << field.second << "\n";
        }

        os << "\n||--------Bounces--------||\n";
        size_t position = 0;
        for (const auto& bounce : obj.bounces) {
            os << position << ": " << bounce << "\n";
            ++position;
        }

        os << "/\\_______________________/\\" << std::endl;
        return os;
    }
    // Stream extraction for HttpHeader: hands the stream to HttpHeader::read
    // and returns the same stream so extractions can be chained.
    inline std::istream& operator>>(std::istream& a_is, HttpHeader& a_obj) {
        std::istream& source = a_is;
        a_obj.read(source);
        return source;
    }

    // A single HTTP GET download whose body is written to a caller-supplied
    // output stream. Instances are created through make(); the asynchronous
    // handlers keep the request alive by binding shared_from_this().
    class DownloadRequest : public std::enable_shared_from_this<DownloadRequest> {
    public:
        // Creates a request and starts it immediately. No io_service is supplied,
        // so perform() creates one and runs it before returning; the call
        // therefore blocks until the io_service runs out of work.
        static std::shared_ptr<DownloadRequest> make(const MV::Url& a_url, const std::shared_ptr<std::ostream> &a_streamOutput) {
            auto result = std::shared_ptr<DownloadRequest>(new DownloadRequest(a_streamOutput));
            result->perform(a_url);
            return result;
        }

        // Creates a request on a caller-supplied io_service and starts it.
        // When a_ioService is non-null, perform() does not call run(); the caller
        // is responsible for running the service.
        // a_onComplete is invoked by the handlers on success or error at the end
        // of the download, with the request itself as the argument.
        static std::shared_ptr<DownloadRequest> make(const std::shared_ptr<boost::asio::io_service> &a_ioService, const MV::Url& a_url, const std::shared_ptr<std::ostream> &a_streamOutput, std::function<void (std::shared_ptr<DownloadRequest>)> a_onComplete) {
            auto result = std::shared_ptr<DownloadRequest>(new DownloadRequest(a_streamOutput));
            result->onComplete = a_onComplete;
            result->ioService = a_ioService;
            result->perform(a_url);
            return result;
        }

        // Parsed response header plus success flag and error message.
        HttpHeader& header() {
            return headerData;
        }

        // URL of the most recently initiated request (differs from inputUrl()
        // after a redirect has been followed).
        MV::Url finalUrl() {
            return currentUrl;
        }

        // URL originally passed to make()/perform().
        MV::Url inputUrl() {
            return originalUrl;
        }

    private:
        // Private so that instances are only created through make(), which wraps
        // them in a shared_ptr (the handlers call shared_from_this()).
        DownloadRequest(const std::shared_ptr<std::ostream> &a_streamOutput) :
            streamOutput(a_streamOutput) {
        }

        // Records the original URL, sets up the socket and starts the request.
        void perform(const MV::Url& a_url);

        // Creates the resolver and socket; returns true when it also had to
        // create the io_service.
        bool initializeSocket();

        // Builds the GET request for a_url and starts host resolution.
        void initiateRequest(const MV::Url& a_url);

        // Completion handlers, chained in this order:
        // resolve -> connect -> write request -> read headers -> read content.
        void handleResolve(const boost::system::error_code& err, boost::asio::ip::tcp::resolver::iterator endpoint_iterator);
        void handleConnect(const boost::system::error_code& err, boost::asio::ip::tcp::resolver::iterator endpoint_iterator);
        void handleWriteRequest(const boost::system::error_code& err);
        void handleReadHeaders(const boost::system::error_code& err);
        void handleReadContent(const boost::system::error_code& err);

        // Inserts the contents of the response streambuf into the output stream.
        void readResponseToStream() {
            (*streamOutput) << &(*response);
        }

        // NOTE(review): ioService is declared before resolver and socket, which
        // are constructed from it, so it is destroyed after them. Keep this order.
        std::shared_ptr<boost::asio::io_service> ioService;
        std::unique_ptr<boost::asio::ip::tcp::resolver> resolver;
        std::unique_ptr<boost::asio::ip::tcp::socket> socket;

        // istream view over `response`, used to parse the header.
        std::unique_ptr<std::istream> responseStream;

        // Outgoing request bytes and incoming response bytes; both are recreated
        // by initiateRequest() for every request, including redirects.
        std::unique_ptr<boost::asio::streambuf> request;
        std::unique_ptr<boost::asio::streambuf> response;

        // Destination for the response body.
        std::shared_ptr<std::ostream> streamOutput;

        HttpHeader headerData;

        MV::Url currentUrl;
        MV::Url originalUrl;

        // Optional completion callback; empty for the blocking make() overload.
        std::function<void(std::shared_ptr<DownloadRequest>)> onComplete;
    };

    std::string DownloadString(const MV::Url& a_url);

    HttpHeader DownloadFile(const MV::Url& a_url, const std::string &a_path);
    void DownloadFile(const std::shared_ptr<boost::asio::io_service> &a_ioService, const MV::Url& a_url, const std::string &a_path, std::function<void(std::shared_ptr<DownloadRequest>)> a_onComplete = std::function<void(std::shared_ptr<DownloadRequest>)>());

    void DownloadFiles(const std::vector<MV::Url>& a_url, const std::string &a_path, std::function<void(std::shared_ptr<DownloadRequest>)> a_onComplete = std::function<void(std::shared_ptr<DownloadRequest>)>());
    void DownloadFiles(const std::shared_ptr<boost::asio::io_service> &a_ioService, const std::vector<MV::Url>& a_url, const std::string &a_path, std::function<void(std::shared_ptr<DownloadRequest>)> a_onComplete = std::function<void(std::shared_ptr<DownloadRequest>)>(), std::function<void()> a_onAllComplete = std::function<void()>());
}
#endif

download.cpp

#include "download.h"
#include <boost/filesystem.hpp>
#include <atomic>

namespace MV{
    // Parses an HTTP status line and the header fields that follow it from
    // response_stream, stopping at the first empty line (or end of input).
    //
    // Header names are lower-cased before being stored in `values`. Whitespace
    // between the colon and the value is optional and is skipped, so both
    // "Key: value" and "Key:value" yield "value". `contentLength` is reset to 0
    // on every call so that a value from an earlier response (for example the
    // one that issued a redirect) is never reused; a missing, malformed or
    // negative Content-Length leaves it at 0. `bounces` is left untouched.
    void HttpHeader::read(std::istream& response_stream) {
        values.clear();
        contentLength = 0;

        response_stream >> version;
        std::string status_code;
        response_stream >> status_code;
        try {
            status = std::stoi(status_code);
        } catch (...) {
            // Missing or non-numeric status code.
            status = 0;
        }

        getline_platform_agnostic(response_stream, message);

        // The reason phrase is separated from the status code by one space.
        if (!message.empty() && message[0] == ' ') { message.erase(0, 1); }

        std::string header;
        while (getline_platform_agnostic(response_stream, header) && !header.empty()) {
            const auto index = header.find(':');
            if (index == std::string::npos || index == 0) {
                continue;  // not a "name: value" line
            }

            auto key = header.substr(0, index);
            // std::tolower requires a value representable as unsigned char.
            std::transform(key.begin(), key.end(), key.begin(), [](unsigned char c) { return static_cast<char>(std::tolower(c)); });

            const auto valueStart = header.find_first_not_of(" \t", index + 1);
            const std::string value = (valueStart == std::string::npos) ? std::string() : header.substr(valueStart);
            values[key] = value;

            if (key == "content-length") {
                try {
                    const long long parsed = std::stoll(value);
                    // A negative length would wrap to a huge size_t; treat it as unknown.
                    contentLength = parsed > 0 ? static_cast<size_t>(parsed) : 0;
                } catch (const std::exception& e) {
                    std::cerr << e.what() << std::endl;
                    contentLength = 0;
                }
            }
        }
    }

    // Downloads a_url into an in-memory string stream and returns the collected
    // text, or an empty string when the request's header reports failure.
    std::string DownloadString(const Url& a_url) {
        const auto buffer = std::make_shared<std::stringstream>();
        const auto request = DownloadRequest::make(a_url, buffer);
        return request->header().success ? buffer->str() : std::string("");
    }

    // Downloads a_url to the file at a_path using the blocking
    // DownloadRequest::make overload, and returns a copy of the response header.
    //
    // Missing parent directories of a_path are created first. When the header
    // reports failure, the (partial) output file is removed.
    MV::HttpHeader DownloadFile(const Url& a_url, const std::string &a_path) {
        HttpHeader header;
        {
            // A bare filename has an empty parent path; create_directories must
            // not be called with an empty path, so only create what is named.
            const auto directory = boost::filesystem::path(a_path).parent_path();
            if (!directory.empty()) {
                boost::filesystem::create_directories(directory);
            }
            auto outFile = std::make_shared<std::ofstream>(a_path, std::ofstream::out | std::ofstream::binary);
            auto request = DownloadRequest::make(a_url, outFile);
            header = request->header();
        }
        // The inner scope drops the local references to the output stream and
        // the request before the file is removed.
        if (!header.success) {
            std::remove(a_path.c_str());
        }
        return header;
    }

    // Starts a download of a_url to the file at a_path on a_ioService.
    //
    // Missing parent directories of a_path are created first. The completion
    // wrapper removes the output file when the header reports failure and then
    // forwards the request to a_onComplete, if one was supplied.
    void DownloadFile(const std::shared_ptr<boost::asio::io_service> &a_ioService, const MV::Url& a_url, const std::string &a_path, std::function<void(std::shared_ptr<DownloadRequest>)> a_onComplete) {
        // A bare filename has an empty parent path; create_directories must
        // not be called with an empty path, so only create what is named.
        const auto directory = boost::filesystem::path(a_path).parent_path();
        if (!directory.empty()) {
            boost::filesystem::create_directories(directory);
        }
        auto outFile = std::make_shared<std::ofstream>(a_path, std::ofstream::out | std::ofstream::binary);
        // The returned request is intentionally not stored here.
        DownloadRequest::make(a_ioService, a_url, outFile, [a_path, a_onComplete](std::shared_ptr<DownloadRequest> a_result) {
            if (!a_result->header().success) {
                std::remove(a_path.c_str());
            }
            if (a_onComplete) { a_onComplete(a_result); }
        });
    }

    // Downloads every URL in a_urls on a private io_service. Each file is
    // written to a_path concatenated with the filename component of the URL's
    // path. All downloads are queued first; the call then blocks in run().
    void DownloadFiles(const std::vector<MV::Url>& a_urls, const std::string &a_path, std::function<void(std::shared_ptr<DownloadRequest>)> a_onComplete) {
        const auto ioService = std::make_shared<boost::asio::io_service>();
        for (const auto& source : a_urls) {
            const std::string fileName = boost::filesystem::path(source.path()).filename().string();
            DownloadFile(ioService, source, a_path + fileName, a_onComplete);
        }
        ioService->run();
    }
    // Queues a download for every URL in a_urls on a_ioService. Each file is
    // written to a_path concatenated with the filename component of the URL's
    // path.
    //
    // a_onComplete (if set) is invoked once per finished request.
    // a_onAllComplete (if set) is invoked once, after the last request has
    // finished, or immediately when a_urls is empty. Both callbacks default to
    // empty std::function objects, so each is checked before being called.
    void DownloadFiles(const std::shared_ptr<boost::asio::io_service> &a_ioService, const std::vector<MV::Url>& a_urls, const std::string &a_path, std::function<void(std::shared_ptr<DownloadRequest>)> a_onComplete, std::function<void()> a_onAllComplete) {
        const size_t totalFiles = a_urls.size();
        if (totalFiles == 0) {
            if (a_onAllComplete) { a_onAllComplete(); }
            return;
        }

        // One counter shared by all completion callbacks. Creating it per
        // iteration would mean it could only ever reach 1.
        const auto counter = std::make_shared<std::atomic<size_t>>(0);
        for (const auto& url : a_urls) {
            DownloadFile(a_ioService, url, a_path + boost::filesystem::path(url.path()).filename().string(), [=](std::shared_ptr<DownloadRequest> a_request) {
                if (a_onComplete) { a_onComplete(a_request); }
                if (++(*counter) == totalFiles && a_onAllComplete) {
                    a_onAllComplete();
                }
            });
        }
    }

    // Completion handler for the body read started by handleReadHeaders.
    //
    // End-of-file is the normal way for the read to finish, because the request
    // is sent with "Connection: close". It is therefore handled like success:
    // whatever is buffered is written to the output stream and onComplete is
    // invoked. Any other error marks the request as failed before onComplete
    // is invoked.
    // NOTE(review): a body shorter than the announced Content-Length is not
    // detected here.
    void DownloadRequest::handleReadContent(const boost::system::error_code& err) {
        if (!err || err == boost::asio::error::eof) {
            // Inserting an empty streambuf would set failbit on the output stream.
            if (response->size() > 0) {
                readResponseToStream();
            }
            if (onComplete) { onComplete(shared_from_this()); }
        } else {
            headerData.success = false;
            headerData.errorMessage = "Download Read Content Failure: " + err.message();
            std::cerr << headerData.errorMessage << std::endl;
            if (onComplete) { onComplete(shared_from_this()); }
        }
    }

    // Completion handler for the header read started by handleWriteRequest.
    //
    // Parses the response header, then either:
    //  - follows a 3xx redirect that carries a Location field (at most 32 bounces),
    //  - reads the rest of the body, or
    //  - completes immediately when the whole body is already buffered.
    //
    // The number of bytes still to read is computed without unsigned underflow.
    // When the response has no Content-Length field the body length is unknown,
    // so the socket is read until the peer closes the connection (the request
    // asks for "Connection: close").
    void DownloadRequest::handleReadHeaders(const boost::system::error_code& err) {
        if (err) {
            headerData.success = false;
            headerData.errorMessage = "Download Read Header Failure: " + err.message();
            std::cerr << headerData.errorMessage << std::endl;
            if (onComplete) { onComplete(shared_from_this()); }
            return;
        }

        responseStream = std::make_unique<std::istream>(response.get());

        headerData.read(*responseStream);
        headerData.success = true;
        headerData.errorMessage = "";

        const auto location = headerData.values.find("location");
        const bool isRedirect = headerData.status >= 300 && headerData.status < 400;
        if (isRedirect && headerData.bounces.size() < 32 && location != headerData.values.end()) {
            headerData.bounces.push_back(currentUrl.toString());
            const std::string target = location->second;
            // initiateRequest replaces `response`; drop the stream that still
            // points at the old buffer so it cannot dangle.
            responseStream.reset();
            initiateRequest(target);
            return;
        }

        // Header names are stored lower-cased by HttpHeader::read.
        const bool lengthKnown = headerData.values.find("content-length") != headerData.values.end();
        // Body bytes that arrived together with the header.
        const size_t buffered = response->size();
        if (buffered > 0) {
            readResponseToStream();
        }

        if (!lengthKnown) {
            // Unknown length: read until the peer closes the connection.
            // End-of-file is handled here; any other outcome goes to handleReadContent.
            auto self = shared_from_this();
            boost::asio::async_read(*socket, *response, boost::asio::transfer_all(), [self](const boost::system::error_code& readErr, std::size_t) {
                if (readErr == boost::asio::error::eof) {
                    if (self->response->size() > 0) {
                        self->readResponseToStream();
                    }
                    if (self->onComplete) { self->onComplete(self); }
                } else {
                    self->handleReadContent(readErr);
                }
            });
        } else if (headerData.contentLength > buffered) {
            const size_t amountLeftToRead = headerData.contentLength - buffered;
            boost::asio::async_read(*socket, *response, boost::asio::transfer_at_least(amountLeftToRead), boost::bind(&DownloadRequest::handleReadContent, shared_from_this(), boost::asio::placeholders::error));
        } else {
            if (onComplete) { onComplete(shared_from_this()); }
        }
    }

    // Completion handler for sending the request. On failure the error is
    // recorded and reported; otherwise the response is read up to and including
    // the blank line that ends the header, and handleReadHeaders takes over.
    void DownloadRequest::handleWriteRequest(const boost::system::error_code& err) {
        if (err) {
            headerData.success = false;
            headerData.errorMessage = "Download Write Failure: " + err.message();
            std::cerr << headerData.errorMessage << std::endl;
            if (onComplete) { onComplete(shared_from_this()); }
            return;
        }

        auto onHeaders = boost::bind(&DownloadRequest::handleReadHeaders, shared_from_this(), boost::asio::placeholders::error);
        boost::asio::async_read_until(*socket, *response, "\r\n\r\n", onHeaders);
    }

    // Completion handler for a connection attempt.
    //  - Success: send the prepared request.
    //  - Failure with endpoints remaining: close the socket and try the next one.
    //  - Failure with no endpoints left: record and report the error.
    void DownloadRequest::handleConnect(const boost::system::error_code& err, boost::asio::ip::tcp::resolver::iterator endpoint_iterator) {
        using boost::asio::ip::tcp;

        if (!err) {
            auto onWritten = boost::bind(&DownloadRequest::handleWriteRequest, shared_from_this(), boost::asio::placeholders::error);
            boost::asio::async_write(*socket, *request, onWritten);
            return;
        }

        if (endpoint_iterator != tcp::resolver::iterator()) {
            socket->close();
            // Read the current endpoint before the iterator is advanced for the next attempt.
            const tcp::endpoint candidate = *endpoint_iterator;
            socket->async_connect(candidate, boost::bind(&DownloadRequest::handleConnect, shared_from_this(), boost::asio::placeholders::error, ++endpoint_iterator));
            return;
        }

        headerData.success = false;
        headerData.errorMessage = "Download Connection Failure: " + err.message();
        std::cerr << headerData.errorMessage << std::endl;
        if (onComplete) { onComplete(shared_from_this()); }
    }

    // Completion handler for host resolution. On failure the error is recorded
    // and reported. Otherwise a connection to the first endpoint is attempted;
    // handleConnect receives the advanced iterator so it can try the remaining
    // endpoints if that attempt fails.
    void DownloadRequest::handleResolve(const boost::system::error_code& err, boost::asio::ip::tcp::resolver::iterator endpoint_iterator) {
        if (err) {
            headerData.success = false;
            headerData.errorMessage = "Download Resolve Failure: " + err.message();
            std::cerr << headerData.errorMessage << std::endl;
            if (onComplete) { onComplete(shared_from_this()); }
            return;
        }

        // Read the current endpoint before the iterator is advanced.
        const boost::asio::ip::tcp::endpoint first = *endpoint_iterator;
        socket->async_connect(first, boost::bind(&DownloadRequest::handleConnect, shared_from_this(), boost::asio::placeholders::error, ++endpoint_iterator));
    }

    // Starts (or restarts, after a redirect) a request for a_url: closes the
    // socket, replaces the request and response buffers with fresh ones, writes
    // the GET request into the request buffer and begins resolving the host.
    // The resolver query always uses the "http" service name.
    void DownloadRequest::initiateRequest(const MV::Url& a_url) {
        using boost::asio::ip::tcp;

        socket->close();
        currentUrl = a_url;
        request = std::make_unique<boost::asio::streambuf>();
        response = std::make_unique<boost::asio::streambuf>();

        std::ostream out(request.get());
        out << "GET " << a_url.pathAndQuery() << " HTTP/1.1\r\n"
            << "Host: " << a_url.host() << "\r\n"
            << "Accept: */*\r\n"
            << "Connection: close\r\n\r\n";

        tcp::resolver::query lookup(a_url.host(), "http");
        auto onResolved = boost::bind(&DownloadRequest::handleResolve, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::iterator);
        resolver->async_resolve(lookup, onResolved);
    }

    bool DownloadRequest::initializeSocket() {
        bool created = false;
        if (!ioService) {
            ioService = std::make_shared<boost::asio::io_service>();
            created = true;
        }

        resolver = std::make_unique<boost::asio::ip::tcp::resolver>(*ioService);
        socket = std::make_unique<boost::asio::ip::tcp::socket>(*ioService);

        return created;
    }

    // Entry point used by make(): records the original URL, sets up the socket
    // and starts the request. When initializeSocket had to create the
    // io_service, it is run here, so the call blocks until the service runs out
    // of work.
    //
    // Any exception that reaches this level is converted into a failed header.
    // onComplete is optional (the blocking make() overload never sets it), so it
    // is checked before being invoked; calling an empty std::function would
    // throw std::bad_function_call out of the catch handler.
    void DownloadRequest::perform(const MV::Url& a_url) {
        originalUrl = a_url;
        try {
            const bool needToCallRun = initializeSocket();
            initiateRequest(a_url);
            if (needToCallRun) {
                ioService->run();
            }
        } catch (...) {
            headerData.success = false;
            headerData.errorMessage = "Exception thrown to top level.";
            std::cerr << headerData.errorMessage << std::endl;
            if (onComplete) { onComplete(shared_from_this()); }
        }
    }

}

generalUtility.h (part of it anyway, just for this reference)

// Reads one line from `is` into `t`, accepting "\n", "\r\n" or a lone "\r" as
// the line terminator. The terminator is consumed but not stored.
//
// At end of input, eofbit is set only when no characters were read for this
// line; a final line without a terminator is returned without eofbit, and the
// following call reports the end of input.
inline std::istream& getline_platform_agnostic(std::istream& is, std::string& t) {
    t.clear();

    // Characters are pulled straight from the stream buffer. The sentry is
    // constructed with noskipws=true so leading whitespace is not consumed.
    std::istream::sentry guard(is, true);
    std::streambuf* const buffer = is.rdbuf();

    while (true) {
        const int next = buffer->sbumpc();

        if (next == '\n') {
            return is;
        }
        if (next == '\r') {
            // Swallow the '\n' of a "\r\n" pair.
            if (buffer->sgetc() == '\n') {
                buffer->sbumpc();
            }
            return is;
        }
        if (next == std::char_traits<char>::eof()) {
            // Only flag end-of-file when this call produced nothing.
            if (t.empty()) {
                is.setstate(std::ios::eofbit);
            }
            return is;
        }

        t += static_cast<char>(next);
    }
}

// Returns a copy of s with every character converted by std::tolower.
// Each character is passed as unsigned char: calling std::tolower with a
// negative plain-char value (any byte >= 0x80 where char is signed) is
// undefined behaviour.
inline std::string toLower(std::string s) {
    std::transform(s.begin(), s.end(), s.begin(), [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
    return s;
}

url.h

Modified (slightly, just changed some naming scheme stuff)

https://github.com/keyz182/Poco-1.4.3/blob/master/Foundation/include/Poco/URI.h

https://github.com/keyz182/Poco-1.4.3/blob/master/Foundation/src/URI.cpp

这篇关于在boost :: asio中使用相同的istream和可调整大小的streambuf的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆