每个线程或每个调用一个ZeroMQ套接字? [英] One ZeroMQ socket per thread or per call?

查看:84
本文介绍了每个线程或每个调用一个ZeroMQ套接字?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

众所周知,一个ZeroMQ套接字 不得共享.
context_t实例可以.

As we all know, a ZeroMQ socket shall not be shared among application threads.
context_t instances however can.

我有一个多线程应用程序,我希望每个线程不时与 REQ/REP 套接字交易对手(事件,异常等)交换消息,取决于他们在做什么(他们在做非ZeroMQ的东西).

I have a multi-threaded-application and I'd like to have each thread exchange messages from time to time with a REQ/REP-socket counterparty ( event, exceptions and the like ), depending on what they are doing ( they are doing non-ZeroMQ-stuff ).

要将消息发送到我的 REQ/REP -socket,我使用以下函数
(半C ++半伪代码):

To send messages to my REQ/REP-socket I use the following function
( a half-C++ half-pseudo-code ):

sendMessage:

sendMessage:

bool sendMessage(std::string s)
{
    zmq::socket_t socket(globalContext(), ZMQ_REQ);
    socket.connect("ipc://http-concentrator");

    zmq::message_t message(s.size());
    memcpy(message.data(), s.data(), s.size());
    if (!socket.send(message))
        return false;

    // poll on socket for POLLIN with timeout

    socket.recv(&message);
    // do something with message

    return true;
}

在需要时从每个线程调用此函数.它创建一个本地套接字,连接,发送消息并接收响应.在出口处,套接字已断开并已卸下(至少我以为它是关闭的).

This function is called from every thread when needed. It creates a local socket, connects, sends the message, and receives a response. At exit, the socket is disconnected and removed ( at least I'm assuming that it is closed ).

这样,我不必费心在每个线程中维护一个套接字.每次我调用此函数时,都要以创建和连接为代价.

This way, I don't need to bother to maintain a socket in each of my threads. This comes at the cost of creating and connecting each time I call this function.

我已经强调了这段代码,并且我没有发现重用一个套接字和此reconnect-implement之间有多大区别. (在用例的两边,我每秒有20k个REP/REQ事务,包括JSON解码/编码)

I have stressed this code and I didn't see much difference between reusing one socket and this reconnect-implementation. ( I have 20k REP/REQ transactions per second, including a JSON-decode/encode, on both side of the use-case )

问::是否有更正确的ZeroMQ方法?

Q: Is there a more correct ZeroMQ-way of doing this?

推荐答案

这是我的(当前)解决方案,在C ++ 11中,您可以将对象分配给thread_local -storage.将socket_t-实例staticthread_local存储在一个函数中可以为我提供我一直在寻找的功能:

Here is my (current) solution, in C++11 you can assign object to a thread_local-storage. Storing the socket_t-instance static and thread_local in a function gives me the functionality I was looking for:

class socketPool
{
    std::string endpoint_;

public:
    socketPool(const std::string &ep) : endpoint_(ep) {}

    zmq::socket_t & operator()()
    {
        thread_local static zmq::socket_t socket(
                globalContext(), 
                ZMQ_REQ);
        thread_local static bool connected;

        if (!connected) {
            connected = true;
            socket.connect(endpoint_);
        }

        return socket;
    }
};

// creating a pool for each endpoint
socketPool httpReqPool("ipc://http-concentrator");

在我的sendMessage()功能中,我无需创建和连接

In my sendMessage()-function instead of creating and connecting I simply do

bool sendMessage(std::string s)
{
    zmq::socket_t &socket = httpReqPool();

    // the rest as above
}

就性能而言,它在我的计算机上快7倍. (每秒140k REQ/REP).

Regarding performance, well, it's 7 times faster on my machine. (140k REQ/REP per second).

这篇关于每个线程或每个调用一个ZeroMQ套接字?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆