ZeroMQ处理多线程应用程序中的中断 [英] ZeroMQ handling interrupt in multithreaded application

查看:83
本文介绍了ZeroMQ处理多线程应用程序中的中断的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在多线程环境中ZeroMQ中的正常退出

Graceful exit in ZeroMQ in multithreaded environment

规格:带有c ++ 11,libzmq的ubuntu 16.04:4.2.3

Specs : ubuntu 16.04 with c++11,libzmq : 4.2.3

示例代码

static int s_interrupted = 0;
static void s_signal_handler (int signal_value)
{
    s_interrupted = 1;
    //some code which will tell main thread to exit
}

static void s_catch_signals (void)
{
    struct sigaction action;
    action.sa_handler = s_signal_handler;
    action.sa_flags = 0;
    sigemptyset (&action.sa_mask);
    sigaction (SIGINT, &action, NULL);
    sigaction (SIGTERM, &action, NULL);
}

static void Thread((zsock_t *pipe, void *)
{
    zmq::context_t context(1);
    zmq::socket_t requester1(context,ZMQ_DEALER);
    zmq::socket_t requester2(context,ZMQ_DEALER);
    requester1.connect(address1);
    requester2.connect(address2);
    zmq_pollitem_t items []=
        {{requester1,0,ZMQ_POLLIN,0},
        {requester2,0,ZMQ_POLLIN,0}};

    while(true)
    {
        zmq::message_t message;
        zmq::poll (items, 2, -1);

         if (items [0].revents & ZMQ_POLLIN) 
         {
             requester1.recv(&message);
         }
         if (items [1].revents & ZMQ_POLLIN) 
         {
             requester2.recv(&message);
         }
    }
}

int main()
{
    .
    //some code
    .

    zactor_t *actor = zactor_new (Threaded, nullptr);
    s_catch_signals();
    .
    //continue
    .
    //wait till thread finishes to exit

    return 0;
}

现在,当发生中断时,它将从主线程调用信号处理程序.我不知何故需要告诉线程(轮询器)从信号处理程序中退出.有什么想法可以实现这一目标吗?

Now when the interrupt occurs it will call the signal handler from the main thread. I somehow need to tell the thread (poller) to exit from the signal handler. Any ideas how to achieve this?

推荐答案

从ZMQ文档中,您可以通过2种惯用的"方式来解决此问题:

From ZMQ documentation you have 2 "idiomatic" way of dealing with this :

  • Polling on a pipe, and writing on the pipe in the signal handler.

在发送信号时在recv中引发捕获异常.

测试之后,似乎zmq :: poll不会在SIGINT上引发异常.因此,解决方案似乎是使用专用于关闭的套接字.解决方案如下:

After testing it, seems that zmq::poll does not throw an exception on SIGINT. Therefore the solution seem to be to use a socket dedicated to closing. The solution looks like this :

#include <iostream>
#include <thread>
#include <signal.h>
#include <zmq.hpp>
zmq::context_t* ctx;
static void s_signal_handler (int signal_value)
{
    std::cout << "Signal received" << std::endl;
    zmq::socket_t stop_socket(*ctx, ZMQ_PAIR);
    stop_socket.connect("inproc://stop_address");
    zmq::message_t msg("0", 1);
    stop_socket.send(msg);
    std::cout << "end sighandler" << std::endl;
}

static void s_catch_signals (void)
{
    struct sigaction action;
    action.sa_handler = s_signal_handler;
    action.sa_flags = 0;
    sigemptyset (&action.sa_mask);
    sigaction (SIGINT, &action, NULL);
    sigaction (SIGTERM, &action, NULL);
}

void thread(void)
{
    std::cout << "Thread Begin" << std::endl;
    zmq::context_t context (1);
    ctx = &context;
    zmq::socket_t requester1(context,ZMQ_DEALER);
    zmq::socket_t requester2(context,ZMQ_DEALER);
    zmq::socket_t stop_socket(context, ZMQ_PAIR);
    requester1.connect("tcp://127.0.0.1:36483");
    requester2.connect("tcp://127.0.0.1:36483");
    stop_socket.bind("inproc://stop_address");

    zmq_pollitem_t items []=
    {
        {requester1,0,ZMQ_POLLIN,0},
        {requester2,0,ZMQ_POLLIN,0},
        {stop_socket,0,ZMQ_POLLIN,0}
    };

    while ( true )
    {
        //  Blocking read will throw on a signal

        int rc = 0;
        std::cout << "Polling" << std::endl;
        rc = zmq::poll (items, 3, -1);

        zmq::message_t message;
        if(rc > 0)
        {
            if (items [0].revents & ZMQ_POLLIN)
            {
                requester1.recv(&message);
            }
            if (items [1].revents & ZMQ_POLLIN)
            {
                requester2.recv(&message);
            }
            if(items [2].revents & ZMQ_POLLIN)
            {
                std::cout << "message stop received " << std::endl;
                break;
            }
        }
    }
    requester1.setsockopt(ZMQ_LINGER, 0);
    requester2.setsockopt(ZMQ_LINGER, 0);
    stop_socket.setsockopt(ZMQ_LINGER, 0);
    requester1.close();
    requester2.close();
    stop_socket.close();
    std::cout << "Thread end" << std::endl;
}

int main(void)
{
    std::cout << "Begin" << std::endl;
    s_catch_signals ();
    zmq::context_t context (1);
    zmq::socket_t router(context,ZMQ_ROUTER);
    router.bind("tcp://127.0.0.1:36483");

    std::thread t(&thread);
    t.join();
    std::cout << "end join" << std::endl;

}

请注意,如果您不想与信号处理程序共享上下文,则可以使用"ipc://...".

Note that if you do not want to share the context to the signal handler you could use "ipc://..." .

这篇关于ZeroMQ处理多线程应用程序中的中断的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆