Boost ASIO async_read is not reading data from the client


Question

I have a server/client application in which a write on the client is matched by a read on the server, and this works.

Inside the StartHandligClient function in the server code, if I comment out the call to read_async_1 and the return after it, everything works fine; that path uses the synchronous read in HandleClient.

I added the read_async_1 function to the Service class to read from the socket asynchronously.

This function is called when a client connects to the server, and it returns immediately.

I expect the callback passed to async_read to be called, but that is not happening.

I have been stuck on this for a long time and would appreciate any help.

Server code

#include <boost/asio.hpp>
#include <boost/archive/text_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/bind.hpp>
#include <boost/serialization/vector.hpp>
#include <boost/tuple/tuple.hpp>

#include <thread>
#include <atomic>
#include <memory>
#include <iostream>

#include "../stocks.hpp"
using namespace boost;


class Service {
public:
    Service(){}

    void StartHandligClient(
        std::shared_ptr<asio::ip::tcp::socket> sock) {

        read_async_1(sock);
        return;

        std::thread th(([this, sock]() {
            HandleClient(sock);
        }));

        std::cout << "Detached \n";

        th.detach();
    }

private:
    void read_async_1(std::shared_ptr<asio::ip::tcp::socket> sock)
    {
        if(!(*sock.get()).is_open())
        {
            std::cout << getpid() << " : Socket closed in sync_read \n" << std::flush;
            return ;
        }

        std::cout << "haha_1\n" << std::flush;

        boost::asio::async_read( (*sock.get()), boost::asio::buffer(inbound_header_),
                [this](boost::system::error_code ec,
                    size_t bytesRead)
                {
                std::cout << "haha_2\n" << std::flush;

                if (!ec)
                {
                int headerBytesReceived = bytesRead;

                std::cout << "\n\n headerBytesReceived : " << headerBytesReceived << "\n" << std::flush ;
                // this->async_read(sock);

                }
                else
                {
                    // Terminate connection ?
                    if(ec == boost::asio::error::eof)
                    {
                            std::cout << getpid() << " : ** sync_read : Connection lost  : boost::asio::error::eof ** \n";
                    }
                        std::cout << "Error occured in sync_read! Error code = " << ec.value() << ". Message: " << ec.message() << "\n" << std::flush;

                    return ;
                }
                return ;
                }
        );
        std::cout << getpid() << " : final return from async_read \n" << std::flush;
        return ;
    }
    void HandleClient(std::shared_ptr<asio::ip::tcp::socket> sock) {
        while(1)
        {
            try {
                // asio::streambuf request;
                // asio::read_until(*sock.get(), request, '\n');

                int headerBytesReceived = asio::read( *sock.get(), boost::asio::buffer(inbound_header_) );
                std::cout << "headerBytesReceived : " << headerBytesReceived << "\n" << std::flush;

                // Determine the length of the serialized data.
                std::istringstream is(std::string(inbound_header_, header_length));
                std::cout << "is : " << is.str() << ", inbound_header_ : " << inbound_header_ << "\n";

                std::size_t inbound_data_size = 0;
                if (!(is >> std::hex >> inbound_data_size))
                {
                    // Header doesn't seem to be valid. Inform the caller.
                    // boost::system::error_code error(boost::asio::error::invalid_argument);
                    // boost::get<0>(handler)(error);
                    std::cout << "RET-1 \n";
                    return;
                }

                std::cout << "inbound_data_size : " << inbound_data_size << "\n" << std::flush;

                // Start an asynchronous call to receive the data.
                inbound_data_.resize(inbound_data_size);

                std::cout << "inbound_data_.size() : " << inbound_data_.size() << "\n" << std::flush;

                int bytesReceived = asio::read( *sock.get(), boost::asio::buffer(inbound_data_) );

                std::string archive_data(&inbound_data_[0], inbound_data_.size());
                std::istringstream archive_stream(archive_data);
                boost::archive::text_iarchive archive(archive_stream);
                archive >> stocks_;


                std::cout << "bytesReceived : " << bytesReceived << " , stocks_.size() : " << stocks_.size() << "\n";
                // Print out the data that was received.
                for (std::size_t i = 0; i < stocks_.size(); ++i)
                {
                    std::cout << "Stock number " << i << "\n";
                    std::cout << "  code: " << stocks_[i].code << "\n";
                    std::cout << "  name: " << stocks_[i].name << "\n";
                    std::cout << "  open_price: " << stocks_[i].open_price << "\n";
                    std::cout << "  high_price: " << stocks_[i].high_price << "\n";
                    std::cout << "  low_price: " << stocks_[i].low_price << "\n";
                    std::cout << "  last_price: " << stocks_[i].last_price << "\n";
                    std::cout << "  buy_price: " << stocks_[i].buy_price << "\n";
                    std::cout << "  buy_quantity: " << stocks_[i].buy_quantity << "\n";
                    std::cout << "  sell_price: " << stocks_[i].sell_price << "\n";
                    std::cout << "  sell_quantity: " << stocks_[i].sell_quantity << "\n";
                }            
            }
            catch (system::system_error &e)
            {
                boost::system::error_code ec = e.code();
                if(ec == boost::asio::error::eof)
                {
                    std::cout << "EOF Error \n";
                }
                std::cout << "Server Error occured! Error code = "
                    << e.code() << ". Message: "
                    << e.what() << "\n";
                break;
            }
        }

        // Clean-up.
        delete this;
    }
/// The size of a fixed length header.
enum { header_length = 8 };

/// Holds an outbound header.
std::string outbound_header_;

/// Holds the outbound data.
std::string outbound_data_;

/// Holds an inbound header.
char inbound_header_[header_length];

/// Holds the inbound data.
std::vector<char> inbound_data_;
    std::vector<stock> stocks_;
};

class Acceptor {
public:
    Acceptor(asio::io_service& ios, unsigned short port_num) :
        m_ios(ios),
        m_acceptor(m_ios,
        asio::ip::tcp::endpoint(
        asio::ip::address_v4::any(),
        port_num))
    {
        m_acceptor.listen();
    }

    void Accept() {
        std::cout << "Server Accept() \n" << std::flush;
        std::shared_ptr<asio::ip::tcp::socket>
            sock(new asio::ip::tcp::socket(m_ios));

        m_acceptor.accept(*sock.get());

        (new Service)->StartHandligClient(sock);
    }

private:
    asio::io_service& m_ios;
    asio::ip::tcp::acceptor m_acceptor;
};

class Server {
public:
    Server() : m_stop(false) {}

    void Start(unsigned short port_num) {
        m_thread.reset(new std::thread([this, port_num]() {
            Run(port_num);
        }));
    }

    void Stop() {
        m_stop.store(true);
        m_thread->join();
    }

private:
    void Run(unsigned short port_num) {
        Acceptor acc(m_ios, port_num);

        while (!m_stop.load()) {
            std::cout << "Server accept\n" << std::flush;
            acc.Accept();
        }
    }

    std::unique_ptr<std::thread> m_thread;
    std::atomic<bool> m_stop;
    asio::io_service m_ios;
};

int main()
{
    unsigned short port_num = 3333;

    try {
        Server srv;
        srv.Start(port_num);

        std::this_thread::sleep_for(std::chrono::seconds(100));

        std::cout << "Stopping server \n";

        srv.Stop();
    }
    catch (system::system_error &e) {
        std::cout << "Error occured! Error code = "
            << e.code() << ". Message: "
            << e.what();
    }

    return 0;
}

Client code

#include <boost/asio.hpp>
#include <boost/archive/text_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/bind.hpp>
#include <boost/serialization/vector.hpp>
#include <iostream>

#include "../stocks.hpp"
using namespace boost;

class SyncTCPClient {
public:
    SyncTCPClient(const std::string& raw_ip_address,
        unsigned short port_num) :
        m_ep(asio::ip::address::from_string(raw_ip_address),
        port_num),
        m_sock(m_ios) {

        m_sock.open(m_ep.protocol());
    }

    void connect() {
        m_sock.connect(m_ep);
    }

    void close() {
        m_sock.shutdown(
            boost::asio::ip::tcp::socket::shutdown_both);
        m_sock.close();
    }

    std::string emulateLongComputationOp(
        unsigned int duration_sec) {

        std::string request = "EMULATE_LONG_COMP_OP "
            + std::to_string(duration_sec)
            + "\n";

        sendRequest(request);
        return receiveResponse();
    };

private:
    void sendRequest(const std::string& request) {

        std::vector<stock> stocks_;
        // Create the data to be sent to each client.
        stock s;
        s.code = "ABC";
        s.name = "A Big Company";
        s.open_price = 4.56;
        s.high_price = 5.12;
        s.low_price = 4.33;
        s.last_price = 4.98;
        s.buy_price = 4.96;
        s.buy_quantity = 1000;
        s.sell_price = 4.99;
        s.sell_quantity = 2000;
        stocks_.push_back(s);

        // Serialize the data first so we know how large it is.
        std::ostringstream archive_stream;
        boost::archive::text_oarchive archive(archive_stream);
        archive << stocks_;
        outbound_data_ = archive_stream.str();
        std::cout << "outbound_data_ : " << outbound_data_ << "\n" << std::flush;
        std::cout << "outbound_data_.size() : " << outbound_data_.size() << "\n" << std::flush;

        // Format the header.
        std::ostringstream header_stream;
        header_stream << std::setw(header_length)  << std::hex << outbound_data_.size();

        std::cout << "header_stream.str() : " << header_stream.str() << "\n" << std::flush;
        std::cout << "header_stream.str().size() : " << header_stream.str().size() << "\n" << std::flush;

        if (!header_stream || header_stream.str().size() != header_length)
        {
            // Something went wrong, inform the caller.
            // boost::system::error_code error(boost::asio::error::invalid_argument);
            // socket_.get_io_service().post(boost::bind(handler, error));
            return;
        }

        outbound_header_ = header_stream.str();
        std::cout << "outbound_header_ : " << outbound_header_ << "\n" << std::flush;

        // Write the serialized data to the socket. We use "gather-write" to send
        // both the header and the data in a single write operation.
        std::vector<boost::asio::const_buffer> buffers;
        buffers.push_back(boost::asio::buffer(outbound_header_));
        buffers.push_back(boost::asio::buffer(outbound_data_));
        std::size_t sizeSent = asio::write(m_sock, buffers);
        std::cout << "sizeSent : " << sizeSent << "\n" << std::flush;

    }

    std::string receiveResponse() {
        std::string response;
        /*
        asio::streambuf buf;
        asio::read_until(m_sock, buf, '\n');

        std::istream input(&buf);

        std::getline(input, response);
        */

        return response;
    }

private:
    asio::io_service m_ios;

    asio::ip::tcp::endpoint m_ep;
    asio::ip::tcp::socket m_sock;
    enum { header_length = 8 };
    std::string outbound_data_;
    std::string outbound_header_;

};

int main()
{
    const std::string raw_ip_address = "127.0.0.1";
    const unsigned short port_num = 3333;

    try {
        SyncTCPClient client(raw_ip_address, port_num);

        // Sync connect.
        client.connect();

        sleep(1);

        std::cout << "Sending request to the server... "
            << std::endl;

        std::string response = client.emulateLongComputationOp(10);

        std::cout << "Response received: " << response << std::endl;

        sleep(100);
        std::cout << "\n\n Closing client connection \n\n";

        // Close the connection and free resources.
        client.close();
    }
    catch (system::system_error &e) {
        std::cout << "Client Error occured! Error code = " << e.code()
            << ". Message: " << e.what();

        return e.code().value();
    }

    return 0;
}

Included file (stocks.hpp)

#ifndef _STOCKS_HPP_
#define _STOCKS_HPP_

struct stock
{
  std::string code;
  std::string name;
  double open_price;
  double high_price;
  double low_price;
  double last_price;
  double buy_price;
  int buy_quantity;
  double sell_price;
  int sell_quantity;

  template <typename Archive>
  void serialize(Archive& ar, const unsigned int version)
  {
    ar & code;
    ar & name;
    ar & open_price;
    ar & high_price;
    ar & low_price;
    ar & last_price;
    ar & buy_price;
    ar & buy_quantity;
    ar & sell_price;
    ar & sell_quantity;
  }
};

#endif

Recommended answer

In a comment on a previous answer you wrote that you get "Error code = 125. Message: Operation canceled", so I think the socket may be closed before the asynchronous operation completes.

What is the lifetime of your socket?

[1] The socket is created in the Accept method:

    std::shared_ptr<asio::ip::tcp::socket>
    sock(new asio::ip::tcp::socket(m_ios)); // ref count +1
    //...
    (new Service)->StartHandligClient(sock); // this function returns immediately
    // so socket's ref count -1

[2] In StartHandligClient(), sock is passed by value, so the socket's reference count goes up by one, but it drops again as soon as the function returns:

    void StartHandligClient(
        std::shared_ptr<asio::ip::tcp::socket> sock) { // +1 ref count

        read_async_1(sock); // this function returns immediately
        return;  // -1 ref count of socket
    }

[3] In read_async_1, the socket is again passed by value, which adds one to its reference count. This function also returns immediately, and its copy is released when it ends. Once Accept, StartHandligClient and read_async_1 have all returned, the count reaches zero and the socket object is deleted.
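The reference counting in [1] to [3] can be reproduced without any networking. The following is a minimal sketch using only the standard library; FakeSocket, FakeLoop, start_without_capture, start_with_capture and socket_survives are hypothetical names invented for this illustration and are not part of Boost.Asio or of the posted code. It shows that a pending handler keeps the object alive only if the handler itself holds a copy of the shared_ptr.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for a socket: sets a flag when it is destroyed.
struct FakeSocket {
    explicit FakeSocket(bool& destroyed) : destroyed_(destroyed) {}
    ~FakeSocket() { destroyed_ = true; }
    bool& destroyed_;
};

// Hypothetical stand-in for an event loop: stores handlers, runs them later.
class FakeLoop {
public:
    void post(std::function<void()> handler) {
        pending_.push_back(std::move(handler));
    }
    void run() {
        for (auto& handler : pending_) handler();
        pending_.clear();  // destroys the handlers and anything they captured
    }
private:
    std::vector<std::function<void()>> pending_;
};

// Mirrors the question: sock is passed by value, but the handler does not
// capture it, so this copy is released when the function returns.
void start_without_capture(FakeLoop& loop, std::shared_ptr<FakeSocket> sock) {
    (void)sock;
    loop.post([] { /* no reference to the socket in here */ });
}

// The handler owns a copy of sock until the handler itself is destroyed.
void start_with_capture(FakeLoop& loop, std::shared_ptr<FakeSocket> sock) {
    loop.post([sock] { /* sock is guaranteed to be alive in here */ });
}

// Returns true if the socket is still alive after the caller has dropped its
// own reference and before the pending handler has run.
bool socket_survives(bool capture) {
    bool destroyed = false;
    FakeLoop loop;
    {
        auto sock = std::make_shared<FakeSocket>(destroyed);
        assert(!destroyed);
        if (capture) start_with_capture(loop, sock);
        else         start_without_capture(loop, sock);
    }  // the caller's reference is released here, like Accept() returning
    const bool alive_before_run = !destroyed;
    loop.run();
    return alive_before_run;
}
```

Calling socket_survives(false) follows the pattern in the question and reports that the socket is already gone before the handler runs, while socket_survives(true) reports that it is still alive.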

You created a lambda to handle completion of the asynchronous operation, but it captures only this and not sock, so the socket object may be closed before the handler runs.
