当boost :: asio :: io_service运行方法阻塞/解除阻塞时感到困惑 [英] Confused when boost::asio::io_service run method blocks/unblocks

查看:211
本文介绍了当boost :: asio :: io_service运行方法阻塞/解除阻塞时感到困惑的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

作为Boost.Asio的初学者,我对 io_service :: run() 。我会感激,如果有人可以解释我,当这个方法阻止/解除封锁。文档说明:


run()功能阻塞,直到所有工作完成并且没有更多的处理程序要调度,或者直到 io_service 已停止。



多线程可以调用 run()函数设置一个线程池, io_service 可以从中执行处理程序。在池中等待的所有线程都是等效的, io_service 可以选择任何一个线程来调用处理程序。



正常退出 run()函数意味着 io_service 对象已停止( stopped()函数返回true)。随后调用 run() run_one() poll() poll_one()将立即返回,除非事先调用 reset()


以下语句是什么意思?


[...]







试图了解 io_service :: run()的行为,我遇到了这个(实施例3a)。在其中,我观察到 io_service-> run()阻塞并等待工作订单。

  // WorkerThread invines io_service-> run()
void WorkerThread(boost :: shared_ptr< boost :: asio :: io_service> io_service);
void CalculateFib(size_t);

boost :: shared_ptr< boost :: asio :: io_service> io_service(
new boost :: asio :: io_service);
boost :: shared_ptr& boost :: asio :: io_service :: work> work(
new boost :: asio :: io_service :: work(* io_service));

// ...

boost :: thread_group worker_threads;
for(int x = 0; x <2; ++ x)
{
worker_threads.create_thread(boost :: bind(& WorkerThread,io_service));
}

io_service-> post(boost :: bind(CalculateFib,3));
io_service-> post(boost :: bind(CalculateFib,4));
io_service-> post(boost :: bind(CalculateFib,5));

work.reset();
worker_threads.join_all();

然而,在我正在处理的下面的代码中,客户端使用TCP / IP和运行方法块,直到数据被异步接收。

  typedef boost :: asio :: ip :: tcp tcp; 
boost :: shared_ptr< boost :: asio :: io_service> io_service(
new boost :: asio :: io_service);
boost :: shared_ptr< tcp :: socket> socket(new tcp :: socket(* io_service));

//连接到127.0.0.1:9100。
tcp :: resolver resolver(* io_service);
tcp :: resolver :: query query(127.0.0.1,
boost :: lexical_cast< std :: string>(9100));
tcp :: resolver :: iterator endpoint_iterator = resolver.resolve(query);
socket-> connect(endpoint_iterator-> endpoint());

//阻塞这里直到接收到消息。
socket-> async_receive(boost :: asio :: buffer(buf_client,3000),0,
ClientReceiveEvent);
io_service-> run();

//写响应。
boost :: system :: error_code ignored_error;
std :: cout<< 发送邮件\\\
;
boost :: asio :: write(* socket,boost :: asio :: buffer(some data),ignored_error);

描述 run()

基金会



让我们从一个简化的例子开始,并检查相关的Boost.Asio部分:

  void handle_async_receive(...){...} 
void print(){...}

...

boost :: asio :: io_service io_service;
boost :: asio :: ip :: tcp :: socket socket(io_service);

...

io_service.post(& print); // 1
socket.connect(endpoint); // 2
socket.async_receive(buffer,& handle_async_receive); // 3
io_service.post(& print); // 4
io_service.run(); // 5



什么是处理程序?



一个处理程序只不过是一个回调。在示例代码中,有3个处理程序:




  • print

  • handle_async_receive 处理程序(3)。

  • print c> print() c>

    函数使用两次,每次使用都被认为创建自己的唯一可识别的处理程序。处理程序可以有多种形状和大小,从基本功能(如上面的)到更复杂的结构,例如从 boost :: bind()和lambdas生成的函子。



    什么是工作?



    工作是代表应用程序代码请求Boost.Asio执行的一些处理。有时Boost.Asio可能会在它被告知后立即开始一些工作,有时候它可能会等待稍后的时间进行工作。一旦完成工作,Boost.Asio将通过调用所提供的处理程序通知应用程序。



    Boost.Asio保证 只会在当前正在调用 run() run_one() code> poll()或 poll_one()。这些是将工作和调用处理程序的线程。因此,在上面的例子中, print()在被发布到 io_service (1)中时不会被调用。相反,它被添加到 io_service ,并且将在稍后的时间点被调用。在这种情况下,它在 io_service.run()(5)。



    什么是异步操作? h3>

    异步操作创建工作,Boost.Asio将调用处理程序以在工作完成时通知应用程序。通过调用具有前缀 async _ 的名称的函数来创建异步操作。这些函数也称为启动函数



    异步操作,可以分解为三个独特的步骤:




    • 启动或通知相关的 io_service 需要完成工作。 async_receive 操作(3)通知 io_service ,它需要异步读取套接字中的数据,然后 async_receive 立即返回。

    • 执行实际工作。在这种情况下,当 socket 接收数据时,字节将被读取并复制到 buffer 中。实际工作将在以下任一种情况下完成:

      • 启动函数(3),如果Boost.Asio可以确定它不会阻塞。

      • 当应用程式明确执行 io_service (5)。


    • 调用 handle_async_receive ReadHandler 。再次,处理程序仅在运行 io_service 的线程中调用。因此,无论何时完成工作(3或5),都会保证 handle_async_receive()仅在 io_service.run )(5)。



    这三个步骤之间的时间和空间分隔称为控制流反演。这是使异步编程困难的复杂性之一。但是,有一些技术可以帮助缓解这种情况,例如使用 coroutines



    io_service.run()做什么?



    当线程调用 io_service.run()时,工作和处理程序将从此线程中调用。在上面的例子中, io_service.run()(5)将阻塞,直到:




    • 它从 print 处理程序中调用并返回,接收操作成功或失败完成,其 handle_async_receive

    • io_service 已通过 io_service :: stop()

    • 在处理程序内抛出异常。



    一个潜在的伪流程可以描述为如下:

     
    create io_service
    create socket
    添加打印处理程序到io_service(1)
    wait用于套接字连接(2)
    向io_service(3)添加异步读工作请求
    将打印处理程序添加到io_service(4)
    运行io_service(5)
    有工作还是处理?
    是的,有1个工作和2个处理程序
    是套接字有数据吗? no,do nothing
    运行打印处理程序(1)
    是有工作还是处理程序?
    是的,有1个工作和1个处理程序
    是socket有数据吗? no,do nothing
    运行打印处理程序(4)
    是有工作还是处理程序?
    是的,有1个工作
    是socket有数据吗?没有,继续等待
    - 套接字接收数据 -
    套接字有数据,读入缓冲区
    添加handle_async_receive处理程序io_service
    是有工作还是处理程序?
    是的,有1个处理程序
    run handle_async_receive handler(3)
    是有工作还是处理程序?
    no,将io_service设置为停止并返回

    注意读取操作完成后,如何将另一个 io_service 。这个细微的细节是异步编程的一个重要特征。它允许处理程序链接在一起。例如,如果 handle_async_receive 没有获得它所期望的所有数据,那么它的实现可以发布另一个异步读取操作,导致 io_service 有更多的工作,因此不会从 io_service.run()返回。



    io_service 已停止工作,应用程序必须 reset() io_service






    示例问题和示例3a代码



    检查问题中引用的两段代码。



    问题代码



    socket - > async_receive 将工作添加到 io_service 。因此, io_service-> run()将阻塞,直到读取操作成功或错误完成,并且 ClientReceiveEvent 已完成运行或抛出异常。



    示例3a 代码



    为了更容易理解,较小的注释示例3a:

      void CalculateFib(std :: size_t n); 

    int main()
    {
    boost :: asio :: io_service io_service;
    boost :: optional< boost :: asio :: io_service :: work> work = //'。 1
    boost :: in_place(boost :: ref(io_service)); //。'

    boost :: thread_group worker_threads; // - 。
    for(int x = 0; x <2; ++ x)//:
    {//'。
    worker_threads.create_thread(//: - 2
    boost :: bind(& boost :: asio :: io_service :: run,& io_service)//。'
    ); //:
    } // - '

    io_service.post(boost :: bind(CalculateFib,3)); //。
    io_service.post(boost :: bind(CalculateFib,4)); //: - 3
    io_service.post(boost :: bind(CalculateFib,5)); //。'

    work = boost :: none; // 4
    worker_threads.join_all(); // 5
    }



    在高层次,程序将创建2个线程将处理 io_service 的事件循环(2)。这导致一个简单的线程池,将计算斐波那契数字(3)。



    问题代码和这个代码之间的一个主要区别是这个代码调用 io_service :: run()(2)之前实际工作和处理程序添加到 io_service 3)。为了防止 io_service :: run()立即返回, io_service :: work 对象被创建(1)。此对象阻止 io_service 停止工作;因此, io_service :: run()将不会作为无效工作的结果返回。



    如下所示:


    1. 创建并添加 io_service :: work 对象 io_service

    2. 创建线程池调用 io_service :: run() 。由于 io_service :: work 对象,这些工作线程不会从 io_service 返回。

    3. io_service 添加3个计算斐波那契数的处理程序,并立即返回。

    4. 删除 io_service :: work 对象。

    5. 等待工作线程完成运行。这将只有在所有3个处理程序都执行完成后才会发生,因为 io_service 既没有处理程序也没有工作。

    代码可以以与原始代码相同的方式编写,其中处理程序添加到 io_service ,然后 io_service 事件循环。这消除了使用 io_service :: work 的需要,并产生以下代码:

      int main()
    {
    boost :: asio :: io_service io_service;

    io_service.post(boost :: bind(CalculateFib,3)); //。
    io_service.post(boost :: bind(CalculateFib,4)); //: - 3
    io_service.post(boost :: bind(CalculateFib,5)); //。'

    boost :: thread_group worker_threads; // - 。
    for(int x = 0; x <2; ++ x)//:
    {//'。
    worker_threads.create_thread(//: - 2
    boost :: bind(& boost :: asio :: io_service :: run,& io_service)//。'
    ); //:
    } // - '
    worker_threads.join_all(); // 5
    }






    。异步



    虽然问题中的代码使用了异步操作,但是它正在等待异步操作完成,所以它有效地同步运行:

      socket.async_receive(buffer,handler)
    io_service.run

    等效于:

      boost :: asio :: error_code error; 
    std :: size_t bytes_transferred = socket.receive(buffer,0,error);
    handler(error,bytes_transferred);

    作为一般的经验法则,尽量避免混合同步和异步操作。通常,它可以将一个复杂的系统变成一个复杂的系统。此 answer 强调了异步编程的优点,其中一些也包含在Boost.Asio 文档


    Being a total beginner to Boost.Asio, I am confused with io_service::run(). I would appreciate it if someone could explain to me when this method blocks/unblocks. The documentations states:

    The run() function blocks until all work has finished and there are no more handlers to be dispatched, or until the io_service has been stopped.

    Multiple threads may call the run() function to set up a pool of threads from which the io_service may execute handlers. All threads that are waiting in the pool are equivalent and the io_service may choose any one of them to invoke a handler.

    A normal exit from the run() function implies that the io_service object is stopped (the stopped() function returns true). Subsequent calls to run(), run_one(), poll() or poll_one() will return immediately unless there is a prior call to reset().

    What does the following statement mean?

    [...] no more handlers to be dispatched [...]


    While trying to understand the behavior of io_service::run(), I came across this example (example 3a). Within it, I observe that io_service->run() blocks and waits for work orders.

    // WorkerThread invines io_service->run()
    void WorkerThread(boost::shared_ptr<boost::asio::io_service> io_service);
    void CalculateFib(size_t);
    
    boost::shared_ptr<boost::asio::io_service> io_service(
        new boost::asio::io_service);
    boost::shared_ptr<boost::asio::io_service::work> work(
       new boost::asio::io_service::work(*io_service));
    
    // ...
    
    boost::thread_group worker_threads;
    for(int x = 0; x < 2; ++x)
    {
      worker_threads.create_thread(boost::bind(&WorkerThread, io_service));
    }
    
    io_service->post( boost::bind(CalculateFib, 3));
    io_service->post( boost::bind(CalculateFib, 4));
    io_service->post( boost::bind(CalculateFib, 5));
    
    work.reset();
    worker_threads.join_all();
    

    However, in the following code that I was working on, the client connects using TCP/IP and the run method blocks until data is asynchronously received.

    typedef boost::asio::ip::tcp tcp;
    boost::shared_ptr<boost::asio::io_service> io_service(
        new boost::asio::io_service);
    boost::shared_ptr<tcp::socket> socket(new tcp::socket(*io_service));
    
    // Connect to 127.0.0.1:9100.
    tcp::resolver resolver(*io_service);
    tcp::resolver::query query("127.0.0.1", 
                               boost::lexical_cast< std::string >(9100));
    tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
    socket->connect(endpoint_iterator->endpoint());
    
    // Just blocks here until a message is received.
    socket->async_receive(boost::asio::buffer(buf_client, 3000), 0,
                          ClientReceiveEvent);
    io_service->run();
    
    // Write response.
    boost::system::error_code ignored_error;
    std::cout << "Sending message \n";
    boost::asio::write(*socket, boost::asio::buffer("some data"), ignored_error);
    

    Any explanation of run() that describes its behavior in the two examples below would be appreciated.

    解决方案

    Foundation

    Lets start with a simplified example and examine the relevant Boost.Asio pieces:

    void handle_async_receive(...) { ... }
    void print() { ... }
    
    ...  
    
    boost::asio::io_service io_service;
    boost::asio::ip::tcp::socket socket(io_service);
    
    ...
    
    io_service.post(&print);                             // 1
    socket.connect(endpoint);                            // 2
    socket.async_receive(buffer, &handle_async_receive); // 3
    io_service.post(&print);                             // 4
    io_service.run();                                    // 5
    

    What Is A Handler?

    A handler is nothing more than a callback. In the example code, there are 3 handlers:

    • The print handler (1).
    • The handle_async_receive handler (3).
    • The print handler (4).

    Even though the same print() function is used twice, each use is considered to create its own uniquely identifiable handler. Handlers can come in many shapes and sizes, ranging from basic functions like the ones above to more complex constructs such as functors generated from boost::bind() and lambdas. Regardless of the complexity, the handler still remains nothing more than a callback.

    What Is Work?

    Work is some processing that Boost.Asio has been requested to do on behalf of the application code. Sometimes Boost.Asio may start some of the work as soon as it has been told about it, and other times it may wait to do the work at a later point in time. Once it has finished the work, Boost.Asio will inform the application by invoking the supplied handler.

    Boost.Asio guarantees that handlers will only run within a thread that is currently calling run(), run_one(), poll(), or poll_one(). These are the threads that will do work and call handlers. Therefore, in above example, print() is not invoked when it is posted into the io_service (1). Instead, it is added to the io_service and will be invoked at a later point in time. In this case, it within io_service.run() (5).

    What Are Asynchronous Operations?

    An asynchronous operation creates work and Boost.Asio will invoke a handler to inform the application when the work has completed. Asynchronous operations are created by calling a function that has a name with the prefix async_. These functions are also known as initiating functions.

    Asynchronous operations and can be decomposed into three unique steps:

    • Initiating, or informing, the associated io_service that works needs to be done. The async_receive operation (3) informs the io_service that it will need to asynchronously read data from the socket, then async_receive returns immediately.
    • Doing the actual work. In this case, when socket receives data, bytes will be read and copied into buffer. The actual work will be done in either:
      • The initiating function (3), if Boost.Asio can determine that it will not block.
      • When the application explicitly run the io_service (5).
    • Invoking the handle_async_receive ReadHandler. Once again, handlers are only invoked within threads running the io_service. Thus, regardless of when the work is done (3 or 5), it is guaranteed that handle_async_receive() will only be invoked within io_service.run() (5).

    The separation in time and space between these three steps is known as control flow inversion. It is one of the complexities that makes asynchronous programming difficult. However, there are techniques that can help mitigate this, such as by using coroutines.

    What Does io_service.run() Do?

    When a thread calls io_service.run(), work and handlers will be invoked from within this thread. In the above example, io_service.run() (5) will block until:

    • It has invoked and returned from both print handlers, the receive operation completes with success or failure, and its handle_async_receive handler has been invoked and returned.
    • The io_service is explicitly stopped via io_service::stop().
    • An exception is thrown from within a handler.

    One potential psuedo-ish flow could be described as the following:

    create io_service
    create socket
    add print handler to io_service (1)
    wait for socket to connect (2)
    add an asynchronous read work request to the io_service (3)
    add print handler to io_service (4)
    run the io_service (5)
      is there work or handlers?
        yes, there is 1 work and 2 handlers
          does socket have data? no, do nothing
          run print handler (1)
      is there work or handlers?
        yes, there is 1 work and 1 handler
          does socket have data? no, do nothing
          run print handler (4)
      is there work or handlers?
        yes, there is 1 work
          does socket have data? no, continue waiting
      -- socket receives data --
          socket has data, read it into buffer
          add handle_async_receive handler to io_service
      is there work or handlers?
        yes, there is 1 handler
          run handle_async_receive handler (3)
      is there work or handlers?
        no, set io_service as stopped and return

    Notice how when the read finished, it added another handler to the io_service. This subtle detail is an important feature of asynchronous programming. It allows for handlers to be chained together. For instance, if handle_async_receive did not get all the data it expected, then its implementation could post another asynchronous read operation, resulting in io_service having more work, and thus not returning from io_service.run().

    Do note that when the io_service has ran out of work, the application must reset() the io_service before running it again.


    Example Question and Example 3a code

    Now, lets examine the two pieces of code referenced in the question.

    Question Code

    socket->async_receive adds work to the io_service. Thus, io_service->run() will block until the read operation completes with success or error, and ClientReceiveEvent has either finished running or throws an exception.

    Example 3a Code

    In hopes of making it easier to understand, here is a smaller annotated Example 3a:

    void CalculateFib(std::size_t n);
    
    int main()
    {
      boost::asio::io_service io_service;
      boost::optional<boost::asio::io_service::work> work =       // '. 1
          boost::in_place(boost::ref(io_service));                // .'
    
      boost::thread_group worker_threads;                         // -.
      for(int x = 0; x < 2; ++x)                                  //   :
      {                                                           //   '.
        worker_threads.create_thread(                             //     :- 2
          boost::bind(&boost::asio::io_service::run, &io_service) //   .'
        );                                                        //   :
      }                                                           // -'
    
      io_service.post(boost::bind(CalculateFib, 3));              // '.
      io_service.post(boost::bind(CalculateFib, 4));              //   :- 3
      io_service.post(boost::bind(CalculateFib, 5));              // .'
    
      work = boost::none;                                         // 4
      worker_threads.join_all();                                  // 5
    }
    

    At a high-level, the program will create 2 threads that will process the io_service's event loop (2). This results in a simple thread pool that will calculate Fibonacci numbers (3).

    The one major difference between the Question Code and this code is that this code invokes io_service::run() (2) before actual work and handlers are added to the io_service (3). To prevent the io_service::run() from returning immediately, an io_service::work object is created (1). This object prevents the io_service from running out of work; therefore, io_service::run() will not return as a result of no work.

    The overall flow is as follows:

    1. Create and add the io_service::work object added to the io_service.
    2. Thread pool created that invokes io_service::run(). These worker threads will not return from io_service because of the io_service::work object.
    3. Add 3 handlers that calculate Fibonacci numbers to the io_service, and return immediately. The worker threads, not the main thread, may start running these handlers immediately.
    4. Delete the io_service::work object.
    5. Wait for worker threads to finish running. This will only occur once all 3 handlers have finished execution, as the io_service neither has handlers nor work.

    The code could be written differently, in the same manner as the Original Code, where handlers are added to the io_service, and then the io_service event loop is processed. This removes the need to use io_service::work, and results in the following code:

    int main()
    {
      boost::asio::io_service io_service;
    
      io_service.post(boost::bind(CalculateFib, 3));              // '.
      io_service.post(boost::bind(CalculateFib, 4));              //   :- 3
      io_service.post(boost::bind(CalculateFib, 5));              // .'
    
      boost::thread_group worker_threads;                         // -.
      for(int x = 0; x < 2; ++x)                                  //   :
      {                                                           //   '.
        worker_threads.create_thread(                             //     :- 2
          boost::bind(&boost::asio::io_service::run, &io_service) //   .'
        );                                                        //   :
      }                                                           // -'
      worker_threads.join_all();                                  // 5
    }
    


    Synchronous vs. Asynchronous

    Although the code in the question is using an asynchronous operation, it is effectively functioning synchronously, as it is waiting for the asynchronous operation to complete:

    socket.async_receive(buffer, handler)
    io_service.run();
    

    is equivalent to:

    boost::asio::error_code error;
    std::size_t bytes_transferred = socket.receive(buffer, 0, error);
    handler(error, bytes_transferred);
    

    As a general rule of thumb, try to avoid mixing synchronous and asynchronous operations. Often times, it can turn a complex system into a complicated system. This answer highlights advantages of asynchronous programming, some of which are also covered in the Boost.Asio documentation.

    这篇关于当boost :: asio :: io_service运行方法阻塞/解除阻塞时感到困惑的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆