ASIO示例代码过早关闭套接字 [英] ASIO example code closing socket before it should

查看:71
本文介绍了ASIO示例代码过早关闭套接字的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要一个使用ASIO的并行同步TCP解决方案.我正在尝试让以下示例中的代码运行起来: https://github.com/jvillasante/asio-network-programming-cookbook/tree/master/src (使用ch04中的服务器:02_Sync_parallel_tcp_server.cpp和ch03中的客户端:01_Sync_tcp_client.cpp).

我唯一更改的是添加到文本文件的日志记录.

问题在于,尽管服务器运行良好,但客户端在收到服务器返回的一个响应后就崩溃了:

libc++abi.dylib: terminating with uncaught exception of type boost::exception_detail::clone_impl<boost::exception_detail::error_info_injector<boost::system::system_error> >: shutdown: Socket is not connected

服务器代码:

#include <boost/asio.hpp>
#include <atomic>
#include <memory>
#include <thread>
#include <iostream>
#include <fstream>

using namespace boost;

// Handles one accepted client connection on its own detached thread.
// Instances are expected to be heap-allocated: HandleClient() ends with
// `delete this`, so the object frees itself once the client has been served.
class Service {
public:
  Service() = default;

  // Spawns a detached thread that serves the client on `sock`.
  // The shared_ptr is copied into the thread's closure, so the socket stays
  // alive until HandleClient() returns.
  void StartHandlingClient(std::shared_ptr<asio::ip::tcp::socket> sock) {
    std::thread th{[this, sock]() { HandleClient(sock); }};
    th.detach();
  }

private:
  // Reads one '\n'-terminated request, appends it to logfile2.txt, simulates
  // some work, and writes back the fixed reply "Response\n".
  void HandleClient(std::shared_ptr<asio::ip::tcp::socket> sock) {
    try {
      asio::streambuf request;
      asio::read_until(*sock.get(), request, '\n');

      std::istream is(&request);
      std::string line;
      std::getline(is, line);

      std::ofstream log("logfile2.txt", std::ios_base::app | std::ios_base::out);
      log << "Request: " << line << "\n" << std::flush;

      // Emulate request processing.
      int i = 0;
      while (i != 1000000) i++;
      std::this_thread::sleep_for(std::chrono::milliseconds(500));

      // Sending response.
      std::string response = "Response\n";
      asio::write(*sock.get(), asio::buffer(response));
    // NOTE(review): the crash reported for this program names
    // boost::system::system_error as the thrown type; confirm that this
    // std::system_error handler actually catches Asio failures with the
    // Boost version in use.
    } catch (std::system_error& e) {
      std::ofstream log("logfile1.txt", std::ios_base::app | std::ios_base::out);
      log << "Error occurred! Error code = " << e.code().value() << ". Message: " << e.what() << "\n" << std::flush;
    }

    // Clean up
    // The object was created with `new` by the caller and owns itself.
    delete this;
  }
};

// Wraps a TCP acceptor bound to all IPv4 interfaces on a given port and hands
// each accepted connection to a freshly allocated Service.
class Acceptor {
public:
  // Binds the acceptor to 0.0.0.0:port_num and starts listening.
  Acceptor(asio::io_service& ios, unsigned short port_num)
  : m_ios{ios}, m_acceptor{m_ios, asio::ip::tcp::endpoint{asio::ip::address_v4::any(), port_num}} {
    m_acceptor.listen();
  }

  // Blocks until a client connects, then starts a Service for it.
  void Accept() {
    auto sock = std::make_shared<asio::ip::tcp::socket>(m_ios);

    // Synchronous accept: does not return until a connection arrives.
    m_acceptor.accept(*sock.get());

    // The Service deletes itself when it has finished with the client.
    (new Service)->StartHandlingClient(sock);
  }

private:
  asio::io_service& m_ios;
  asio::ip::tcp::acceptor m_acceptor;
};

// Runs the accept loop on a background thread.
class Server {
public:
  Server() : m_stop{false} {}

  // Launches the background thread that accepts clients on port_num.
  void Start(unsigned short port_num) {
    m_thread.reset(new std::thread([this, port_num]() { Run(port_num); }));
  }

  // Requests the accept loop to finish and waits for the thread to exit.
  // The stop flag is only examined between two Accept() calls, and Accept()
  // blocks, so the join completes only after one more client has connected.
  void Stop() {
    m_stop.store(true);
    m_thread->join();
  }

private:
  // Thread body: accepts clients one after another until m_stop is set.
  void Run(unsigned short port_num) {
    Acceptor acc{m_ios, port_num};

    while (!m_stop.load()) {
      acc.Accept();
    }
  }

private:
  std::unique_ptr<std::thread> m_thread;  // accept-loop thread, created by Start()
  std::atomic<bool> m_stop;               // set by Stop(), polled by Run()
  asio::io_service m_ios;
};

// Server entry point: serves clients on port 3333 for 60 seconds, then stops.
int main() {
  unsigned short port_num = 3333;

  try {
    Server srv;
    srv.Start(port_num);

    // Let the server run for one minute before asking it to stop.
    std::this_thread::sleep_for(std::chrono::seconds(60));

    srv.Stop();
  } catch (std::system_error& e) {
      // Note: this is the same log file name the client program writes to.
      std::ofstream log("logfile1.txt", std::ios_base::app | std::ios_base::out);
      log << "Error occurred! Error code = " << e.code().value() << ". Message: " << e.what() << "\n" << std::flush;
  }

  return 0;
}

客户端代码:

#include <boost/asio.hpp>
#include <iostream>
#include <fstream>

using namespace boost;

// Blocking TCP client that sends one text request and reads one
// '\n'-terminated response.
class SyncTCPClient {
public:
  // Builds the target endpoint from raw_ip_address/port_num and opens the
  // socket; the connection itself is made later by connect().
  SyncTCPClient(const std::string& raw_ip_address, unsigned short port_num)
  : m_ep(asio::ip::address::from_string(raw_ip_address), port_num), m_sock(m_ios) {
    m_sock.open(m_ep.protocol());
  }
  // NOTE(review): close() uses the throwing forms of shutdown()/close(); an
  // exception leaving a destructor terminates the program.
  ~SyncTCPClient() { close(); }

  // Synchronously connects to the endpoint given at construction.
  void connect() { m_sock.connect(m_ep); }

  // Sends "EMULATE_LONG_COMP_OP <duration_sec>\n" and returns the server's
  // reply line (without the trailing newline).
  std::string emulateLongComputationOp(unsigned int duration_sec) {
    std::string request = "EMULATE_LONG_COMP_OP " + std::to_string(duration_sec) + "\n";
    sendRequest(request);
    return receiveResponse();
  }

private:
  // Shuts down and closes the socket if it is open, logging each step to
  // logfile1.txt.
  void close() {
    if (m_sock.is_open()) {
      std::ofstream log("logfile1.txt", std::ios_base::app | std::ios_base::out);
      log << "shutting down\n" << std::flush;
      // NOTE(review): the reported failure message "shutdown: Socket is not
      // connected" appears to come from this call - confirm.
      m_sock.shutdown(asio::ip::tcp::socket::shutdown_both);
      log << "closing the socket\n" << std::flush;
      m_sock.close();
      log << "socket closed\n" << std::flush;
    }
  }

  // Writes the whole request to the socket.
  void sendRequest(const std::string& request) { asio::write(m_sock, asio::buffer(request)); }

  // Reads from the socket up to and including the first '\n' and returns the
  // line before it.
  std::string receiveResponse() {
    asio::streambuf buf;
    asio::read_until(m_sock, buf, '\n');

    std::istream input(&buf);
    std::string response;
    std::getline(input, response);

    return response;
  }

private:
  asio::io_service m_ios;
  asio::ip::tcp::endpoint m_ep;
  asio::ip::tcp::socket m_sock;
};

// Client entry point: connects to 127.0.0.1:3333, sends one request and
// prints the response. Returns the error code value if a std::system_error
// is caught, 0 otherwise.
int main() {
  const std::string raw_ip_address = "127.0.0.1";
  const unsigned short port_num = 3333;

  try {
    // `client` is destroyed at the end of this try block, which is where its
    // destructor shuts the socket down.
    SyncTCPClient client{raw_ip_address, port_num};

    // Sync connect.
    client.connect();

    std::cout << "Sending request to the server...\n";
    std::string response = client.emulateLongComputationOp(10);

    std::cout << "Response received: " << response << "\n";
  // NOTE(review): the reported crash shows an uncaught
  // boost::system::system_error; confirm this handler's type matches what
  // Asio throws.
  } catch (std::system_error& e) {
      std::ofstream log("logfile1.txt", std::ios_base::app | std::ios_base::out);
      log << "Error occurred! Error code = " << e.code().value() << ". Message: " << e.what() << "\n" << std::flush;
    return e.code().value();
  }

  return 0;
}

解决方案

我没有看到很多错误,并且无法使用所示代码重现该问题.

看到的东西:

  1. 线程过程可以是静态函数,因为它是无状态的(delete this是一种代码异味)
  2. 线程不必分离(使用boost::thread_group::join_all会好得多)
  3. 服务器和客户端都在写入同一个日志文件;结果是未定义的
  4. 在atomic<bool>上显式写出.store()和.load()不符合惯用法
  5. 在任何类型的智能指针上写*sock.get()都是不可原谅的非惯用写法
  6. 只输出code().value()(丢掉了错误类别)是很糟糕的做法,而且e.what()并不是获取错误消息的正确方式(应使用e.code().message()).
  7. 如果需要flush,不妨直接使用std::endl
  8. 在C++14中其实没有理由使用shared_ptr:

    asio::ip::tcp::socket sock(m_ios);
    
    m_acceptor.accept(sock);
    
    std::thread([sock=std::move(sock)]() mutable { HandleClient(sock); }).detach();
    

    在C ++ 11中,坚持:

    auto sock = std::make_shared<asio::ip::tcp::socket>(m_ios);
    
    m_acceptor.accept(*sock);
    
    std::thread([sock] { HandleClient(*sock); }).detach();
    

    这意味着HandleClient只能使用ip::tcp::socket&而不是智能指针.

INTEGRATING

Server.cpp

#include <atomic>
#include <boost/asio.hpp>
#include <fstream>
#include <iostream>
#include <memory>
#include <thread>

using namespace boost;

// Serves one client on an already-accepted socket: reads a '\n'-terminated
// request, appends it to server.log, simulates some work and replies with
// "Response\n".
//
// Never lets a system error escape: this function runs as the body of a
// detached thread, where an uncaught exception would terminate the whole
// server. Failures are appended to server.log instead.
static void HandleClient(asio::ip::tcp::socket& sock) {
    try {
        asio::streambuf buf;
        asio::read_until(sock, buf, '\n');

        std::string request;
        getline(std::istream(&buf), request);

        std::ofstream log("server.log", std::ios_base::app | std::ios_base::out);
        log << "Request: " << request << std::endl;

        // Emulate request processing.
        int i = 0;
        while (i != 1000000)
            i++;
        std::this_thread::sleep_for(std::chrono::milliseconds(500));

        // Sending response.
        std::string response = "Response\n";
        asio::write(sock, asio::buffer(response));
    } catch (boost::system::system_error const &e) {
        // Asio reports failures (e.g. the peer disconnecting before sending a
        // full line) as boost::system::system_error, which derives from
        // std::runtime_error and is therefore NOT caught by a
        // std::system_error handler.
        std::ofstream log("server.log", std::ios_base::app | std::ios_base::out);
        log << e.what() << " " << e.code() << ": " << e.code().message() << std::endl;
    } catch (std::system_error const &e) {
        // Errors raised by the standard library itself.
        std::ofstream log("server.log", std::ios_base::app | std::ios_base::out);
        log << e.what() << " " << e.code() << ": " << e.code().message() << std::endl;
    }
}

class Acceptor {
  public:
    Acceptor(asio::io_service &ios, unsigned short port_num)
            : m_ios{ ios }, m_acceptor{ m_ios, asio::ip::tcp::endpoint{ asio::ip::address_v4::any(), port_num } } {
        m_acceptor.listen();
    }

    void Accept() {
        auto sock = std::make_shared<asio::ip::tcp::socket>(m_ios);

        m_acceptor.accept(*sock);

        std::thread([sock] { HandleClient(*sock); }).detach();
    }

  private:
    asio::io_service &m_ios;
    asio::ip::tcp::acceptor m_acceptor;
};

// Runs the accept loop on a background thread.
class Server {
  public:
    Server() : m_stop{ false } {}

    // Launches the background thread that accepts clients on port_num.
    void Start(unsigned short port_num) {
        m_thread.reset(new std::thread([this, port_num]() { Run(port_num); }));
    }

    // Asks the accept loop to finish and waits for its thread to exit.
    //
    // Safe to call when Start() was never called and safe to call more than
    // once: the join is skipped if there is no joinable thread.
    //
    // The stop flag is only checked between two blocking Accept() calls, so
    // the join completes only after one more client has connected.
    void Stop() {
        m_stop = true;
        if (m_thread && m_thread->joinable()) {
            m_thread->join();
        }
    }

  private:
    // Thread body: accepts clients one after another until m_stop is set.
    void Run(unsigned short port_num) {
        Acceptor acc{ m_ios, port_num };

        while (!m_stop) {
            acc.Accept();
        }
    }

  private:
    std::unique_ptr<std::thread> m_thread; // accept-loop thread, created by Start()
    std::atomic<bool> m_stop;              // set by Stop(), polled by Run()
    asio::io_service m_ios;
};

int main() {
    unsigned short port_num = 3333;

    try {
        Server srv;
        srv.Start(port_num);

        std::this_thread::sleep_for(std::chrono::seconds(60));

        srv.Stop();
    } catch (std::system_error &e) {
        std::ofstream log("server.log", std::ios_base::app | std::ios_base::out);
        log << e.what() << " " << e.code() << ": " << e.code().message() << std::endl;
    }
}

Client.cpp

#include <boost/asio.hpp>
#include <fstream>
#include <iostream>

using namespace boost;

class SyncTCPClient {
  public:
    SyncTCPClient(const std::string &raw_ip_address, unsigned short port_num)
            : m_ep(asio::ip::address::from_string(raw_ip_address), port_num), m_sock(m_ios) {
        m_sock.open(m_ep.protocol());
    }
    ~SyncTCPClient() { close(); }

    void connect() { m_sock.connect(m_ep); }

    std::string emulateLongComputationOp(unsigned int duration_sec) {
        std::string request = "EMULATE_LONG_COMP_OP " + std::to_string(duration_sec) + "\n";
        sendRequest(request);
        return receiveResponse();
    }

  private:
    void close() {
        if (m_sock.is_open()) {
            std::ofstream log("client.log", std::ios_base::app | std::ios_base::out);
            log << "shutting down" << std::endl;
            m_sock.shutdown(asio::ip::tcp::socket::shutdown_both);
            log << "closing the socket" << std::endl;
            m_sock.close();
            log << "socket closed" << std::endl;
        }
    }

    void sendRequest(const std::string &request) { asio::write(m_sock, asio::buffer(request)); }

    std::string receiveResponse() {
        asio::streambuf buf;
        asio::read_until(m_sock, buf, '\n');

        std::string response;
        getline(std::istream(&buf), response);

        return response;
    }

  private:
    asio::io_service m_ios;
    asio::ip::tcp::endpoint m_ep;
    asio::ip::tcp::socket m_sock;
};

int main() {
    const std::string raw_ip_address = "127.0.0.1";
    const unsigned short port_num = 3333;

    try {
        SyncTCPClient client{ raw_ip_address, port_num };

        // Sync connect.
        client.connect();

        std::cout << "Sending request to the server...\n";
        std::string response = client.emulateLongComputationOp(10);

        std::cout << "Response received: " << response << std::endl;
    } catch (std::system_error &e) {
        std::ofstream log("client.log", std::ios_base::app | std::ios_base::out);
        log << e.what() << " " << e.code() << ": " << e.code().message() << std::endl;
        return e.code().value();
    }
}

I need a parallel synchronous TCP solution using ASIO. I'm trying to get the example code from these examples working: https://github.com/jvillasante/asio-network-programming-cookbook/tree/master/src (using the server in ch04: 02_Sync_parallel_tcp_server.cpp and the client in ch03: 01_Sync_tcp_client.cpp).

The only thing I changed is the logging to append to text files.

The problem is that while the server runs fine, the client dies after returning a single response from the server:

libc++abi.dylib: terminating with uncaught exception of type boost::exception_detail::clone_impl<boost::exception_detail::error_info_injector<boost::system::system_error> >: shutdown: Socket is not connected

Code for the server:

#include <boost/asio.hpp>
#include <atomic>
#include <memory>
#include <thread>
#include <iostream>
#include <fstream>

using namespace boost;

// Handles one accepted client connection on its own detached thread.
// Instances are expected to be heap-allocated: HandleClient() ends with
// `delete this`, so the object frees itself once the client has been served.
class Service {
public:
  Service() = default;

  // Spawns a detached thread that serves the client on `sock`.
  // The shared_ptr is copied into the thread's closure, so the socket stays
  // alive until HandleClient() returns.
  void StartHandlingClient(std::shared_ptr<asio::ip::tcp::socket> sock) {
    std::thread th{[this, sock]() { HandleClient(sock); }};
    th.detach();
  }

private:
  // Reads one '\n'-terminated request, appends it to logfile2.txt, simulates
  // some work, and writes back the fixed reply "Response\n".
  void HandleClient(std::shared_ptr<asio::ip::tcp::socket> sock) {
    try {
      asio::streambuf request;
      asio::read_until(*sock.get(), request, '\n');

      std::istream is(&request);
      std::string line;
      std::getline(is, line);

      std::ofstream log("logfile2.txt", std::ios_base::app | std::ios_base::out);
      log << "Request: " << line << "\n" << std::flush;

      // Emulate request processing.
      int i = 0;
      while (i != 1000000) i++;
      std::this_thread::sleep_for(std::chrono::milliseconds(500));

      // Sending response.
      std::string response = "Response\n";
      asio::write(*sock.get(), asio::buffer(response));
    // NOTE(review): the crash reported for this program names
    // boost::system::system_error as the thrown type; confirm that this
    // std::system_error handler actually catches Asio failures with the
    // Boost version in use.
    } catch (std::system_error& e) {
      std::ofstream log("logfile1.txt", std::ios_base::app | std::ios_base::out);
      log << "Error occurred! Error code = " << e.code().value() << ". Message: " << e.what() << "\n" << std::flush;
    }

    // Clean up
    // The object was created with `new` by the caller and owns itself.
    delete this;
  }
};

// Wraps a TCP acceptor bound to all IPv4 interfaces on a given port and hands
// each accepted connection to a freshly allocated Service.
class Acceptor {
public:
  // Binds the acceptor to 0.0.0.0:port_num and starts listening.
  Acceptor(asio::io_service& ios, unsigned short port_num)
  : m_ios{ios}, m_acceptor{m_ios, asio::ip::tcp::endpoint{asio::ip::address_v4::any(), port_num}} {
    m_acceptor.listen();
  }

  // Blocks until a client connects, then starts a Service for it.
  void Accept() {
    auto sock = std::make_shared<asio::ip::tcp::socket>(m_ios);

    // Synchronous accept: does not return until a connection arrives.
    m_acceptor.accept(*sock.get());

    // The Service deletes itself when it has finished with the client.
    (new Service)->StartHandlingClient(sock);
  }

private:
  asio::io_service& m_ios;
  asio::ip::tcp::acceptor m_acceptor;
};

// Runs the accept loop on a background thread.
class Server {
public:
  Server() : m_stop{false} {}

  // Launches the background thread that accepts clients on port_num.
  void Start(unsigned short port_num) {
    m_thread.reset(new std::thread([this, port_num]() { Run(port_num); }));
  }

  // Requests the accept loop to finish and waits for the thread to exit.
  // The stop flag is only examined between two Accept() calls, and Accept()
  // blocks, so the join completes only after one more client has connected.
  void Stop() {
    m_stop.store(true);
    m_thread->join();
  }

private:
  // Thread body: accepts clients one after another until m_stop is set.
  void Run(unsigned short port_num) {
    Acceptor acc{m_ios, port_num};

    while (!m_stop.load()) {
      acc.Accept();
    }
  }

private:
  std::unique_ptr<std::thread> m_thread;  // accept-loop thread, created by Start()
  std::atomic<bool> m_stop;               // set by Stop(), polled by Run()
  asio::io_service m_ios;
};

// Server entry point: serves clients on port 3333 for 60 seconds, then stops.
int main() {
  unsigned short port_num = 3333;

  try {
    Server srv;
    srv.Start(port_num);

    // Let the server run for one minute before asking it to stop.
    std::this_thread::sleep_for(std::chrono::seconds(60));

    srv.Stop();
  } catch (std::system_error& e) {
      // Note: this is the same log file name the client program writes to.
      std::ofstream log("logfile1.txt", std::ios_base::app | std::ios_base::out);
      log << "Error occurred! Error code = " << e.code().value() << ". Message: " << e.what() << "\n" << std::flush;
  }

  return 0;
}

Code for the client:

#include <boost/asio.hpp>
#include <iostream>
#include <fstream>

using namespace boost;

// Blocking TCP client that sends one text request and reads one
// '\n'-terminated response.
class SyncTCPClient {
public:
  // Builds the target endpoint from raw_ip_address/port_num and opens the
  // socket; the connection itself is made later by connect().
  SyncTCPClient(const std::string& raw_ip_address, unsigned short port_num)
  : m_ep(asio::ip::address::from_string(raw_ip_address), port_num), m_sock(m_ios) {
    m_sock.open(m_ep.protocol());
  }
  // NOTE(review): close() uses the throwing forms of shutdown()/close(); an
  // exception leaving a destructor terminates the program.
  ~SyncTCPClient() { close(); }

  // Synchronously connects to the endpoint given at construction.
  void connect() { m_sock.connect(m_ep); }

  // Sends "EMULATE_LONG_COMP_OP <duration_sec>\n" and returns the server's
  // reply line (without the trailing newline).
  std::string emulateLongComputationOp(unsigned int duration_sec) {
    std::string request = "EMULATE_LONG_COMP_OP " + std::to_string(duration_sec) + "\n";
    sendRequest(request);
    return receiveResponse();
  }

private:
  // Shuts down and closes the socket if it is open, logging each step to
  // logfile1.txt.
  void close() {
    if (m_sock.is_open()) {
      std::ofstream log("logfile1.txt", std::ios_base::app | std::ios_base::out);
      log << "shutting down\n" << std::flush;
      // NOTE(review): the reported failure message "shutdown: Socket is not
      // connected" appears to come from this call - confirm.
      m_sock.shutdown(asio::ip::tcp::socket::shutdown_both);
      log << "closing the socket\n" << std::flush;
      m_sock.close();
      log << "socket closed\n" << std::flush;
    }
  }

  // Writes the whole request to the socket.
  void sendRequest(const std::string& request) { asio::write(m_sock, asio::buffer(request)); }

  // Reads from the socket up to and including the first '\n' and returns the
  // line before it.
  std::string receiveResponse() {
    asio::streambuf buf;
    asio::read_until(m_sock, buf, '\n');

    std::istream input(&buf);
    std::string response;
    std::getline(input, response);

    return response;
  }

private:
  asio::io_service m_ios;
  asio::ip::tcp::endpoint m_ep;
  asio::ip::tcp::socket m_sock;
};

// Client entry point: connects to 127.0.0.1:3333, sends one request and
// prints the response. Returns the error code value if a std::system_error
// is caught, 0 otherwise.
int main() {
  const std::string raw_ip_address = "127.0.0.1";
  const unsigned short port_num = 3333;

  try {
    // `client` is destroyed at the end of this try block, which is where its
    // destructor shuts the socket down.
    SyncTCPClient client{raw_ip_address, port_num};

    // Sync connect.
    client.connect();

    std::cout << "Sending request to the server...\n";
    std::string response = client.emulateLongComputationOp(10);

    std::cout << "Response received: " << response << "\n";
  // NOTE(review): the reported crash shows an uncaught
  // boost::system::system_error; confirm this handler's type matches what
  // Asio throws.
  } catch (std::system_error& e) {
      std::ofstream log("logfile1.txt", std::ios_base::app | std::ios_base::out);
      log << "Error occurred! Error code = " << e.code().value() << ". Message: " << e.what() << "\n" << std::flush;
    return e.code().value();
  }

  return 0;
}

解决方案

I don't see a lot wrong, and I cannot reproduce the problem with the code shown.

Things I do see:

  1. the thread procedure could be a static because it's stateless (delete this is a code smell)
  2. the thread needn't be detached (using boost::thread_group::join_all would be much better)
  3. you were writing to the same logfile from server as well as client; results are undefined
  4. spelling .store() and .load() on an atomic<bool> is un-idiomatic
  5. spelling out *sock.get() on any kind of smart pointer is unforgivably un-idiomatic
  6. writing code().value() - swallowing the category - is a BAD thing to do, and e.what() is NOT the way to get the message (use e.code().message()).
  7. If you need flush, you might as well use std::endl
  8. There's really no reason to use a shared_ptr in c++14:

    asio::ip::tcp::socket sock(m_ios);
    
    m_acceptor.accept(sock);
    
    std::thread([sock=std::move(sock)]() mutable { HandleClient(sock); }).detach();
    

    In C++11 stick to:

    auto sock = std::make_shared<asio::ip::tcp::socket>(m_ios);
    
    m_acceptor.accept(*sock);
    
    std::thread([sock] { HandleClient(*sock); }).detach();
    

    This means HandleClient can just take a ip::tcp::socket& instead of a smart pointer.

INTEGRATING

Server.cpp

#include <atomic>
#include <boost/asio.hpp>
#include <fstream>
#include <iostream>
#include <memory>
#include <thread>

using namespace boost;

// Serves one client on an already-accepted socket: reads a '\n'-terminated
// request, appends it to server.log, simulates some work and replies with
// "Response\n".
//
// Never lets a system error escape: this function runs as the body of a
// detached thread, where an uncaught exception would terminate the whole
// server. Failures are appended to server.log instead.
static void HandleClient(asio::ip::tcp::socket& sock) {
    try {
        asio::streambuf buf;
        asio::read_until(sock, buf, '\n');

        std::string request;
        getline(std::istream(&buf), request);

        std::ofstream log("server.log", std::ios_base::app | std::ios_base::out);
        log << "Request: " << request << std::endl;

        // Emulate request processing.
        int i = 0;
        while (i != 1000000)
            i++;
        std::this_thread::sleep_for(std::chrono::milliseconds(500));

        // Sending response.
        std::string response = "Response\n";
        asio::write(sock, asio::buffer(response));
    } catch (boost::system::system_error const &e) {
        // Asio reports failures (e.g. the peer disconnecting before sending a
        // full line) as boost::system::system_error, which derives from
        // std::runtime_error and is therefore NOT caught by a
        // std::system_error handler.
        std::ofstream log("server.log", std::ios_base::app | std::ios_base::out);
        log << e.what() << " " << e.code() << ": " << e.code().message() << std::endl;
    } catch (std::system_error const &e) {
        // Errors raised by the standard library itself.
        std::ofstream log("server.log", std::ios_base::app | std::ios_base::out);
        log << e.what() << " " << e.code() << ": " << e.code().message() << std::endl;
    }
}

class Acceptor {
  public:
    Acceptor(asio::io_service &ios, unsigned short port_num)
            : m_ios{ ios }, m_acceptor{ m_ios, asio::ip::tcp::endpoint{ asio::ip::address_v4::any(), port_num } } {
        m_acceptor.listen();
    }

    void Accept() {
        auto sock = std::make_shared<asio::ip::tcp::socket>(m_ios);

        m_acceptor.accept(*sock);

        std::thread([sock] { HandleClient(*sock); }).detach();
    }

  private:
    asio::io_service &m_ios;
    asio::ip::tcp::acceptor m_acceptor;
};

// Runs the accept loop on a background thread.
class Server {
  public:
    Server() : m_stop{ false } {}

    // Launches the background thread that accepts clients on port_num.
    void Start(unsigned short port_num) {
        m_thread.reset(new std::thread([this, port_num]() { Run(port_num); }));
    }

    // Asks the accept loop to finish and waits for its thread to exit.
    //
    // Safe to call when Start() was never called and safe to call more than
    // once: the join is skipped if there is no joinable thread.
    //
    // The stop flag is only checked between two blocking Accept() calls, so
    // the join completes only after one more client has connected.
    void Stop() {
        m_stop = true;
        if (m_thread && m_thread->joinable()) {
            m_thread->join();
        }
    }

  private:
    // Thread body: accepts clients one after another until m_stop is set.
    void Run(unsigned short port_num) {
        Acceptor acc{ m_ios, port_num };

        while (!m_stop) {
            acc.Accept();
        }
    }

  private:
    std::unique_ptr<std::thread> m_thread; // accept-loop thread, created by Start()
    std::atomic<bool> m_stop;              // set by Stop(), polled by Run()
    asio::io_service m_ios;
};

int main() {
    unsigned short port_num = 3333;

    try {
        Server srv;
        srv.Start(port_num);

        std::this_thread::sleep_for(std::chrono::seconds(60));

        srv.Stop();
    } catch (std::system_error &e) {
        std::ofstream log("server.log", std::ios_base::app | std::ios_base::out);
        log << e.what() << " " << e.code() << ": " << e.code().message() << std::endl;
    }
}

Client.cpp

#include <boost/asio.hpp>
#include <fstream>
#include <iostream>

using namespace boost;

class SyncTCPClient {
  public:
    SyncTCPClient(const std::string &raw_ip_address, unsigned short port_num)
            : m_ep(asio::ip::address::from_string(raw_ip_address), port_num), m_sock(m_ios) {
        m_sock.open(m_ep.protocol());
    }
    ~SyncTCPClient() { close(); }

    void connect() { m_sock.connect(m_ep); }

    std::string emulateLongComputationOp(unsigned int duration_sec) {
        std::string request = "EMULATE_LONG_COMP_OP " + std::to_string(duration_sec) + "\n";
        sendRequest(request);
        return receiveResponse();
    }

  private:
    void close() {
        if (m_sock.is_open()) {
            std::ofstream log("client.log", std::ios_base::app | std::ios_base::out);
            log << "shutting down" << std::endl;
            m_sock.shutdown(asio::ip::tcp::socket::shutdown_both);
            log << "closing the socket" << std::endl;
            m_sock.close();
            log << "socket closed" << std::endl;
        }
    }

    void sendRequest(const std::string &request) { asio::write(m_sock, asio::buffer(request)); }

    std::string receiveResponse() {
        asio::streambuf buf;
        asio::read_until(m_sock, buf, '\n');

        std::string response;
        getline(std::istream(&buf), response);

        return response;
    }

  private:
    asio::io_service m_ios;
    asio::ip::tcp::endpoint m_ep;
    asio::ip::tcp::socket m_sock;
};

int main() {
    const std::string raw_ip_address = "127.0.0.1";
    const unsigned short port_num = 3333;

    try {
        SyncTCPClient client{ raw_ip_address, port_num };

        // Sync connect.
        client.connect();

        std::cout << "Sending request to the server...\n";
        std::string response = client.emulateLongComputationOp(10);

        std::cout << "Response received: " << response << std::endl;
    } catch (std::system_error &e) {
        std::ofstream log("client.log", std::ios_base::app | std::ios_base::out);
        log << e.what() << " " << e.code() << ": " << e.code().message() << std::endl;
        return e.code().value();
    }
}

这篇关于ASIO示例代码过早关闭套接字的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆