Boost coroutine server crashes when writing data to client


Problem description


I made my server based on the Boost coroutine echo server example; it simply receives and writes back some data. It crashes when writing data to the client, and, more strangely, it only crashes when using multiple cores.


Here's the server. It reads 4 bytes and writes back "OK", with a 1-second timeout:

#include <winsock2.h>
#include <windows.h>

#include <iostream>
using namespace std;

#include <boost/thread/thread.hpp>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
using namespace boost;
using namespace boost::asio;
using namespace boost::asio::ip;

#define SERVER_PORT 1234
#define DATA_LEN_4 4

#define TIMEOUT_LIMIT 1 // second

struct session : public std::enable_shared_from_this<session>
{
    tcp::socket socket_;
    boost::asio::steady_timer timer_;
    boost::asio::strand<boost::asio::io_context::executor_type> strand_;

    explicit session(boost::asio::io_context& io_context, tcp::socket socket)
    : socket_(std::move(socket)),
      timer_(io_context),
      strand_(io_context.get_executor())
    { }

    void go()
    {
        auto self(shared_from_this());
        boost::asio::spawn(strand_, [this, self](boost::asio::yield_context yield)
        {
            try
            {
                timer_.expires_from_now(std::chrono::seconds(TIMEOUT_LIMIT));

                // recv data
                string packet;
                packet.resize(DATA_LEN_4); // alloc memory

                size_t received_len = 0;

                // read data
                {
                    size_t rs;
                    while(received_len < DATA_LEN_4) { // recv 4 bytes
                        boost::system::error_code ec;

                        rs = socket_.async_read_some(
                            boost::asio::buffer((char*)(packet.c_str()+received_len), DATA_LEN_4-received_len), yield[ec]);
                        if(ec==boost::asio::error::eof)
                            break; //connection closed cleanly by peer
                        else if(ec) {
                            throw "read_fail";
                        }
                        received_len += rs;
                    }
                }
                if(received_len < DATA_LEN_4) {
                    throw "recv too short, maybe timeout";
                }
                // write back "OK"
                {
                    boost::system::error_code ecw;
                    boost::asio::async_write(socket_, boost::asio::buffer(string("OK")), yield[ecw]);
                    if(ecw==boost::asio::error::eof)
                        return; //connection closed cleanly by peer
                    else if(ecw)
                        throw "write_fail"; // some other error
                }
            }
            catch (const char* reason) 
            {
                printf("exception reason: %s\n", reason);
                boost::system::error_code ecw;

                /*
                 * Question 1: why this 'async_write' line causes crash?
                 */
                // write the error reason to client
                boost::asio::async_write(socket_, boost::asio::buffer(string(reason)), yield[ecw]);

                socket_.close();
                timer_.cancel();
            }
            catch (...)
            {
                printf("unknown exception\n");
                socket_.close();
                timer_.cancel();
            }
        });

        boost::asio::spawn(strand_, [this, self](boost::asio::yield_context yield)
        {
            while (socket_.is_open())
            {
                boost::system::error_code ignored_ec;
                timer_.async_wait(yield[ignored_ec]);
                if (timer_.expires_from_now() <= std::chrono::seconds(0))
                    socket_.close();
            }
        });
    }
};

int main() {
    boost::asio::io_context io_context;

    boost::asio::spawn(io_context, [&](boost::asio::yield_context yield)
    {
        tcp::acceptor acceptor(io_context,
        tcp::endpoint(tcp::v4(), SERVER_PORT));

        for (;;)
        {
            boost::system::error_code ec;

            tcp::socket socket(io_context);
            acceptor.async_accept(socket, yield[ec]);
            if (!ec) 
                std::make_shared<session>(io_context, std::move(socket))->go();
        }
    });

    /*
     * When run on 1 CPU, it runs fine, no Crash 
     */
    // io_context.run();

    /*
     * Question 2:
     * But when run on multiple CPUs, it Crashes !!!
     * Why?
     */
    auto thread_count = std::thread::hardware_concurrency();
    boost::thread_group tgroup;
    for (auto i = 0; i < thread_count; ++i)
        tgroup.create_thread(boost::bind(&boost::asio::io_context::run, &io_context));
    tgroup.join_all();
}


Please note that the 4-byte packet and 1-second timeout are just to illustrate the problem; the real server uses large packets, which may time out under bad network conditions. To simulate this, the client writes 1 byte per second to trigger the read timeout on the server.

Client:

#include <chrono>
#include <iostream>
#include <string>
#include <thread>
#include <boost/asio.hpp>
using namespace std;

using boost::asio::ip::tcp;

#define SERVER "127.0.0.1"
#define PORT "1234"

int main() {
    boost::asio::io_context io_context;

    unsigned i = 1; 
    while(1) {
        try {
            tcp::socket s(io_context);
            tcp::resolver resolver(io_context);
            boost::asio::connect(s, resolver.resolve(SERVER, PORT));

            // to simulate the bad network condition,
            // write 4 bytes in 4 seconds to trigger the receive timeout on server, which is 1 second
            for(int i=0; i<4; i++) { 
                boost::asio::write(s, boost::asio::buffer(string("A")));
                std::this_thread::sleep_for(std::chrono::seconds(1)); // sleep 1 second
            }

            // read echo
            char x[64] = {0};
            s.read_some(boost::asio::buffer(x, sizeof(x)));
            cout << i++ << ". received: " << x << endl;
        } catch (...) {
            cout << i++ << " exception" << endl;
        }
    }

    return 0;
}

Question 1:


Why does this line cause a crash?

boost::asio::async_write(socket_, boost::asio::buffer(string(reason)), yield[ecw]);

Question 2:


Why doesn't the server crash when it runs on 1 CPU (io_context.run();), yet crashes on multiple CPUs using thread_group?


My environment: Win10-64bit, boost-1.71.0-64bit, VisualStudio-2017-Community

Recommended answer

Question 1

ba::async_write(socket_, ba::buffer(string("OK")), yield[ecw]);


This invokes undefined behaviour because you pass a temporary string as the buffer, but the asynchronous operation (by definition) doesn't complete before the async_write call returns.


Therefore the buffer is a stale reference to an object that has already been destroyed, or to whatever now lives at that stack location.


Logically, the send buffer should be part of the self object, so that it has a proper lifetime. Or, since you're using coroutines and you're going to end the session anyhow, just use write instead of async_write.
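A minimal, std-only sketch of keeping the send buffer in the session. It assumes hypothetical fake_loop and fake_async_write types that stand in for io_context and async_write so it runs without a network: like async_write, fake_async_write only records a pointer and size for the buffer and reads from it later, so the bytes must outlive the call. Here they live in the session's reply_ member, and the completion handler captures self so the session (and therefore reply_) stays alive until the write has run.

```cpp
#include <cstddef>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for io_context: posted work only runs inside run(),
// i.e. after the initiating call has already returned.
struct fake_loop {
    std::vector<std::function<void()>> pending;

    void post(std::function<void()> f) { pending.push_back(std::move(f)); }

    void run() {
        while (!pending.empty()) {
            auto batch = std::move(pending); // handlers may post more work
            pending.clear();
            for (auto& f : batch) f();
        }
    }
};

// Hypothetical stand-in for async_write: it keeps only a non-owning
// pointer/size pair and reads the caller's bytes later, when the loop runs.
template <class Handler>
void fake_async_write(fake_loop& loop, const char* data, std::size_t size,
                      std::string& wire, Handler handler) {
    loop.post([data, size, &wire, handler]() mutable {
        wire.append(data, size); // "send": the buffer is read only now
        handler();               // then report completion
    });
}

struct session : std::enable_shared_from_this<session> {
    fake_loop& loop_;
    std::string& wire_;
    std::string reply_; // the send buffer is owned by the session

    session(fake_loop& loop, std::string& wire) : loop_(loop), wire_(wire) {}

    void send(std::string msg) {
        reply_ = std::move(msg);
        // Capturing `self` keeps the session, and thus reply_, alive until
        // the completion handler has run.
        fake_async_write(loop_, reply_.data(), reply_.size(), wire_,
                         [self = shared_from_this()] {});
    }
};
```

Calling std::make_shared<session>(loop, wire)->send("OK") and then loop.run() leaves "OK" in wire, even though the temporary shared_ptr from make_shared is gone before the loop runs.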

Question 2

That's because undefined behaviour is undefined behaviour. Anything can happen.


  • Instead of read_some use read with transfer_exactly(DATA_LEN_4), or read_until with an appropriate completion condition.

  • Instead of buffer(reserved_string), you can use dynamic_buffer.
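To show what read with transfer_exactly and dynamic_buffer take off your hands, here is a rough std-only model of the question's manual read_some loop. chunk_source and read_exactly are hypothetical names. A premature end of stream is reported by throwing, loosely mirroring how Asio reports error::eof, and the string grows as bytes arrive (room for the remainder is added, then only what actually arrived is kept) instead of being resized up front.

```cpp
#include <algorithm>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical byte source that, like read_some, may return fewer bytes than
// requested (here: whatever is left of the next queued chunk).
struct chunk_source {
    std::vector<std::string> chunks;
    std::size_t next = 0;

    // Copies at most `max` bytes into `out`; returns 0 at end of stream.
    std::size_t read_some(char* out, std::size_t max) {
        while (next < chunks.size() && chunks[next].empty()) ++next;
        if (next == chunks.size()) return 0;
        std::string& c = chunks[next];
        std::size_t n = std::min(max, c.size());
        std::copy_n(c.data(), n, out);
        c.erase(0, n);
        return n;
    }
};

// Keeps reading until exactly `n` bytes have arrived, appending to `packet`.
// Throws if the stream ends first (bytes received so far stay in `packet`).
inline std::size_t read_exactly(chunk_source& src, std::string& packet, std::size_t n) {
    std::size_t received = 0;
    while (received < n) {
        std::size_t old_size = packet.size();
        packet.resize(old_size + (n - received));  // room for the remainder
        std::size_t got = src.read_some(&packet[old_size], n - received);
        packet.resize(old_size + got);             // keep only what arrived
        if (got == 0) throw std::runtime_error("eof");
        received += got;
    }
    return received;
}
```

Feeding it the chunks "A", "B", "CD", "EF" and asking for 4 bytes yields "ABCD" after three partial reads, which is the bookkeeping the single async_read call does internally.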


Instead of throwing magic strings, you can simply catch system_error, whose code() indicates which condition arose:

try {
    timer_.expires_from_now(std::chrono::seconds(TIMEOUT_LIMIT));

    // read data
    std::string packet;
    auto received_len = ba::async_read(socket_,
            ba::dynamic_buffer(packet),
            ba::transfer_exactly(DATA_LEN_4), yield);

    assert(received_len == DATA_LEN_4); // guaranteed

    // write back "OK"
    ba::write(socket_, ba::buffer("OK"s));
}
catch (boost::system::system_error const& e) {
    if (e.code() == ba::error::operation_aborted)
        std::cout << "canceled (timeout)" << std::endl;
    else if (e.code() == ba::error::eof)
        std::cout << "eof" << std::endl;
    else throw std::runtime_error(e.code().message());
}


  • So, now you could wrap that with your generic exception handling block:

    try {
        // ...
    } catch (std::exception const& e) {
        std::cout << "exception: " << std::quoted(e.what()) << std::endl;
    
        boost::system::error_code ignore;
        ba::async_write(socket_, ba::buffer(std::string(e.what())), yield[ignore]);
    
        socket_.close();
        timer_.cancel();
    }
    

    But!

    1. It seems highly dubious that informing your client is useful, or even wise.
    2. Not catching the exception in the coro is going to destroy the self instance anyway, so you can simply let it escape.


    • The timer's completion error_code already indicates whether the timer expired or was canceled:

    while (socket_.is_open()) {
        boost::system::error_code ec;
        timer_.async_wait(yield[ec]);
    
        if (ba::error::operation_aborted != ec) // timer was not canceled
            socket_.close();
    }
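The same expired-versus-canceled distinction can be sketched with only the standard library. This is an illustration built on std::condition_variable, not a replacement for steady_timer; watchdog and wait_expired are hypothetical names. wait_expired returns true when the deadline passed (the case where the coro closes the socket) and false when cancel() was called first (the operation_aborted case).

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical std-only watchdog, loosely analogous to steady_timer::async_wait.
class watchdog {
    std::mutex m_;
    std::condition_variable cv_;
    bool canceled_ = false;

public:
    void cancel() {
        {
            std::lock_guard<std::mutex> lock(m_);
            canceled_ = true;
        }
        cv_.notify_all();
    }

    // Blocks until the timeout elapses or cancel() is called.
    // Returns true if the deadline expired, false if it was canceled
    // (the counterpart of ec == operation_aborted).
    template <class Rep, class Period>
    bool wait_expired(std::chrono::duration<Rep, Period> timeout) {
        std::unique_lock<std::mutex> lock(m_);
        // wait_for with a predicate returns the predicate's final value:
        // true means canceled_, false means the timeout ran out.
        return !cv_.wait_for(lock, timeout, [this] { return canceled_; });
    }
};
```

As in the Asio version, the caller only closes the socket on the expired path; a cancel() from the reading side makes the wait return false without touching anything.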
    


  • Note, however, that the regular return paths from the session coro do NOT call .cancel() on timer_. That will keep the socket open for up to another second, until the timer expires.
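One way to make every exit path, including exceptions, cancel the timer is an RAII guard; Boost.ScopeExit offers a macro-based equivalent. This is a minimal std-only sketch: on_scope_exit, fake_timer and session_body are hypothetical names, and fake_timer merely counts cancel() calls in place of a real steady_timer.

```cpp
#include <functional>
#include <stdexcept>
#include <utility>

// Hypothetical RAII helper: runs the stored callable when it goes out of
// scope, whether the scope is left by a normal return or by an exception.
class on_scope_exit {
    std::function<void()> f_;

public:
    explicit on_scope_exit(std::function<void()> f) : f_(std::move(f)) {}
    on_scope_exit(const on_scope_exit&) = delete;
    on_scope_exit& operator=(const on_scope_exit&) = delete;
    ~on_scope_exit() { if (f_) f_(); }
};

// Hypothetical stand-in for steady_timer that only records cancellation.
struct fake_timer {
    int cancel_count = 0;
    void cancel() { ++cancel_count; }
};

// Models the body of the session coro: every way out of it cancels the timer.
inline void session_body(fake_timer& timer, bool fail) {
    on_scope_exit guard([&timer] { timer.cancel(); });
    if (fail) throw std::runtime_error("read failed"); // exception path
    // regular return path: the guard still runs on the way out
}
```

Both calls, the normal one and the throwing one, leave the timer canceled exactly once, so neither path keeps the session alive until the timeout.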


    If you want to let exceptions escape from the coros (you can, and you should assume that it happens), you must improve the thread loops by handling exceptions (see the question "Should the exception thrown by boost::asio::io_service::run() be caught?").


    Combining the coros, and greatly simplifying all condition handling:

    #include <iostream>
    #include <iomanip>
    
    #include <boost/thread/thread.hpp>
    #include <boost/asio.hpp>
    #include <boost/asio/spawn.hpp>
    #include <boost/scope_exit.hpp>
    
    using namespace std::literals;
    namespace ba = boost::asio;
    using ba::ip::tcp;
    
    static constexpr unsigned short SERVER_PORT = 1234;
    static constexpr std::size_t    DATA_LEN_4 = 4;
    static constexpr auto           TIMEOUT_LIMIT = 1s;
    
    struct session : public std::enable_shared_from_this<session>
    {
        tcp::socket socket_;
        ba::steady_timer timer_;
        ba::strand<ba::io_context::executor_type> strand_;
    
        explicit session(ba::io_context& io_context, tcp::socket socket)
        : socket_(std::move(socket)),
          timer_(io_context),
          strand_(io_context.get_executor())
        { }
    
        void go() {
            ba::spawn(strand_, [this, self = shared_from_this()](ba::yield_context yield) {
    
                spawn(yield, [this, self](ba::yield_context yield) {
                    timer_.expires_from_now(TIMEOUT_LIMIT);
                    while (socket_.is_open()) {
                        boost::system::error_code ec;
                        timer_.async_wait(yield[ec]);
                        if (ba::error::operation_aborted != ec) // timer was not canceled
                            socket_.close();
                    }
                });
    
                try {
                    // read data
                    std::string packet;
                    ba::async_read(socket_,
                            ba::dynamic_buffer(packet),
                            ba::transfer_exactly(DATA_LEN_4), yield);
    
                    // write back "OK"
                    ba::write(socket_, ba::buffer("OK"s));
                }
                catch (boost::system::system_error const& e) {
                    if (e.code() == ba::error::operation_aborted)
                        std::cout << "canceled (timeout)" << std::endl;
                    else if (e.code() == ba::error::eof)
                        std::cout << "eof" << std::endl;
                    else // throw std::runtime_error(e.code().message());
                        std::cout << "other: " << e.code().message() << std::endl;
                }
    
                socket_.close();
                timer_.cancel(); // cancel the other coro so we don't keep the session alive
            });
        }
    };
    
    int main() {
        ba::io_context io_context;
    
        ba::spawn(io_context, [&](ba::yield_context yield) {
            tcp::acceptor acceptor(io_context, tcp::endpoint(tcp::v4(), SERVER_PORT));
    
            for (;;) {
                boost::system::error_code ec;
    
                tcp::socket socket(io_context);
                acceptor.async_accept(socket, yield[ec]);
                if (!ec) 
                    std::make_shared<session>(io_context, std::move(socket))->go();
            }
        });
    
        boost::thread_group tgroup;
        for (auto i = 0u; i < std::thread::hardware_concurrency(); ++i)
            tgroup.create_thread([&io_context] {
                for (;;) {
                    try { io_context.run(); break; } // exited normally
                    catch (std::exception const &e) { std::clog << "[eventloop] exception caught " << std::quoted(e.what()) << std::endl; } 
                    catch (...)                     { std::clog << "[eventloop] unknown exception caught" << std::endl;                   } 
                }
            });
    
        tgroup.join_all();
    }
    


    With a randomized Client

    Changing the sleep to be random, so that it sometimes works and sometimes times out:

    std::mt19937 prng { std::random_device{}() };
    for (int i = 0; i < 4; i++) {
        ba::write(s, ba::buffer(std::string("A")));
        std::this_thread::sleep_for(std::uniform_int_distribution<>(200, 400)(prng) * 1ms);
    }
    

    On my system, this prints:

    1. received: OK
    2. received: OK
    3. received: OK
    canceled (timeout)
    4 exception read_some: End of file
    5. received: OK
    canceled (timeout)
    6 exception read_some: End of file
    7. received: OK
    8. received: OK
    


    Look Ma, No Hands

    Even simpler, leaving off the special-case messages, doesn't actually change much:

    ba::spawn(strand_, [this, self = shared_from_this()](ba::yield_context yield) {
        try {
            ba::steady_timer timer(strand_, TIMEOUT_LIMIT);
            timer.async_wait([this](boost::system::error_code ec) {
                if (ba::error::operation_aborted != ec) // timer expired, not canceled
                    socket_.close();
            });
    
            std::string packet;
            ba::async_read(socket_,
                    ba::dynamic_buffer(packet),
                    ba::transfer_exactly(DATA_LEN_4), yield);
    
            ba::write(socket_, ba::buffer("OK"s));
        } catch(std::exception const& e) {
            std::clog << "error " << std::quoted(e.what()) << std::endl;
        }
    });
    


    Note how we don't even need timer_ as a member any more, and the local timer's destructor automatically cancels the pending wait on reaching the end of the scope.


    The output doesn't actually change much:

    1. received: OK
    2. received: OK
    3. received: OK
    error "Operation canceled"
    4 exception read_some: End of file
    5. received: OK
    6. received: OK
    7. received: OK
    error "Operation canceled"
    8 exception read_some: End of file
    error "Operation canceled"
    9 exception read_some: End of file
    
