如何在多线程程序中使用boost :: asio正确处理fork()? [英] How do I handle fork() correctly with boost::asio in a multithreaded program?

查看:117
本文介绍了如何在多线程程序中使用boost :: asio正确处理fork()?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在掌握如何正确处理从以多线程方式使用Boost Asio的多线程程序创建子进程方面遇到一些麻烦.

如果我理解正确,那么在Unix世界中启动子进程的方法是先调用fork(),再调用exec*().另外,如果我理解正确,则调用fork()将复制所有文件描述符,依此类推,除非标记为FD_CLOEXEC,否则需要在子进程中将它们关闭 (从而在调用时被原子关闭) exec*()).

在调用fork()时需要通知Boost Asio,以便通过调用 fork()文档).没有关于notify_fork()调用是异步信号安全的文档.实际上,如果我查看Boost Asio的源代码(至少在1.54版中),可能会调用信号概念,也有其他电话不在白名单中.)

问题#1我可能可以通过将子进程和套接字+文件的创建分开来解决,以便确保在正在创建的套接字和设置SOCK_CLOEXEC之间的窗口中没有子进程正在创建.问题2比较棘手,我可能需要确保所有 all asio处理程序线程都已停止,进行fork,然后再次重新创建它们,这在最佳情况下是很潮的,在最坏的情况下真的很糟糕(那我的待定计时器又如何呢?).问题#3似乎使正确使用它完全不可能.

如何与fork() + exec*()一起在多线程程序中正确使用Boost Asio? ...或者我是分叉"的?

请让我知道我是否对任何基本概念有误解(我是在Windows编程中长大的,而不是* nix ...).

*-实际上,可以在Linux上直接设置SOCK_CLOEXEC来创建套接字,此套接字自2.6.27开始可用(请参见 socket()文档).在Windows上,从Windows 7 SP 1/Windows Server 2008 R2 SP 1开始,相应的标记WSA_FLAG_NO_HANDLE_INHERIT可用(请参阅解决方案

在多线程程序中, fork()支持,因为这是子级关闭父级以前的内部文件描述符并创建新的描述符时的状态.尽管Boost.Asio明确列出了调用io_service::notify_fork()的前提条件,并保证了fork()期间其内部组件的状态,但对堆栈式协同程序.

 #include <unistd.h> // execl, fork
#include <iostream>
#include <string>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/make_shared.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>

/// @brief launcher receives a command from inter-process communication,
///        and will then fork, allowing the child process to return to
///        the caller.
class launcher
{
public:
  launcher(boost::asio::io_service& io_service,
           boost::asio::local::datagram_protocol::socket& socket,
           std::string& command)
    : io_service_(io_service),
      socket_(socket),
      command_(command)
  {}

  void operator()(boost::asio::yield_context yield)
  {
    std::vector<char> buffer;
    while (command_.empty())
    {
      // Wait for server to write data.
      std::cout << "launcher is waiting for data" << std::endl;
      socket_.async_receive(boost::asio::null_buffers(), yield);

      // Resize buffer and read all data.
      buffer.resize(socket_.available());
      socket_.receive(boost::asio::buffer(buffer));

      io_service_.notify_fork(boost::asio::io_service::fork_prepare);
      if (fork() == 0) // child
      {
        io_service_.notify_fork(boost::asio::io_service::fork_child);
        command_.assign(buffer.begin(), buffer.end());
      }
      else // parent
      {
        io_service_.notify_fork(boost::asio::io_service::fork_parent);
      }
    }
  }

private:
  boost::asio::io_service& io_service_;
  boost::asio::local::datagram_protocol::socket& socket_;
  std::string& command_;
};

using boost::asio::ip::udp;

/// @brief server reads filenames from UDP and then uses
///        inter-process communication to delegate forking and exec
///        to the child launcher process.
class server
{
public:
  server(boost::asio::io_service& io_service,
         boost::asio::local::datagram_protocol::socket& socket,
          short port)
    : io_service_(io_service),
      launcher_socket_(socket),
      socket_(boost::make_shared<udp::socket>(
        boost::ref(io_service), udp::endpoint(udp::v4(), port)))
  {}

  void operator()(boost::asio::yield_context yield)
  {
    udp::endpoint sender_endpoint;
    std::vector<char> buffer;
    for (;;)
    {
      std::cout << "server is waiting for data" << std::endl;
      // Wait for data to become available.
      socket_->async_receive_from(boost::asio::null_buffers(),
          sender_endpoint, yield);

      // Resize buffer and read all data.
      buffer.resize(socket_->available());
      socket_->receive_from(boost::asio::buffer(buffer), sender_endpoint);
      std::cout << "server got data: ";
      std::cout.write(&buffer[0], buffer.size());
      std::cout << std::endl;

      // Write filename to launcher.
      launcher_socket_.async_send(boost::asio::buffer(buffer), yield);
    }
  }

private:
  boost::asio::io_service& io_service_;
  boost::asio::local::datagram_protocol::socket& launcher_socket_;

  // To be used as a coroutine, server must be copyable, so make socket_
  // copyable.
  boost::shared_ptr<udp::socket> socket_;
};

int main(int argc, char* argv[])
{
  std::string filename;

  // Try/catch provides exception handling, but also allows for the lifetime
  // of the io_service and its IO objects to be controlled.
  try
  {
    if (argc != 2)
    {
      std::cerr << "Usage: <port>\n";
      return 1;
    }

    boost::thread_group threads;
    boost::asio::io_service io_service;

    // Create two connected sockets for inter-process communication.
    boost::asio::local::datagram_protocol::socket parent_socket(io_service);
    boost::asio::local::datagram_protocol::socket child_socket(io_service);
    boost::asio::local::connect_pair(parent_socket, child_socket);

    io_service.notify_fork(boost::asio::io_service::fork_prepare);
    if (fork() == 0) // child
    {
      io_service.notify_fork(boost::asio::io_service::fork_child);
      parent_socket.close();
      boost::asio::spawn(io_service,
          launcher(io_service, child_socket, filename));
    }
    else // parent
    {
      io_service.notify_fork(boost::asio::io_service::fork_parent);
      child_socket.close();
      boost::asio::spawn(io_service, 
          server(io_service, parent_socket, std::atoi(argv[1])));

      // Spawn additional threads.
      for (std::size_t i = 0; i < 3; ++i)
      {
        threads.create_thread(
          boost::bind(&boost::asio::io_service::run, &io_service));
      }
    }

    io_service.run();
    threads.join_all();
  }
  catch (std::exception& e)
  {
    std::cerr << "Exception: " << e.what() << "\n";
  }

  // Now that the io_service and IO objects have been destroyed, all internal
  // Boost.Asio file descriptors have been closed, so the execl should be
  // in a clean state.  If the filename has been set, then exec touch.
  if (!filename.empty())
  {
    std::cout << "creating file: " << filename << std::endl;
    execl("/usr/bin/touch", "touch", filename.c_str(), static_cast<char*>(0));
  }
}
 

端子1:

$ ls
a.out  example.cpp
$ ./a.out 12345
server is waiting for data
launcher is waiting for data
server got data: a
server is waiting for data
launcher is waiting for data
creating file: a
server got data: b
server is waiting for data
launcher is waiting for data
creating file: b
server got data: c
server is waiting for data
launcher is waiting for data
creating file: c
ctrl + c
$ ls
a  a.out  b  c  example.cpp

终端2:

$ nc -u 127.0.0.1 12345
actrl + dbctrl + dcctrl + d

I'm having some trouble grasping how to correctly handle creating a child process from a multithreaded program that uses Boost Asio in a multithreaded fashion.

If I understand correctly, the way to launch a child process in the Unix world is to call fork() followed by an exec*(). Also, if I understand correctly, calling fork() will duplicate all file descriptors and so on and these need to be closed in the child process unless marked as FD_CLOEXEC (and thereby being atomically closed when calling exec*()).

Boost Asio requires to be notified when fork() is called in order to operate correctly by calling notify_fork(). However, in a multithreaded program this creates several issues:

  1. Sockets are by default inherited by child processes if I understand correctly. They can be set to SOCK_CLOEXEC - but not directly at creation*, thus leading to a timing window if a child process is being created from another thread.

  2. notify_fork() requires that no other thread calls any other io_service function, nor any function on any other I/O object associated with the io_service. This does not really seem to be feasible - after all the program is multithreaded for a reason.

  3. If I understand correctly, any function call made between fork() and exec*() needs to be async signal safe (see fork() documentation). There is no documentation of the notify_fork() call being async signal safe. In fact, if I look at the source code for Boost Asio (at least in version 1.54), there may be calls to pthread_mutex_lock, which is not async signal safe if I understand correctly (see Signal Concepts, there are also other calls being made that are not on the white list).

Issue #1 I can probably work around by separating creation of child processes and sockets + files so that I can ensure that no child process is being created in the window between a socket being created and setting SOCK_CLOEXEC. Issue #2 is trickier, I would probably need to make sure that all asio handler threads are stopped, do the fork and then recreate them again, which is tideous at best, and really really bad at worst (what about my pending timers??). Issue #3 seems to make it entirely impossible to use this correctly.

How do I correctly use Boost Asio in a multithreaded program together with fork() + exec*()? ... or am I "forked"?

Please let me know if I have misunderstood any fundamental concepts (I am raised on Windows programming, not *nix...).

Edit: * - Actually it is possible to create sockets with SOCK_CLOEXEC set directly on Linux, available since 2.6.27 (see socket() documentation). On Windows, the corresponding flag WSA_FLAG_NO_HANDLE_INHERIT is available since Windows 7 SP 1 / Windows Server 2008 R2 SP 1 (see WSASocket() documentation). OS X does not seem to support this though.

解决方案

In a multi-threaded program, io_service::notify_fork() is not safe to invoke in the child. Yet, Boost.Asio expects it to be called based on the fork() support, as this is when the child closes the parent's previous internal file descriptors and creates new ones. While Boost.Asio explicitly list the pre-conditions for invoking io_service::notify_fork(), guaranteeing the state of its internal components during the fork(), a brief glance at the implementation indicates that std::vector::push_back() may allocate memory from the free store, and the allocation is not guaranteed to be async-signal-safe.

With that said, one solution that may be worth considering is fork() the process when it is still single threaded. The child process will remain single threaded and perform fork() and exec() when it is told to do so by the parent process via inter-process communication. This separation simplifies the problem by removing the need to manage the state of multiple threads while performing fork() and exec().


Here is a complete example demonstrating this approach, where the multi-threaded server will receive filenames via UDP and a child process will perform fork() and exec() to run /usr/bin/touch on the filename. In hopes of making the example slightly more readable, I have opted to use stackful coroutines.

#include <unistd.h> // execl, fork
#include <iostream>
#include <string>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/make_shared.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>

/// @brief launcher receives a command from inter-process communication,
///        and will then fork, allowing the child process to return to
///        the caller.
class launcher
{
public:
  launcher(boost::asio::io_service& io_service,
           boost::asio::local::datagram_protocol::socket& socket,
           std::string& command)
    : io_service_(io_service),
      socket_(socket),
      command_(command)
  {}

  void operator()(boost::asio::yield_context yield)
  {
    std::vector<char> buffer;
    while (command_.empty())
    {
      // Wait for server to write data.
      std::cout << "launcher is waiting for data" << std::endl;
      socket_.async_receive(boost::asio::null_buffers(), yield);

      // Resize buffer and read all data.
      buffer.resize(socket_.available());
      socket_.receive(boost::asio::buffer(buffer));

      io_service_.notify_fork(boost::asio::io_service::fork_prepare);
      if (fork() == 0) // child
      {
        io_service_.notify_fork(boost::asio::io_service::fork_child);
        command_.assign(buffer.begin(), buffer.end());
      }
      else // parent
      {
        io_service_.notify_fork(boost::asio::io_service::fork_parent);
      }
    }
  }

private:
  boost::asio::io_service& io_service_;
  boost::asio::local::datagram_protocol::socket& socket_;
  std::string& command_;
};

using boost::asio::ip::udp;

/// @brief server reads filenames from UDP and then uses
///        inter-process communication to delegate forking and exec
///        to the child launcher process.
class server
{
public:
  server(boost::asio::io_service& io_service,
         boost::asio::local::datagram_protocol::socket& socket,
          short port)
    : io_service_(io_service),
      launcher_socket_(socket),
      socket_(boost::make_shared<udp::socket>(
        boost::ref(io_service), udp::endpoint(udp::v4(), port)))
  {}

  void operator()(boost::asio::yield_context yield)
  {
    udp::endpoint sender_endpoint;
    std::vector<char> buffer;
    for (;;)
    {
      std::cout << "server is waiting for data" << std::endl;
      // Wait for data to become available.
      socket_->async_receive_from(boost::asio::null_buffers(),
          sender_endpoint, yield);

      // Resize buffer and read all data.
      buffer.resize(socket_->available());
      socket_->receive_from(boost::asio::buffer(buffer), sender_endpoint);
      std::cout << "server got data: ";
      std::cout.write(&buffer[0], buffer.size());
      std::cout << std::endl;

      // Write filename to launcher.
      launcher_socket_.async_send(boost::asio::buffer(buffer), yield);
    }
  }

private:
  boost::asio::io_service& io_service_;
  boost::asio::local::datagram_protocol::socket& launcher_socket_;

  // To be used as a coroutine, server must be copyable, so make socket_
  // copyable.
  boost::shared_ptr<udp::socket> socket_;
};

int main(int argc, char* argv[])
{
  std::string filename;

  // Try/catch provides exception handling, but also allows for the lifetime
  // of the io_service and its IO objects to be controlled.
  try
  {
    if (argc != 2)
    {
      std::cerr << "Usage: <port>\n";
      return 1;
    }

    boost::thread_group threads;
    boost::asio::io_service io_service;

    // Create two connected sockets for inter-process communication.
    boost::asio::local::datagram_protocol::socket parent_socket(io_service);
    boost::asio::local::datagram_protocol::socket child_socket(io_service);
    boost::asio::local::connect_pair(parent_socket, child_socket);

    io_service.notify_fork(boost::asio::io_service::fork_prepare);
    if (fork() == 0) // child
    {
      io_service.notify_fork(boost::asio::io_service::fork_child);
      parent_socket.close();
      boost::asio::spawn(io_service,
          launcher(io_service, child_socket, filename));
    }
    else // parent
    {
      io_service.notify_fork(boost::asio::io_service::fork_parent);
      child_socket.close();
      boost::asio::spawn(io_service, 
          server(io_service, parent_socket, std::atoi(argv[1])));

      // Spawn additional threads.
      for (std::size_t i = 0; i < 3; ++i)
      {
        threads.create_thread(
          boost::bind(&boost::asio::io_service::run, &io_service));
      }
    }

    io_service.run();
    threads.join_all();
  }
  catch (std::exception& e)
  {
    std::cerr << "Exception: " << e.what() << "\n";
  }

  // Now that the io_service and IO objects have been destroyed, all internal
  // Boost.Asio file descriptors have been closed, so the execl should be
  // in a clean state.  If the filename has been set, then exec touch.
  if (!filename.empty())
  {
    std::cout << "creating file: " << filename << std::endl;
    execl("/usr/bin/touch", "touch", filename.c_str(), static_cast<char*>(0));
  }
}

Terminal 1:

$ ls
a.out  example.cpp
$ ./a.out 12345
server is waiting for data
launcher is waiting for data
server got data: a
server is waiting for data
launcher is waiting for data
creating file: a
server got data: b
server is waiting for data
launcher is waiting for data
creating file: b
server got data: c
server is waiting for data
launcher is waiting for data
creating file: c
ctrl + c
$ ls
a  a.out  b  c  example.cpp

Terminal 2:

$ nc -u 127.0.0.1 12345
actrl + dbctrl + dcctrl + d

这篇关于如何在多线程程序中使用boost :: asio正确处理fork()?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆