Boost与GUI和工作者螺纹的Asio样式 [英] Boost Asio pattern with GUI and worker thread

查看:136
本文介绍了Boost与GUI和工作者螺纹的Asio样式的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想实现一个Boost Asio模式,使用一个线程的GUI和工作线程的一些socket IO。



工作线程将使用 boost :: asio :: io_service 来管理套接字客户端。



GUI线程需要从工作线程发送和接收消息。



我无法确切地说明如何使用Boost Asio实现这种模式。



我已经实现了标准的Asio方式的套接字通信(我从工作线程调用 io_service.run(),我使用 async_read_some / async_send )。我不需要 strands ,因为 io_service.run()只从工作线程调用。



现在我想添加交叉线程消息队列。



我应该运行 io_service c> c>

post 将消息从GUI线程发布到工作线程(不调用 io_service.run() io_service.poll_one ()从GUI线程),并使用操作系统的GUI消息循环将消息从工作线程发布到GUI线程?



如果我需要从GUI线程调用 io_service.run() io_service.poll_one()因为 io_service 在两个线程之间共享,所以需要对套接字操作使用 strands



编辑:为了澄清我的问题,我想尽一切可能,使用Boost Asio实现消息队列,依靠其他库只有当Boost Asio不能作业。

解决方案

消息传递是相当通用的。有各种方法来处理该问题,并且该解决方案将可能取决于期望的行为细节。例如,阻塞或非阻塞,控制内存分配,上下文等。




  • Boost.Lockfree 为单/多消费者/生产者提供了线程安全的无锁无阻塞队列。它倾向于相当好地适用于事件循环,这对于消费者被阻塞是不理想的,等待生产者用信号通知同步结构。

      boost :: lockfree :: queue< message_type> worker_message_queue; 

    void send_worker_message(const message_type& message)
    {
    //将消息添加到工作程序消息队列。
    worker_message_queue.push(message);

    //将工作添加到将处理队列的worker_io_service。
    worker_io_service.post(& process_message);
    }

    void process_message()
    {
    message_type message;

    //如果未检索到消息,则早退。
    if(!worker_message_queue.pop(message))return;

    ...
    }


  • Boost.Asio io_service 可以作为队列。消息只需要绑定到指定的处理程序。

      void send_worker_message(const message_type& message)
    {
    //将工作添加到将处理消息的worker_io_service。
    worker_io_service.post(boost :: bind(& process_message,message));
    }

    void process_message(message_type& message)
    {
    ...
    }
    pre>






这个评论表明,欲望不仅仅是消息传递。它听起来好像最终目标是允许一个线程引起另一个线程调用任意函数。



如果是这种情况,请考虑:




  • 使用 Boost。 Signals2 用于管理信号和时隙实现。

  • 使用Boost.Asio的 io_service 设置信号发射。如果GUI线程和工作线程都有自己的 io_service ,那么工作线程可以在GUI线程的 io_service 将发出一个信号。在GUI线程的主循环中,它将轮询 io_service ,发出信号,并导致在GUI线程的上下文中调用槽。



下面是两个线程将消息(如 unsigned int )传递给另一个消息的完整示例以及在另一个线程中调用任意函数。

  #include< iostream> 
#include< boost / asio.hpp>
#include< boost / bind.hpp>
#include< boost / signals2.hpp>
#include< boost / thread.hpp>

/// @brief io_service专用于gui。
boost :: asio :: io_service gui_service;

/// @brief io_service献给worker。
boost :: asio :: io_service worker_service;

/// @brief工作以防止gui_service过早停止。
boost :: optional< boost :: asio :: io_service :: work> ;

/// @brief hello slot。
void hello(int x)
{
std :: cout< hello with<< x < 从线程<
boost :: this_thread :: get_id()< std :: endl;
}

/// @brief world slot。
void world(int x)
{
std :: cout< world with<< x < 从线程<
boost :: this_thread :: get_id()<< std :: endl;
}

/// @brief信号类型。
typedef boost :: signals2 :: signal< void(int)> signal_type;

void emit_then_notify_gui(signal_type& signal,unsigned int x);

/// @brief发出信号,然后消息工作者。
void emit_then_notify_worker(signal_type& signal,unsigned int x)
{
//发出信号,导致已注册的槽在此线程中运行。
signal(x);

//如果x已用尽,那么会导致gui服务用完。
if(!x)
{
gui_work = boost :: none;
}
//否则,后工作到工作服务。
else
{
std :: cout< GUI线程:< boost :: this_thread :: get_id()<<
调度其他线程发出信号< std :: endl;
worker_service.post(boost :: bind(
& emit_then_notify_gui,
boost :: ref(signal),--x));
}
}

/// @brief发出信号然后消息工作者。
void emit_then_notify_gui(signal_type& signal,unsigned int x)
{
//发出信号,导致已注册的槽在这个线程中运行。
signal(x);

//如果x已用尽,那么会导致gui服务用完。
if(!x)
{
gui_work = boost :: none;
}
//否则,更多的工作到gui。
else
{
std :: cout< Worker thread:< boost :: this_thread :: get_id()<<
调度其他线程发出信号< std :: endl;
gui_service.post(boost :: bind(
& emit_then_notify_worker,
boost :: ref(signal),--x));
}
}

void worker_main()
{
std :: cout< Worker thread:< boost :: this_thread :: get_id()< std :: endl;
worker_service.run();
}

int main()
{
signal_type signal;

//将插槽连接到信号。
signal.connect(& hello);
signal.connect(& world);

boost :: optional< boost :: asio :: io_service :: work> worker_work(
boost :: ref(worker_service));
gui_work = boost :: in_place(boost :: ref(gui_service));

std :: cout<< GUI线程:< boost :: this_thread :: get_id()<< std :: endl;

//生成工作线程。
boost :: thread worker_thread(& worker_main);

//将工作添加到worker。
worker_service.post(boost :: bind(
& emit_then_notify_gui,
boost :: ref(signal),3));

//模拟GUI主循环。
while(!gui_service.stopped())
{
//执行其他GUI操作。

//执行消息处理。
gui_service.poll_one();
}

//清理。
worker_work = boost :: none;
worker_thread.join();
}

及其输出:

 GUI线程:b7f2f6d0 
工作线程:b7f2eb90
hello与3从线程b7f2eb90
世界与3从线程b7f2eb90
工作线程:b7f2eb90调度其他线程发出信号
hello与2从线程b7f2f6d0
世界与2从线程b7f2f6d0
GUI线程:b7f2f6d0调度其他线程发出信号
hello与1从线程b7f2eb90
世界1从线程b7f2eb90
工作线程:b7f2eb90调度其他线程发出信号
hello与0从线程b7f2f6d0
世界与0从线程b7f2f6d0


I would like to implement a Boost Asio pattern using a thread for GUI and a worker thread for some socket IO.

The worker thread will use boost::asio::io_service to manage a socket client. All operations on sockets will be performed by the worker thread only.

The GUI thread needs to send and receive messages from the worker thread.

I can't exactly figure how to implement this pattern using Boost Asio.

I've already implemented the socket communication in the standard Asio way (I call io_service.run() from the worker thread and I use async_read_some/async_send). I don't need strands because io_service.run() is called from the worker thread only.

Now I'm trying to add the cross thread message queue. How I can I implement it?

Should I run the io_service from the GUI thread too?

Or should I just use strands with post to post messages from the GUI thread to the worker thread (without calling io_service.run() or io_service.poll_one() from the GUI thread), and use the operating system's GUI message loop to post messages from the worker thread to the GUI thread?

If I need to call io_service.run() or io_service.poll_one() from the GUI thread too, do I need to use strands on the socket operations, since the io_service is shared between two threads?

EDIT: to clarify my question, I would like to do whatever I can, to implement the message queue, using Boost Asio, relying on other libraries only if Boost Asio can't do the job.

解决方案

Message passing is fairly generic. There are various ways to approach the problem, and the solution will likely be dependent on the desired behavioral details. For example, blocking or non-blocking, controlling memory allocation, context, etc.

  • Boost.Lockfree provides thread-safe lock-free non-blocking queues for singe/multi consumer/producers. It tends to lend itself fairly nicely to event loops, where it is not ideal for the consumer to be blocked, waiting for the producer to signal a synchronization construct.

    boost::lockfree::queue<message_type> worker_message_queue;
    
    void send_worker_message(const message_type& message)
    {
      // Add message to worker message queue.
      worker_message_queue.push(message);
    
      // Add work to worker_io_service that will process the queue.
      worker_io_service.post(&process_message); 
    }
    
    void process_message()
    {
      message_type message;
    
      // If the message was not retrieved, then return early.
      if (!worker_message_queue.pop(message)) return;
    
      ...
    }
    

  • Alternatively, Boost.Asio's io_service can function as a queue. The message just needs to be bound to the specified handler.

    void send_worker_message(const message_type& message)
    {
      // Add work to worker_io_service that will process the message.
      worker_io_service.post(boost::bind(&process_message, message)); 
    }
    
    void process_message(message_type& message)
    {
      ...
    }
    


This comment suggest that the desire is more than message passing. It sounds as though the end goal is to allow one thread to cause another thread to invoke arbitrary functions.

If this is the case, then consider:

  • Using Boost.Signals2 for a managed signals and slots implementation. This allows arbitrary functions to register with a signal.
  • Using Boost.Asio's io_service to setup signal emissions. If the GUI thread and worker thread each have their own io_service, then the worker thread can post a handler into the GUI thread's io_service that will emit a signal. In the GUI thread's main loop, it will poll the io_service, emit the signal, and cause slots to be invoked from within the GUI thread's context.

Here is complete example where two threads pass a message (as an unsigned int) to one another, as well as causing arbitrary functions to be invoked within another thread.

#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/signals2.hpp>
#include <boost/thread.hpp>

/// @brief io_service dedicated to gui.
boost::asio::io_service gui_service;

/// @brief io_service dedicated to worker.
boost::asio::io_service worker_service;

/// @brief work to keep gui_service from stopping prematurely.
boost::optional<boost::asio::io_service::work> gui_work;

/// @brief hello slot.
void hello(int x)
{
  std::cout << "hello with " << x << " from thread " << 
               boost::this_thread::get_id() << std::endl;
}

/// @brief world slot.
void world(int x)
{
  std::cout << "world with " << x << " from thread " << 
               boost::this_thread::get_id() << std::endl;
}

/// @brief Type for signals.
typedef boost::signals2::signal<void (int)> signal_type;

void emit_then_notify_gui(signal_type& signal, unsigned int x);

/// @brief Emit signals then message worker.
void emit_then_notify_worker(signal_type& signal, unsigned int x)
{
  // Emit signal, causing registered slots to run within this thread.
  signal(x);

  // If x has been exhausted, then cause gui service to run out of work.
  if (!x)
  {
    gui_work = boost::none;
  }
  // Otherwise, post work into worker service.
  else
  {
    std::cout << "GUI thread: " << boost::this_thread::get_id() << 
                 " scheduling other thread to emit signals" << std::endl;
    worker_service.post(boost::bind(
        &emit_then_notify_gui,
        boost::ref(signal), --x));
  }  
}

/// @brief Emit signals then message worker.
void emit_then_notify_gui(signal_type& signal, unsigned int x)
{
  // Emit signal, causing registered slots to run within this thread.
  signal(x);

  // If x has been exhausted, then cause gui service to run out of work.
  if (!x)
  {
    gui_work = boost::none;
  }
  // Otherwise, post more work into gui.
  else
  {
    std::cout << "Worker thread: " << boost::this_thread::get_id() << 
                 " scheduling other thread to emit signals" << std::endl;
    gui_service.post(boost::bind(
        &emit_then_notify_worker,
        boost::ref(signal), --x));
  }  
}

void worker_main()
{
  std::cout << "Worker thread: " << boost::this_thread::get_id() << std::endl;
  worker_service.run();
}

int main()
{
  signal_type signal;

  // Connect slots to signal.
  signal.connect(&hello);
  signal.connect(&world);

  boost::optional<boost::asio::io_service::work> worker_work(
     boost::ref(worker_service));
  gui_work = boost::in_place(boost::ref(gui_service));

  std::cout << "GUI thread: " << boost::this_thread::get_id() << std::endl;

  // Spawn off worker thread.
  boost::thread worker_thread(&worker_main);

  // Add work to worker.
  worker_service.post(boost::bind(
      &emit_then_notify_gui,
      boost::ref(signal), 3));

  // Mocked up GUI main loop.
  while (!gui_service.stopped())
  {
    // Do other GUI actions.

    // Perform message processing.
    gui_service.poll_one();
  }

  // Cleanup.
  worker_work = boost::none;
  worker_thread.join();
}

And its output:

GUI thread: b7f2f6d0
Worker thread: b7f2eb90
hello with 3 from thread b7f2eb90
world with 3 from thread b7f2eb90
Worker thread: b7f2eb90 scheduling other thread to emit signals
hello with 2 from thread b7f2f6d0
world with 2 from thread b7f2f6d0
GUI thread: b7f2f6d0 scheduling other thread to emit signals
hello with 1 from thread b7f2eb90
world with 1 from thread b7f2eb90
Worker thread: b7f2eb90 scheduling other thread to emit signals
hello with 0 from thread b7f2f6d0
world with 0 from thread b7f2f6d0

这篇关于Boost与GUI和工作者螺纹的Asio样式的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆