Using zmq::poll in a multithreaded C++11 program in combination with Ctrl+C or a kill signal


Problem description

For a custom server I intend to use the int zmq::poll(zmq_pollitem_t* items, int nitems, long timeout = -1) function, which I think is a wrapper around the Unix poll function but accepts zmq::socket_t objects alongside file descriptors. The function works as I expected until I press Ctrl+C or run $ kill my_server_pid in the terminal. I would expect the poll to return -1 or throw a zmq::error_t (which derives from std::exception) carrying the errno and the strerror message. This should indicate that there was an interrupt. My server should then handle the signal gracefully, save some data, and shut down.

Below I have a fragment of code that demonstrates the problem. First I show a bit of my environment and how I compile it:

    mja@gijs:~/Desktop/sample_programs/zeromq/poll$ g++ -v
    Using built-in specs.
    COLLECT_GCC=g++
    COLLECT_LTO_WRAPPER=/usr/lib/gcc/x86_64-linux-gnu/4.6/lto-wrapper
    Target: x86_64-linux-gnu
    Configured with: ../src/configure -v --with-pkgversion='Ubuntu/Linaro      4.6.3-1ubuntu5' --with-bugurl=file:///usr/share/doc/gcc-4.6/README.Bugs --enable-languages=c,c++,fortran,objc,obj-c++ --prefix=/usr --program-suffix=-4.6 --enable-shared --enable-linker-build-id --with-system-zlib --libexecdir=/usr/lib --without-included-gettext --enable-threads=posix --with-gxx-include-dir=/usr/include/c++/4.6 --libdir=/usr/lib --enable-nls --with-sysroot=/ --enable-clocale=gnu --enable-libstdcxx-debug --enable-libstdcxx-time=yes --enable-gnu-unique-object --enable-plugin --enable-objc-gc --disable-werror --with-arch-32=i686 --with-tune=generic --enable-checking=release --build=x86_64-linux-gnu --host=x86_64-linux-gnu --target=x86_64-linux-gnu
    Thread model: posix
    gcc version 4.6.3 (Ubuntu/Linaro 4.6.3-1ubuntu5) 
    mja@gijs:~/Desktop/sample_programs/zeromq/poll$ pkg-config --modversion libzmq
    2.2.0
    mja@gijs:~/Desktop/sample_programs/zeromq/poll$ g++ -pthread -std=c++0x -Wall -Wextra -pedantic -o poll polling.cpp $(pkg-config --cflags --libs libzmq )
    mja@gijs:~/Desktop/sample_programs/zeromq/poll$

And now the code of polling.cpp:

#include <zmq.hpp>
#include <thread>
#include <cstdlib>
#include <cstring>   // memcpy
#include <string>
#include <iostream>
#include <signal.h>

const char* bind_addres = "tcp://*:2345";
const char* connect_addres = "tcp://localhost:2345";

inline void
send_str( zmq::socket_t& sock, const std::string& s) throw (zmq::error_t) {
    zmq::message_t msg ( s.size() );
    memcpy( msg.data(), s.c_str(), msg.size() );
    sock.send( msg );
}

inline void recv_str( zmq::socket_t& sock, std::string& s) throw( zmq::error_t) {
    zmq::message_t msg;
    sock.recv(&msg);
    s = std::string( static_cast<const char*>(msg.data()), msg.size());
}
static int interrupted = 0;

static void handle_signal ( int signal ) 
{
    interrupted = signal;
    std::cerr << "Interrupted by signal: " << signal << std::endl;
}

void catch_signals (void) 
{
    struct sigaction action;
    action.sa_handler = handle_signal;
    action.sa_flags = 0;
    sigemptyset(&action.sa_mask);

    sigaction(SIGINT,  &action, NULL);
    sigaction(SIGTERM, &action, NULL);
}

int get_interrupted(void)
{
    std::cout << "interrupted = " << interrupted << std::endl;
    return interrupted;
}

void req_thread ( zmq::context_t* c ){


    zmq::socket_t sock( *c, ZMQ_REQ);
    sock.connect(connect_addres);

    zmq::pollitem_t items[]{
        { sock, 0, ZMQ_POLLIN, 0}
    };

    while (true){
        try {
            // zmq 3.x.x takes ms instead of us so change to eg 1000 or be patient.
            int rc = zmq::poll(items, 1, 1000000);
            if (rc > 0){
                if ( items[0].revents & ZMQ_POLLIN){
                    std::string s;
                    recv_str(sock, s);
                    std::cout << s << std::endl;
                }
            }
            else if ( rc == 0){ //timeout
                send_str( sock, "Hello");
            }
            else{
                std::cout << __func__ << " " << __LINE__ << get_interrupted() << std::endl;
            }
        }
        catch( zmq::error_t& e ){
            std::cout << __func__ << " " << __LINE__ << e.what() << std::endl;
        }
    }
}

void rep_thread ( zmq::context_t* c ){

    zmq::socket_t sock( *c, ZMQ_REP);
    sock.bind(bind_addres);

    zmq::pollitem_t items[]{
        { sock, 0, ZMQ_POLLIN, 0}
    };

    while (true){
        try{
            int rc = zmq::poll(items, 1 , -1);
            if (rc > 0){
                if ( items[0].revents & ZMQ_POLLIN){
                    std::string s;
                    recv_str(sock, s);
                    s+=" world!";
                    send_str(sock, s);
                }
            }
            else{
                std::cout << __func__ << " " << __LINE__ << get_interrupted() << std::endl;
            }
        }
        catch( zmq::error_t& e ){
            std::cout << __func__ << " " << __LINE__ << e.what() << std::endl;
        }
    }
}

int main(){

    zmq::context_t context(1);
    catch_signals();

    std::thread t1 ( rep_thread, &context);
    std::thread t2 ( req_thread, &context);

    t1.join();
    t2.join();

    return 0;
}

Finally, here is some example output demonstrating my issue: zmq_poll does not seem to be affected by pressing Ctrl+C in the terminal:

    mja@gijs:~/Desktop/sample_programs/zeromq/poll$ ./poll 
    Hello world!
    Hello world!
    Hello world!
    ^CInterrupted by signal: 2
    Hello world!
    Hello world!
    ^Z
    [1]+  Stopped                 ./poll
    mja@gijs:~/Desktop/sample_programs/zeromq/poll$ kill -9 %1

    [1]+  Stopped                 ./poll
    mja@gijs:~/Desktop/sample_programs/zeromq/poll$ 

The terminal output shows that no zmq::error_t is thrown, nor does zmq::poll() return -1.

How it should work can be seen in the next example, simple_poll.cpp, where a zmq::error_t is thrown:

#include<zmq.hpp>
#include<signal.h>
#include<iostream>


static int interrupted = 0;

static void handle_signal ( int signal ) 
{
    interrupted = signal;
    /*just to show that the signal handler works*/
    std::cerr << "Interrupted by signal: " << signal << std::endl;
}

void catch_signals (void) 
{
    struct sigaction action;
    action.sa_handler = handle_signal;
    action.sa_flags = 0;
    sigemptyset(&action.sa_mask);

    sigaction(SIGINT,  &action, NULL);
    sigaction(SIGTERM, &action, NULL);
}

using namespace std;

int main(){

    zmq::context_t context(1);
    catch_signals();
    zmq::socket_t sock( context, ZMQ_REP );
    /*listen on port 2346 on all available interfaces.*/
    sock.bind("tcp://*:2346");

    zmq::pollitem_t items[] = {
        {sock, 0 , ZMQ_POLLIN, 0}
    };

    try {
        /*wait for a event*/
        zmq::poll( items, 1, -1);
        /*for zmq users read message and respond*/
    }
    catch (zmq::error_t& e){
        cout << "error occured: " <<e.what() << endl;
        cout << "We were interrupted by: " << interrupted << endl;
    }
    return 0;
}

This yields the following result on Ctrl+C in the terminal, showing that the zmq::error_t is caught and the signal has been handled:

    mja@gijs:~/Desktop/sample_programs/zeromq/poll$ ./simple_poll
    ^CInterrupted by signal: 2
    error occured: Interrupted system call
    We were interrupted by: 2
    mja@gijs:~/Desktop/sample_programs/zeromq/poll$

Solution

You've got a signal handler, but you don't do anything with it. In your handler, interrupt the polling loop: rather than looping on true, check some condition that you set in your signal handler.

Let's say, for argument's sake, that you are using C++11; try something like:

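#include <atomic>

// Global which indicates that we are running.
// Direct initialization: before C++17, "= true" needs the deleted copy constructor.
std::atomic<bool> running(true);

// In your handler, reset this flag.
static void handle_signal ( int /*signal*/ ) {
  running = false;
}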
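
One caveat worth hedging: as far as I understand POSIX, a signal sent to the process is delivered to just one thread that does not block it, so in polling.cpp it may well land in the main thread (blocked in join()) and never interrupt the zmq::poll calls in the workers. An alternative is to block the signals in every thread and collect them synchronously with sigwait. The sketch below assumes a POSIX system; the names block_termination_signals and wait_for_termination are invented for illustration and are not part of zmq or of the original post.

```cpp
#include <atomic>
#include <signal.h>

// Same flag as in the answer above.
static std::atomic<bool> running(true);

// Block SIGINT and SIGTERM in the calling thread. Threads created
// afterwards inherit this mask, so call it before starting any workers.
bool block_termination_signals(sigset_t* set) {
    sigemptyset(set);
    sigaddset(set, SIGINT);
    sigaddset(set, SIGTERM);
    return pthread_sigmask(SIG_BLOCK, set, nullptr) == 0;
}

// Sleep until one of the blocked signals is pending, then clear the flag.
// Returns the signal number, or -1 if sigwait failed.
int wait_for_termination(const sigset_t* set) {
    int signum = 0;
    if (sigwait(set, &signum) != 0) {
        return -1;
    }
    running.store(false);
    return signum;
}
```

With this variant, main() would call block_termination_signals before creating t1 and t2, then call wait_for_termination before the join() calls. No asynchronous handler runs at all, so the async-signal-safety question goes away. The worker loops still have to look at running often enough to notice the change.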

Now your loops become:

while (running)
{
  // ... poll and handle messages as before ...
}
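
A slightly fuller sketch of the same idea follows. Two things in it are assumptions rather than part of the answer: the helper names install_handlers and run_until_stopped are made up for illustration, and the polling step is passed in as a callable so that the sketch compiles without libzmq. Note that a loop blocked in zmq::poll(items, 1, -1) will only re-check the flag once the poll returns, so this presumably needs a finite timeout in every worker.

```cpp
#include <atomic>
#include <signal.h>

// Cleared by the signal handler; checked by every polling loop.
static std::atomic<bool> running(true);

// Only touch the flag here: stream I/O such as std::cerr is not
// async-signal-safe. Assumes std::atomic<bool> is lock-free.
static void handle_signal(int /*signum*/) {
    running.store(false);
}

// Install the handler for SIGINT and SIGTERM; returns false on failure.
bool install_handlers() {
    struct sigaction action = {};
    action.sa_handler = handle_signal;
    action.sa_flags = 0;
    sigemptyset(&action.sa_mask);
    return sigaction(SIGINT, &action, nullptr) == 0
        && sigaction(SIGTERM, &action, nullptr) == 0;
}

// poll_once: any callable that blocks for a bounded time and returns the
// number of ready items (hypothetically, a lambda wrapping zmq::poll with
// a finite timeout). on_ready: called whenever poll_once reports activity.
// Returns how many times the loop body ran before the flag was cleared.
template <typename PollOnce, typename OnReady>
int run_until_stopped(PollOnce poll_once, OnReady on_ready) {
    int iterations = 0;
    while (running.load()) {
        const int rc = poll_once();
        if (rc > 0) {
            on_ready();
        }
        ++iterations;
    }
    return iterations;
}
```

In rep_thread, for example, poll_once could be a lambda returning zmq::poll(items, 1, timeout) with a finite timeout (microseconds in zmq 2.x, milliseconds in 3.x, as the comment in polling.cpp notes), and on_ready the existing recv_str/send_str code. Once the loops exit, both join() calls in main return and the server can save its data and shut down.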
