App crashes when it takes too long to reply in a ZMQ REQ/REP pattern

Problem description

I am writing a plugin that interfaces with a desktop application through a ZeroMQ REQ/REP request-reply communication archetype. I can currently receive a request, but the application seemingly crashes if a reply is not sent quickly enough.

I receive the request on a spawned thread and put it in a queue. This queue is processed in another thread, in which the processing function is invoked by the application periodically.
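This hand-off between the receiving thread and the processing thread requires every queue access to happen under the same lock. A minimal sketch, assuming C++17: the hypothetical `LockedQueue` below wraps `std::queue` with `std::lock_guard`, so the mutex is released even if an operation throws (a manual `lock()`/`unlock()` pair would leave it locked in that case), and `tryPop()` folds the separate `empty()`/`front()`/`pop()` calls into one locked step.

```cpp
#include <mutex>
#include <optional>
#include <queue>
#include <string>
#include <utility>

// Hypothetical helper: a FIFO queue whose every access goes through one mutex.
// std::lock_guard unlocks automatically when the scope ends, even on exceptions.
template <typename T>
class LockedQueue {
public:
    void push(T value) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_items.push(std::move(value));
    }

    // Returns the oldest item, or std::nullopt if the queue is empty.
    // Checking, reading and removing happen under a single lock.
    std::optional<T> tryPop() {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_items.empty()) {
            return std::nullopt;
        }
        T front = std::move(m_items.front());
        m_items.pop();
        return front;
    }

    bool empty() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_items.empty();
    }

private:
    mutable std::mutex m_mutex;  // mutable so empty() can lock in a const method
    std::queue<T> m_items;
};

// Same element type as the question's m_messages: (topic, payload).
using MessageQueue = LockedQueue<std::pair<std::string, std::string>>;
```

The same wrapper would suit a reply queue that one thread writes and another thread reads, such as the `sendQueue` introduced later in the question.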

The message is received and processed correctly, but the response cannot be sent until the next iteration of the function, as I cannot get the data from the application until then.

When the function is made to defer the response until its next iteration, the application crashes. However, if I send fake data as the response soon after receiving the request, in the first iteration, the application does not crash.

Constructing the socket

    zmq::socket_t socket(m_context, ZMQ_REP);
    socket.bind("tcp://*:" + std::to_string(port));

Receiving the message in the spawned thread

void ZMQReceiverV2::receiveRequests() {
    nInfo(*m_logger) << "Preparing to receive requests";
    while (m_isReceiving) {
        zmq::message_t zmq_msg;
        bool ok = m_respSocket.recv(&zmq_msg, ZMQ_NOBLOCK);
        if (ok) {
            // msg_str will be a binary string
            std::string msg_str;
            msg_str.assign(static_cast<char *>(zmq_msg.data()), zmq_msg.size());
            nInfo(*m_logger) << "Received the message: " << msg_str;
            std::pair<std::string, std::string> pair("", msg_str);
            // adding to message queue
            m_mutex.lock();
            m_messages.push(pair);
            m_mutex.unlock();
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    nInfo(*m_logger) << "Done receiving requests";
}

The processing function, on a separate thread


void ZMQReceiverV2::exportFrameAvailable() {
    // checking messages
    // if the queue is not empty
    m_mutex.lock();
    if (!m_messages.empty()) {
        nInfo(*m_logger) << "Reading message in queue";
        smart_target::SMARTTargetCreateRequest id_msg;
        std::pair<std::string, std::string> pair = m_messages.front();
        std::string topic   = pair.first;
        std::string msg_str = pair.second;
        processMsg(msg_str);
        // removing just read message
        m_messages.pop(); 
        //m_respSocket.send(zmq::message_t()); won't crash if I reply here, in this invocation
    }
    m_mutex.unlock();

    // sending back the ID that has just been made, for it to be mapped
    if (timeToSendReply()) {
        sendReply();  // will crash if I wait for this to be executed on the next invocation
    }
}

My research shows that there is no time limit for sending the response, so this apparent timing issue is strange.

Is there something that I am missing that will let me send the response on the second iteration of the processing function?

Revision 1:

I have edited my code so that the responding socket only ever exists on one thread. Since I need to get information from the processing function to send, I created another queue, which is checked in the revised function running on its own thread.

void ZMQReceiverV2::receiveRequests() {
    zmq::socket_t socket = setupBindSocket(ZMQ_REP, 5557, "responder");
    nInfo(*m_logger) << "Preparing to receive requests";
    while (m_isReceiving) {
        zmq::message_t zmq_msg;
        bool ok = socket.recv(&zmq_msg, ZMQ_NOBLOCK);
        if (ok) {
            // does not crash if I call send helper here
            // msg_str will be a binary string
            std::string msg_str;
            msg_str.assign(static_cast<char *>(zmq_msg.data()), zmq_msg.size());
            NLogger::nInfo(*m_logger) << "Received the message: " << msg_str;
            std::pair<std::string, std::string> pair("", msg_str);
            // adding to message queue
            m_mutex.lock();
            m_messages.push(pair);
            m_mutex.unlock();
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        if (!sendQueue.empty()) {
            sendEntityCreationMessage(socket, sendQueue.front());
            sendQueue.pop();
        }
    }
    nInfo(*m_logger) << "Done receiving requests";
    socket.close();
}

The function sendEntityCreationMessage() is a helper function that ultimately calls socket.send().

void ZMQReceiverV2::sendEntityCreationMessage(zmq::socket_t &socket, NUniqueID id) {
    socket.send(zmq::message_t());
}

This code seems to follow the thread-safety guidelines for sockets. Any suggestions?

Recommended answer

Q: "Is there something that I am missing?"

Yes.
The ZeroMQ philosophy, called the Zen-of-Zero, has always promoted three rules: never try to share a Socket-instance, never try to block, and never expect the world to act as one wishes.

That said, avoid touching a Socket-instance from any thread other than the one that instantiated and owns it.
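One plausible reading of the crash, consistent with what the question reports: a REP socket refuses `recv()` while a reply is still owed (`zmq_msg_recv` fails with `EFSM`), and the legacy cppzmq overload `recv(message_t*, int)` used in the question returns `false` only for `EAGAIN` and throws `zmq::error_t` for any other error. Both versions of `receiveRequests()` call `recv()` every 100 ms, so a reply that is not sent before the next poll would raise an exception on that thread, which terminates the process if nothing catches it. A sketch under those assumptions, where a single thread owns the socket and polls for a new request only when no reply is pending. `FakeRepSocket` and `serviceOnce()` are hypothetical; `FakeRepSocket` stands in for a thin wrapper around `zmq::socket_t` so the logic can run without ZeroMQ.

```cpp
#include <deque>
#include <optional>
#include <stdexcept>
#include <string>

// Stand-in for a REP socket so the loop logic runs without ZeroMQ.
// Like a real REP socket, it rejects recv() while a reply is still owed
// and rejects send() when no request has been received.
class FakeRepSocket {
public:
    std::deque<std::string> incoming;  // requests a client has sent
    std::deque<std::string> sent;      // replies we have sent

    std::optional<std::string> tryRecv() {
        if (m_owesReply) throw std::logic_error("recv while reply owed (EFSM)");
        if (incoming.empty()) return std::nullopt;  // like EAGAIN with ZMQ_NOBLOCK
        std::string msg = incoming.front();
        incoming.pop_front();
        m_owesReply = true;
        return msg;
    }

    void send(const std::string& reply) {
        if (!m_owesReply) throw std::logic_error("send without request (EFSM)");
        sent.push_back(reply);
        m_owesReply = false;
    }

private:
    bool m_owesReply = false;
};

// One pass of the socket-owning thread's loop (hypothetical helper).
// awaitingReply tracks the REP state, so recv() is never called out of turn.
template <typename Socket>
void serviceOnce(Socket& socket, bool& awaitingReply,
                 std::deque<std::string>& requests,
                 std::deque<std::string>& replies) {
    if (awaitingReply) {
        // A request is outstanding: the only legal next step is send().
        if (!replies.empty()) {
            socket.send(replies.front());
            replies.pop_front();
            awaitingReply = false;
        }
        return;  // no reply ready yet: wait, but do not recv()
    }
    if (auto request = socket.tryRecv()) {
        requests.push_back(*request);
        awaitingReply = true;
    }
}
```

In the plugin, `requests` and `replies` would be shared with the processing thread, so both must be mutex-protected; only the socket itself stays confined to the owning thread. With this guard the reply can arrive any number of polls later, which matches the point that REP imposes no time limit on the reply.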

Last, but not least, the REQ/REP Scalable Formal Communication Pattern Archetype is prone to fall into a deadlock, because a mandatory two-step dance must be obeyed: each side must keep strictly alternating its calls (.send()-.recv()-.send()-.recv()-... on the REQ side, .recv()-.send()-.recv()-.send()-... on the REP side). Otherwise, the distributed tandem of Finite State Automata (FSA) will unsalvageably end up in a mutual self-deadlock state of the distributed FSA (dFSA).
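To make the two-step dance concrete, here is a toy model (not ZeroMQ code) of the REQ and REP state machines working in tandem. Each hypothetical method returns `Result::Efsm` when called out of turn, mirroring the `EFSM` error ZeroMQ reports in that situation, and `Result::WouldBlock` when there is nothing to receive yet. If the REP side never replies, the REQ side can only ever get `WouldBlock` (or block forever in blocking mode), which is the mutual deadlock described above.

```cpp
#include <optional>
#include <string>

// Toy model of the REQ/REP lockstep; a single message in flight per direction.
enum class Result { Ok, WouldBlock, Efsm };

class ReqRepModel {
public:
    // --- REQ side: send(), then recv(), then send(), ... ---
    Result reqSend(const std::string& msg) {
        if (m_reqAwaitingReply) return Result::Efsm;  // must recv() the reply first
        m_toRep = msg;
        m_reqAwaitingReply = true;
        return Result::Ok;
    }
    Result reqRecv(std::string& out) {
        if (!m_reqAwaitingReply) return Result::Efsm;  // must send() first
        if (!m_toReq) return Result::WouldBlock;       // REP has not replied yet
        out = *m_toReq;
        m_toReq.reset();
        m_reqAwaitingReply = false;
        return Result::Ok;
    }

    // --- REP side: recv(), then send(), then recv(), ... ---
    Result repRecv(std::string& out) {
        if (m_repOwesReply) return Result::Efsm;  // must send() the reply first
        if (!m_toRep) return Result::WouldBlock;  // no request has arrived
        out = *m_toRep;
        m_toRep.reset();
        m_repOwesReply = true;
        return Result::Ok;
    }
    Result repSend(const std::string& msg) {
        if (!m_repOwesReply) return Result::Efsm;  // must recv() a request first
        m_toReq = msg;
        m_repOwesReply = false;
        return Result::Ok;
    }

private:
    bool m_reqAwaitingReply = false;
    bool m_repOwesReply = false;
    std::optional<std::string> m_toRep;  // request in flight
    std::optional<std::string> m_toReq;  // reply in flight
};
```

A typical round trip is `reqSend`, `repRecv`, `repSend`, `reqRecv`. Calling `repRecv` a second time before `repSend` is the out-of-turn step that the model rejects.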

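If one plans to build professionally on ZeroMQ, the best next step is to re-read the fabulous book by Pieter HINTJENS, "Code Connected: Volume 1". A hard read, but definitely worth the time, sweat, tears and effort invested.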
