How do I return the response back to the caller asynchronously using a final callback dispatched from the on_read handler?


Problem Description


I need to expose an async REST API for C++ clients that internally uses boost::beast for sending REST requests / receiving responses.

The starting point is the http_client_async.cpp example.

Now the client will pass a callback function through this async API; that callback needs to be invoked at the end of the REST operation from the on_read() handler [http_client_async.cpp], passing the full response back to the caller.
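
For illustration, the shape of the API I have in mind is roughly this (hypothetical names, just to show the intent):

// hypothetical client-facing API -- the names are placeholders
using Response = boost::beast::http::response<boost::beast::http::string_body>;
using Callback = std::function<void(boost::beast::error_code, Response)>;

void async_rest_get(std::string host, std::string port, std::string target, Callback on_done);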

How can I achieve this?

Solution

But is there any way to invoke the _callback through asio's io_context? I would like to call this callback in an async fashion, since this callback, which is provided by the user, could block and thus block the io_context's thread as well. Similar to the way the other handlers like on_read(), on_write() etc. are scheduled in the io_context?

Yes. What you're after is the async_result protocol. I have some examples of that in other answers (e.g. How can I get a future from boost::asio::post?).
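
To show that protocol in isolation, here is a minimal toy sketch of my own (not from the Beast sample, and assuming the same Boost vintage as the code below) that wraps a trivial operation so it accepts a plain callback, net::use_future or a yield_context, using the same mechanics http_request_op uses further down:

#include <boost/asio.hpp>
#include <type_traits>
#include <utility>

namespace net = boost::asio;

// Toy initiating function: "asynchronously" add two ints.
template <typename Token>
auto async_add(net::io_context& ioc, int a, int b, Token&& token) {
    // Craft a concrete handler type from whatever completion token was passed
    using result_type  = net::async_result<std::decay_t<Token>, void(boost::system::error_code, int)>;
    using handler_type = typename result_type::completion_handler_type;

    handler_type handler(std::forward<Token>(token));
    result_type  result(handler);

    // Post the "work"; completing means invoking the handler with the signature above
    net::post(ioc, [a, b, h = std::move(handler)]() mutable {
        h(boost::system::error_code{}, a + b);
    });

    return result.get(); // a future, a value, or void, depending on the token
}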

Here are the building blocks:

Store a handler

In your "session" (let's rename it http_request_op and hide it in some detail namespace), you want to remember a completion handler.

Don't worry, nobody has to come up with such a handler themselves. We will add an initiating function async_http_request that will make it for you.

The end-user might use a future or a coroutine (yield_context). Of course, they can supply a plain vanilla callback if they prefer.

using Response = http::response<http::string_body>;

template <typename Handler>
class http_request_op : public std::enable_shared_from_this<http_request_op<Handler> > {
    // ...
    Response res_;
    Handler handler_;

    // ...
  public:

    template <typename Executor>
    explicit http_request_op(Executor ex, Handler handler)
        : resolver_(ex),
        stream_(ex),
        handler_(std::move(handler))
    { }

Now in your final step you invoke that handler_. To keep it simple I made the fail helper into a member function and called it complete:

void complete(beast::error_code ec, char const* what) {
    if (ec && what) {
        // TODO: A better idea would be to make a custom `Response` type that
        // has room for "fail stage"
        res_.reason(what);
    }
    post(stream_.get_executor(), [this, ec, self=this->shared_from_this()] {
            handler_(ec, std::move(res_));
        });
}

All the places that check ec and previously used fail now call complete with the same ec. In addition, in on_read we add an unconditional completion:

void on_read(beast::error_code ec, size_t /*bytes_transferred*/) {
    if (ec)
        return complete(ec, "read");
    stream_.socket().shutdown(tcp::socket::shutdown_both, ec);

    // unconditional complete here
    return complete(ec, "shutdown");
}

Initiating function (async_http_request)

template <typename Context, typename Token>
auto async_http_request(Context& ctx, beast::string_view host, beast::string_view port, beast::string_view target, int version, Token&& token) {
    using result_type = typename net::async_result<std::decay_t<Token>, void(beast::error_code, Response)>;
    using handler_type = typename result_type::completion_handler_type;
    handler_type handler(std::forward<Token>(token));
    result_type result(handler);

    std::make_shared<detail::http_request_op<handler_type> >
        (make_strand(ctx), std::move(handler))
            ->start(host, port, target, version);

    return result.get();
}

You see this creates an async result, which crafts a "handler" from the token passed, kicks off the http_request_op and returns the async result.

What is returned depends on which token is passed. See the usages:

Usage

I'll show various ways in which end-users can choose to use this async_http_request initiating function:

Using a future

auto future = async_http_request(ioc.get_executor(), host, port, target, version, net::use_future);
ioc.run();

std::cout << future.get() << "\n";

The return type is std::future<Response>.

The creation of the promise and setting the return value/exception information is magically handled by Asio.
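
A variation (my sketch, not part of the answer's demo): run the io_context on a worker thread, so future.get() blocks the calling thread instead of the thread doing the I/O. It assumes the same ioc, host, port, target and version as the demo, plus <thread>:

auto future = async_http_request(ioc, host, port, target, version, net::use_future);

std::thread io_thread([&ioc] { ioc.run(); }); // I/O and completion handlers run here

try {
    Response res = future.get(); // blocks the caller, not the I/O thread
    std::cout << res.reason() << "\n";
} catch (boost::system::system_error const& se) {
    std::cout << se.code().message() << "\n";
}

io_thread.join();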

Using a coroutine/yield context:

net::spawn(ioc, [&ioc,args](net::yield_context yield) {
    try {
        auto host   = args[0];
        auto port   = args[1];
        auto target = args[2];
        int version = args[3]=="1.0"? 10 : 11;

        Response res = async_http_request(
                ioc,
                host, port, target, version,
                yield);

        std::cout << res << std::endl;
    } catch (boost::system::system_error const& se) {
        // no way to get at response here
        std::cout << "There was an error: " << se.code().message() << std::endl;
    }
});

ioc.run();

The return type is just Response here. Note that exceptions are raised if an error condition is reported. Alternatively, pass an error_code variable:

        beast::error_code ec;
        Response res = async_http_request(
                ioc,
                host, port, target, version,
                yield[ec]);

        std::cout << ec.message() << "\n" << res << std::endl;

Still using a callback

/*void*/ async_http_request(ioc, host, port, target, version, 
    [](beast::error_code ec, Response const& res) {
        std::cout << ec.message() << "\n" << res << "\n";
    });

The return value ends up being simply void.
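
If the concern quoted at the top applies and the user-supplied callback itself may block, one option (my suggestion, not something the answer's code does) is to have that callback immediately hand the heavy work to a dedicated thread pool, so the strand/io_context thread that delivered the completion returns right away:

net::thread_pool blocking_pool(2); // hypothetical pool for blocking user work

/*void*/ async_http_request(ioc, host, port, target, version,
    [&blocking_pool](beast::error_code ec, Response res) {
        // Return immediately; do the potentially blocking processing elsewhere
        net::post(blocking_pool, [ec, res = std::move(res)] {
            std::cout << ec.message() << "\n" << res << "\n";
        });
    });

// after ioc.run() returns:
blocking_pool.join(); // wait for the offloaded work to finish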

Full Demo Code

No live demo, because no online compiler supports network requests and it also exceeds compilation limits (e.g. here).

#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/use_future.hpp>

#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/version.hpp>
#include <iostream>
#include <memory>

namespace beast = boost::beast;
namespace http = beast::http;
namespace net = boost::asio;
using tcp = boost::asio::ip::tcp;

using Response = http::response<http::string_body>;

namespace detail {
    template <typename Handler>
    class http_request_op : public std::enable_shared_from_this<http_request_op<Handler> > {
        tcp::resolver resolver_;
        beast::tcp_stream stream_;
        beast::flat_buffer buffer_;
        http::request<http::empty_body> req_;
        Response res_;
        Handler handler_;

        template <typename F>
        auto bind(F ptmf) { return beast::bind_front_handler(ptmf, this->shared_from_this()); }

        void complete(beast::error_code ec, char const* what) {
            if (ec && what) {
                // TODO: A better idea would be to make a custom `Response` type that
                // has room for "fail stage"
                res_.reason(what);
            }
            post(stream_.get_executor(), [this, ec, self=this->shared_from_this()] {
                    handler_(ec, std::move(res_));
                });
        }
      public:
        template <typename Executor>
        explicit http_request_op(Executor ex, Handler handler)
          : resolver_(ex),
            stream_(ex),
            handler_(std::move(handler))
        { }

        void start(beast::string_view host, beast::string_view port, beast::string_view target, int version) {
            req_.version(version);
            req_.method(http::verb::get);
            req_.target(target);
            req_.set(http::field::host, host);
            req_.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING);
            resolver_.async_resolve(host.to_string(), port.to_string(), 
                bind_executor(stream_.get_executor(), bind(&http_request_op::on_resolve)));
        }

      private:
        void on_resolve(beast::error_code ec, tcp::resolver::results_type results) {
            if (ec)
                return complete(ec, "resolve");
            stream_.expires_after(std::chrono::seconds(30));
            stream_.async_connect(results, bind(&http_request_op::on_connect));
        }

        void on_connect(beast::error_code const& ec, tcp::endpoint const&) {
            if (ec)
                return complete(ec, "connect");
            stream_.expires_after(std::chrono::seconds(30));
            http::async_write(stream_, req_, bind(&http_request_op::on_write));
        }

        void on_read(beast::error_code ec, size_t /*bytes_transferred*/) {
            if (ec)
                return complete(ec, "read");
            stream_.socket().shutdown(tcp::socket::shutdown_both, ec);

            // unconditional complete here
            return complete(ec, "shutdown");
        }

        void on_write(beast::error_code ec, size_t /*bytes_transferred*/) {
            if (ec)
                return complete(ec, "write");
            http::async_read(stream_, buffer_, res_, bind(&http_request_op::on_read));
        }
    };
}

template <typename Context, typename Token>
auto async_http_request(Context& ctx, beast::string_view host, beast::string_view port, beast::string_view target, int version, Token&& token) {
    using result_type = typename net::async_result<std::decay_t<Token>, void(beast::error_code, Response)>;
    using handler_type = typename result_type::completion_handler_type;
    handler_type handler(std::forward<Token>(token));
    result_type result(handler);

    std::make_shared<detail::http_request_op<handler_type> >
        (make_strand(ctx), std::move(handler))
            ->start(host, port, target, version);

    return result.get();
}

int main(int argc, char** argv) {
    std::vector<beast::string_view> args{argv+1, argv+argc};
    if (args.size() == 3) args.push_back("1.1");

    if (args.size() != 4) {
        std::cerr << "Usage: http-client-async <host> <port> <target> [<HTTP "
                     "version: 1.0 or 1.1(default)>]\n"
                  << "Example:\n"
                  << "    http-client-async www.example.com 80 /\n"
                  << "    http-client-async www.example.com 80 / 1.0\n";
        return 255;
    }

    auto host   = args[0];
    auto port   = args[1];
    auto target = args[2];
    int version = args[3]=="1.0"? 10 : 11;

    net::io_context ioc;

    net::spawn(ioc, [=,&ioc](net::yield_context yield) {
        try {
            Response res = async_http_request(
                    ioc,
                    host, port, target, version,
                    yield);

            std::cout << "From coro (try/catch): " << res.reason() << std::endl;
        } catch (boost::system::system_error const& se) {
            // no way to get at response here
            std::cout << "coro exception: " << se.code().message() << std::endl;
        }
    });

    net::spawn(ioc, [=,&ioc](net::yield_context yield) {
        beast::error_code ec;
        Response res = async_http_request(
                ioc,
                host, port, target, version,
                yield[ec]);

        std::cout << "From coro: " << ec.message() << ", " << res.reason() << "\n";
    });

    /*void*/ async_http_request(ioc, host, port, target, version, 
        [](beast::error_code ec, Response const& res) {
            std::cout << "From callback: " << ec.message() << ", " << res.reason() << "\n";
        });

    auto future = async_http_request(ioc, host, port, target, version, net::use_future);

    ioc.run();
    try {
        std::cout << "From future: " << future.get().reason() << "\n";
    } catch (boost::system::system_error const& se) {
        std::cout << "future exception: " << se.code().message() << std::endl;
    }
}

Output for successful and failing requests:

$ ./sotest www.example.com 80 / 1.1
From callback: Success, OK
From coro: Success, OK
From coro (try/catch): OK
From future: OK

$ ./sotest www.example.com 81 / 1.1
From callback: The socket was closed due to a timeout, connect
coro exception: The socket was closed due to a timeout
From coro: The socket was closed due to a timeout, connect
From future: future exception: The socket was closed due to a timeout

$ ./sotest www.example.cough 80 / 1.1
From callback: Host not found (authoritative), resolve
coro exception: Host not found (authoritative)
From coro: Host not found (authoritative), resolve
From future: future exception: Host not found (authoritative)

$ ./sotest www.example.com rhubarb / 1.1
From callback: Service not found, resolve
coro exception: Service not found
From coro: Service not found, resolve
From future: future exception: Service not found

Note that the timeout example of course runs in ~30s total (not 4 × 30s), because everything runs asynchronously.
