在数据到达时,ZeroMQ是否具有通知/回调事件/消息? [英] Does ZeroMQ have a notification/callback event/message for when data arrives?

查看:479
本文介绍了在数据到达时,ZeroMQ是否具有通知/回调事件/消息?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试将ZMQ集成到严重依赖MFC套接字(CASyncSocket)的现有Windows应用程序中.

I am trying to integrate ZMQ into an existing windows application that relies heavily on MFC sockets (CASyncSocket).

我有一个CWinThread派生的UI线程(没有GUI),它使用CAsyncSocket与服务器异步通信.我想添加一条ZMQ inproc通信线来处理将从服务器接收的数据(以REQ/REP为基础)与应用程序中的其他线程进行通信.

I've got a CWinThread derived UI thread (without a GUI) that communicates with a server asynchronously using CAsyncSocket. I would like to add a ZMQ inproc communication line to handle communicating the data received from the server (on a REQ/REP basis) to other threads within the application.

使用CAsyncSocket,只要要接收套接字上的新数据,MFC框架就会调用OnReceive方法(这可能是对那里的MFC核心专家的过度简化).

Using CAsyncSocket, the OnReceive method is called by the MFC framework whenever new data is available on the socket to be received (that might be an over-simplification to the hardcore MFC gurus out there).

ZMQ中是否有这种机制?还是我必须添加UI线程启动的其他专用WorkerThread,以处理与应用程序其余部分的ZMQ通信?这两个管道上的流量都很小,因此如果我能通过1获得成功,我真的不想创建2个单独的线程.

Is there any such mechanism in ZMQ? Or do I have to add an additional dedicated WorkerThread that the UI thread launches to handle my ZMQ communications to the rest of the app? The traffic on both pipelines is minimal so I really don't want to have to create 2 separate threads if I can get by with 1.

注意,我已经掌握了基础知识,但是同步方面遇到了问题.如果我对ZMQ使用了阻止recv/send,它会饿死我的CAsycSocket,因为Windows消息永远不会被线程处理,导致有时永远不会从应该ZMQ传递的服务器上获取数据.但是,如果我使用非阻塞的ZMQ调用,则该线程通常最终会处于空闲状态,因为它不知道要读取ZMQ套接字.

Note, I've got the basics working, I'm just having problems with synchronization. If I use blocking recv/send with ZMQ, it starves out my CAsycSocket because the windows messages never get processed by the thread resulting in sometimes never getting the data from the server that the ZMQ is supposed to be delivering. But if I use non-blocking ZMQ calls, then the thread frequently ends up sitting idle because it doesn't know to read off the ZMQ socket.

推荐答案

最终,答案为.当前没有回调可用于通知数据何时到达ZeroMQ.我也找不到任何添加此功能的叉子.

Ultimately, the answer is no. There are no current callback/notifications for when data arrives in ZeroMQ that you can link into. I was also unable to find any fork that adds this functionality.

在单个线程中使用MFC套接字框架提供的传统OnReceive调用时,我无法使ZMQ工作,并且添加专用于ZMQ的第二线程打败了使用ZMQ的全部目的(它用于线程同步) ).

I was unable to get ZMQ working while using the traditional OnReceive calls provide by the MFC socket framework within a single thread and adding a 2nd thread to dedicate to ZMQ defeated the whole purpose for using it (it is being used for thread synchronization).

我有效的实现最终放弃了MFC套接字,并为我的Inproc服务器(用于与其他线程通信)以及TCP(非ZMQ)服务器连接使用ZMQ,并使用了阻塞轮询(zmq_poll())在OnIdle()方法中(每次返回1都会创建一个忙循环).阻止投票

My implementation that works ended up dropping MFC sockets and using ZMQ for both my inproc server (for communicating with other thread) as well as my TCP (non-ZMQ) server connection and using a blocking polling call (zmq_poll()) in the OnIdle() method (returning 1 every time to create a busy loop). The blocking poll

BOOL CMyThreaClass::OnIdle(LONG lCount)
{
    UNREFERENCED_PARAMETER(lCount);

    zmq_pollitem_t items [] = {
    { m_pZMQInprocServer, 0, ZMQ_POLLIN, 0 },
    { m_pZMQTCPSocket, 0, ZMQ_POLLIN, 0 }
    };
    const int iZMQInfiniteTimeout(-1);
    iResult = zmq_poll(&items[0], sizeof(items) / sizeof(items[0]), iZMQInfiniteTimeout);
    TRACE("zmq_poll result: %d\n", iResult);

    if (items[0].revents & ZMQ_POLLIN)
    {
        sMyStruct sMessage;
        iResult = zmq_recv(m_pZMQInprocServer, &sMessage, sizeof(sMessage), ZMQ_DONTWAIT); // don't block (the zmq_poll blocks for us)
        TRACE("inproc recv result: %d\n", iResult);
        //  Process inproc messages
        iResult = zmq_send(pZMQInprocServer, &sMessage, sizeof(sMessage), ZMQ_NULL); // block
        TRACE("inproc send result: %d\n", iResult);
    }
    if (items[1].revents & ZMQ_POLLIN)
    {
        // there will be an ZMQ_IDENTITY identifier on the beginning of the socket buffer, read it off first
        uint8_t id [256];
        size_t id_size = 256;
        iResult = zmq_getsockopt(m_pZMQTCPSocket, ZMQ_IDENTITY, id, &id_size);
        TRACE("getsockopt poll result %d:id %d\n", iResult, id);
        iResult = zmq_recv(m_pZMQTCPSocket, &id, id_Size, ZMQ_DONTWAIT); // don't block
        // now get our actual data
        char szBuffer[1024];
        int iBytesReceived = zmq_recv(m_pZMQSocket, szBuffer, sizeof(szBuffer), ZMQ_DONTWAIT);
        if (iBytesReceived > 0)
        {
            // process TCP data
        }
    }
}

注意:此答案要求使用ZMQ 4或更高版本,因为ZMQ的早期版本将无法与常规TCP套接字连接进行通信.

Note: This answer requires using ZMQ 4 or later since earlier versions of ZMQ will not communicate with a regular TCP socket connection.

这篇关于在数据到达时,ZeroMQ是否具有通知/回调事件/消息?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆