在多线程c ++ 0x11程序中使用zmq :: poll结合cntr + x或kill信号 [英] using zmq::poll in multithreaded c++0x11 program in combination with cntr +x or kill signal
问题描述
对于自定义服务器,我打算使用 int zmq :: poll(zmq_pollitemt_t * items
, int nitems,long timeout = -1)
。函数,我认为是unix轮询函数的包装,但包括 zmq :: socket_t
旁边的文件描述符。该函数按我的预期工作,直到我按ctrl + x或在终端中运行 $ kill my_server_pid
。我希望调查以-1或者抛出一个 zmq :: error_t
(它源于 std :: exception
),其中包括 errno
和 strerr
消息。这应该表明有一个中断。然后我的服务器应该正常处理信号并保存一些数据并关闭。
下面我有一段代码演示了问题。首先我展示一些我的环境和如何编译它:
mja @ gijs:〜/ Desktop / sample_programs / zeromq / poll $ g ++ -v
使用内置specs。
COLLECT_GCC = g ++
COLLECT_LTO_WRAPPER = / usr / lib / gcc / x86_64-linux-gnu / 4.6 / lto-wrapper
目标:x86_64-linux-gnu
配置为:。 ./src/configure -v --with-pkgversion ='Ubuntu / Linaro 4.6.3-1ubuntu5'--with-bugurl = file:///usr/share/doc/gcc-4.6/README.Bugs --enable -languages = c,c ++,fortran,objc,obj-c ++ --prefix = / usr --program-suffix = -4.6 --enable-shared --enable-linker-build-id --with-system-zlib - -libexecdir = / usr / lib --unhout-included-gettext --enable-threads = posix --with-gxx-include-dir = / usr / include / c ++ / 4.6 --libdir = / usr / lib --enable -nls --with-sysroot = / --enable-clocale = gnu --enable-libstdcxx-debug --enable-libstdcxx-time = yes --enable-gnu-unique-object --enable-plugin --enable- objc-gc --disable-werror --with-arch-32 = i686 --with-tune = generic --enable-checking = release --build = x86_64-linux-gnu -host = x86_64-linux-gnu - -target = x86_64-linux-gnu
线程模型:posix
gcc版本4.6.3(Ubuntu / Linaro 4.6.3-1ubuntu5)
mja @ gijs:〜/ Desktop / sample_programs / zeromq / poll $ pkg-config --modversion`输入代码here`libzmq
2.2.0
mja @ gijs:〜/ Desktop / sample_programs / zeromq / poll $ g ++ -pthread -std = c ++ 0x -Wall -Wextra -pedantic -o poll polling.cpp $(pkg-config --cflags --libs libzmq)
mja @ gijs:〜/ Desktop / sample_programs / zeromq / poll $
现在是polling.cpp的代码:
#include< zmq.hpp>
#include< thread>
#include< cstdlib>
#include< string>
#include< iostream>
#include< signal.h>
const char * bind_addres =tcp:// *:2345;
const char * connect_addres =tcp:// localhost:2345;
inline void
send_str(zmq :: socket_t& sock,const std :: string& s)throw(zmq :: error_t){
zmq :: message_t msg .size());
memcpy(msg.data(),s.c_str(),msg.size());
sock.send(msg);
}
inline void recv_str(zmq :: socket_t& sock,std :: string& s)throw(zmq :: error_t){
zmq :: message_t msg;
sock.recv(& msg);
s = std :: string(static_cast< const char *>(msg.data()),msg.size());
}
static int interrupted = 0;
static void handle_signal(int signal)
{
interrupted = signal;
std :: cerr<< 被信号中断:<信号<< std :: endl;
}
void catch_signals(void)
{
struct sigaction action;
action.sa_handler = handle_signal;
action.sa_flags = 0;
sigemptyset(& action.sa_mask);
sigaction(SIGINT,& action,NULL);
sigaction(SIGTERM,& action,NULL);
}
int get_interrupted(void)
{
std :: cout< interrupted =<中断< std :: endl;
return interrupted;
}
void req_thread(zmq :: context_t * c){
zmq :: socket_t sock(* c,ZMQ_REQ);
sock.connect(connect_addres);
zmq :: pollitem_t items [] {
{sock,0,ZMQ_POLLIN,0}
};
while(true){
try {
// zmq 3.x.x需要ms而不是us,所以改为1000或者耐心。
int rc = zmq :: poll(items,1,1000000);
if(rc> 0){
if(items [0] .revents& ZMQ_POLLIN){
std :: string s;
recv_str(sock,s);
std :: cout<< s < std :: endl;
}
}
else if(rc == 0){// timeout
send_str(sock,Hello);
}
else {
std :: cout< __func__<< < __LINE__<< get_interrupted()<< std :: endl;
}
}
catch(zmq :: error_t& e){
std :: cout< __func__<< < __LINE__<< e.what()< std :: endl;
}
}
}
void rep_thread(zmq :: context_t * c){
zmq :: socket_t sock ,ZMQ_REP);
sock.bind(bind_addres);
zmq :: pollitem_t items [] {
{sock,0,ZMQ_POLLIN,0}
};
while(true){
try {
int rc = zmq :: poll(items,1,-1);
if(rc> 0){
if(items [0] .revents& ZMQ_POLLIN){
std :: string s;
recv_str(sock,s);
s + =world!;
send_str(sock,s);
}
}
else {
std :: cout< __func__<< < __LINE__<< get_interrupted()<< std :: endl;
}
}
catch(zmq :: error_t& e){
std :: cout< __func__<< < __LINE__<< e.what()< std :: endl;
}
}
}
int main(){
zmq :: context_t context(1);
catch_signals();
std :: thread t1(rep_thread,& context);
std :: thread t2(req_thread,& context);
t1.join();
t2.join();
return 0;
}
,最后我将展示一些示例输出, $ c> zmq_poll 似乎没有受到在终端中按ctrl + c的影响:
mja @ gijs:〜/ Desktop / sample_programs / zeromq / poll $ ./poll
Hello world!
Hello world!
Hello world!
^ CInterrupted by signal:2
Hello world!
Hello world!
^ Z
[1] +停止./poll
mja @ gijs:〜/ Desktop / sample_programs / zeromq / poll $ kill -9%1
[ 1] +已停止./poll
mja @ gijs:〜/ Desktop / sample_programs / zeromq / poll $
$ b b
在这里可以看到终端上的输出没有zmq :: error_t被抛出,zmq :: poll()返回-1;
它应该工作是在下一个例子simple_poll.cpp可以看到一个zmq :: error_t被抛出:
#include< zmq.hpp>
#include< signal.h>
#include< iostream>
static int interrupted = 0;
static void handle_signal(int signal)
{
interrupted = signal;
/ *只是为了表明信号处理程序工作* /
std :: cerr<< 被信号中断:<信号<< std :: endl;
}
void catch_signals(void)
{
struct sigaction action;
action.sa_handler = handle_signal;
action.sa_flags = 0;
sigemptyset(& action.sa_mask);
sigaction(SIGINT,& action,NULL);
sigaction(SIGTERM,& action,NULL);
}
using namespace std;
int main(){
zmq :: context_t context(1);
catch_signals();
zmq :: socket_t sock(context,ZMQ_REP);
/ *在所有可用接口上的端口2346上监听。* /
sock.bind(tcp:// *:2346);
zmq :: pollitem_t items [] = {
{sock,0,ZMQ_POLLIN,0}
};
try {
/ *等待事件* /
zmq :: poll(items,1,-1);
/ *为zmq用户读取消息并响应* /
}
catch(zmq :: error_t& e){
cout< error occured:<< e.what()< endl
cout<< 我们被打断:<中断< endl
}
return 0;
}
这会在终端上在ctrl + x上产生以下结果,显示zmq :: error_t被捕获并且信号已被处理。
mja @ gijs:〜/ Desktop / sample_programs / zeromq / poll $ ./simple_poll ^ C中断的信号:2
错误发生:中断的系统调用
我们中断了:2
mja @ gijs:〜/ Desktop / sample_programs / zeromq / poll $
您有一个信号处理程序 - 任何东西。在您的处理程序中,中断轮询循环(而不是 true
,检查您在信号处理程序中设置的某些条件。)
让我们说为了参数的缘故,你使用的是c ++ 11,尝试像...。
表示我们正在运行..
std :: atomic< bool> running = true;
//在您的处理程序中 - 重置此标志
static void handle_signal(int signal){
running = false;
}
现在你的循环变成了:
while(running)
{
:
}
For a custom server I intent to use the
int zmq::poll( zmq_pollitemt_t * items
,int nitems, long timeout = -1)
. function which is I think is a wrapper around the unix poll function but includeszmq::socket_t
next to file descriptors. The function works as I expected until I press ctrl+x or run$kill my_server_pid
in the terminal. I would expect that the poll to terminate with -1 or throws azmq::error_t
(which derives fromstd::exception
) which includes aerrno
and thestrerr
message. This should indicate there was a interrupt. Then my server should handle the signal gracefully and save some data and shut down.Below I have a fragment of code that demonstrates the problem. First I show a bit of my environment and how I compile it:
mja@gijs:~/Desktop/sample_programs/zeromq/poll$ g++ -v Using built-in specs. COLLECT_GCC=g++ COLLECT_LTO_WRAPPER=/usr/lib/gcc/x86_64-linux-gnu/4.6/lto-wrapper Target: x86_64-linux-gnu Configured with: ../src/configure -v --with-pkgversion='Ubuntu/Linaro 4.6.3-1ubuntu5' --with-bugurl=file:///usr/share/doc/gcc-4.6/README.Bugs --enable-languages=c,c++,fortran,objc,obj-c++ --prefix=/usr --program-suffix=-4.6 --enable-shared --enable-linker-build-id --with-system-zlib --libexecdir=/usr/lib --without-included-gettext --enable-threads=posix --with-gxx-include-dir=/usr/include/c++/4.6 --libdir=/usr/lib --enable-nls --with-sysroot=/ --enable-clocale=gnu --enable-libstdcxx-debug --enable-libstdcxx-time=yes --enable-gnu-unique-object --enable-plugin --enable-objc-gc --disable-werror --with-arch-32=i686 --with-tune=generic --enable-checking=release --build=x86_64-linux-gnu --host=x86_64-linux-gnu --target=x86_64-linux-gnu Thread model: posix gcc version 4.6.3 (Ubuntu/Linaro 4.6.3-1ubuntu5) mja@gijs:~/Desktop/sample_programs/zeromq/poll$ pkg-config --modversion `enter code here`libzmq 2.2.0 mja@gijs:~/Desktop/sample_programs/zeromq/poll$ g++ -pthread -std=c++0x -Wall -Wextra -pedantic -o poll polling.cpp $(pkg-config --cflags --libs libzmq ) mja@gijs:~/Desktop/sample_programs/zeromq/poll$
and now the code of polling.cpp :
#include <zmq.hpp> #include <thread> #include <cstdlib> #include <string> #include <iostream> #include <signal.h> const char* bind_addres = "tcp://*:2345"; const char* connect_addres = "tcp://localhost:2345"; inline void send_str( zmq::socket_t& sock, const std::string& s) throw (zmq::error_t) { zmq::message_t msg ( s.size() ); memcpy( msg.data(), s.c_str(), msg.size() ); sock.send( msg ); } inline void recv_str( zmq::socket_t& sock, std::string& s) throw( zmq::error_t) { zmq::message_t msg; sock.recv(&msg); s = std::string( static_cast<const char*>(msg.data()), msg.size()); } static int interrupted = 0; static void handle_signal ( int signal ) { interrupted = signal; std::cerr << "Interrupted by signal: " << signal << std::endl; } void catch_signals (void) { struct sigaction action; action.sa_handler = handle_signal; action.sa_flags = 0; sigemptyset(&action.sa_mask); sigaction(SIGINT, &action, NULL); sigaction(SIGTERM, &action, NULL); } int get_interrupted(void) { std::cout << "interrupted = " << interrupted << std::endl; return interrupted; } void req_thread ( zmq::context_t* c ){ zmq::socket_t sock( *c, ZMQ_REQ); sock.connect(connect_addres); zmq::pollitem_t items[]{ { sock, 0, ZMQ_POLLIN, 0} }; while (true){ try { // zmq 3.x.x takes ms instead of us so change to eg 1000 or be patient. int rc = zmq::poll(items, 1, 1000000); if (rc > 0){ if ( items[0].revents & ZMQ_POLLIN){ std::string s; recv_str(sock, s); std::cout << s << std::endl; } } else if ( rc == 0){ //timeout send_str( sock, "Hello"); } else{ std::cout << __func__ << " " << __LINE__ << get_interrupted() << std::endl; } } catch( zmq::error_t& e ){ std::cout << __func__ << " " << __LINE__ << e.what() << std::endl; } } } void rep_thread ( zmq::context_t* c ){ zmq::socket_t sock( *c, ZMQ_REP); sock.bind(bind_addres); zmq::pollitem_t items[]{ { sock, 0, ZMQ_POLLIN, 0} }; while (true){ try{ int rc = zmq::poll(items, 1 , -1); if (rc > 0){ if ( items[0].revents & ZMQ_POLLIN){ std::string s; recv_str(sock, s); s+=" world!"; send_str(sock, s); } } else{ std::cout << __func__ << " " << __LINE__ << get_interrupted() << std::endl; } } catch( zmq::error_t& e ){ std::cout << __func__ << " " << __LINE__ << e.what() << std::endl; } } } int main(){ zmq::context_t context(1); catch_signals(); std::thread t1 ( rep_thread, &context); std::thread t2 ( req_thread, &context); t1.join(); t2.join(); return 0; }
and finally I show some example output that demonstrates my issue that the
zmq_poll
does not seem to be affected by pressing ctrl+c in the terminal:mja@gijs:~/Desktop/sample_programs/zeromq/poll$ ./poll Hello world! Hello world! Hello world! ^CInterrupted by signal: 2 Hello world! Hello world! ^Z [1]+ Stopped ./poll mja@gijs:~/Desktop/sample_programs/zeromq/poll$ kill -9 %1 [1]+ Stopped ./poll mja@gijs:~/Desktop/sample_programs/zeromq/poll$
So here one could see by the output on the terminal that no zmq::error_t is thrown neither zmq::poll() returns -1;
how it should work is in the next example simple_poll.cpp one can see that a zmq::error_t is thrown:
#include<zmq.hpp> #include<signal.h> #include<iostream> static int interrupted = 0; static void handle_signal ( int signal ) { interrupted = signal; /*just to show that the signal handler works*/ std::cerr << "Interrupted by signal: " << signal << std::endl; } void catch_signals (void) { struct sigaction action; action.sa_handler = handle_signal; action.sa_flags = 0; sigemptyset(&action.sa_mask); sigaction(SIGINT, &action, NULL); sigaction(SIGTERM, &action, NULL); } using namespace std; int main(){ zmq::context_t context(1); catch_signals(); zmq::socket_t sock( context, ZMQ_REP ); /*listen on port 2346 on all available interfaces.*/ sock.bind("tcp://*:2346"); zmq::pollitem_t items[] = { {sock, 0 , ZMQ_POLLIN, 0} }; try { /*wait for a event*/ zmq::poll( items, 1, -1); /*for zmq users read message and respond*/ } catch (zmq::error_t& e){ cout << "error occured: " <<e.what() << endl; cout << "We were interrupted by: " << interrupted << endl; } return 0; }
This yields the following results on ctrl+x in the terminal showing that the zmq::error_t is caught and the signal has been handled.
mja@gijs:~/Desktop/sample_programs/zeromq/poll$ ./simple_poll ^CInterrupted by signal: 2 error occured: Interrupted system call We were interrupted by: 2 mja@gijs:~/Desktop/sample_programs/zeromq/poll$
解决方案You've got a signal handler - but you don't do anything in it. In your handler, interrupt the polling loop (rather than
true
, check for some condition that you set in your signal handler.)Let's say for argument's sake you are using c++11, try something like..
// Global which indicates that we are running.. std::atomic<bool> running = true; // In your handler - reset this flag static void handle_signal ( int signal ) { running = false; }
Now your loops become:
while (running) { : }
这篇关于在多线程c ++ 0x11程序中使用zmq :: poll结合cntr + x或kill信号的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!