How to synchronize the publishers and subscribers in extended PUB-SUB pattern with Intermediary in ZeroMQ in C++?


Problem Description


Extended PUB/SUB topology

I have multiple publishers and multiple subscribers in a use case with 1 intermediary.

In the ZeroMQ guide, I learnt about synchronizing 1 publisher and 1 subscriber, using additional REQ/REP sockets. I tried to write a synchronization code for my use case, but it is getting messy if I try to write code according to logic given for 1-1 PUB/SUB.

The publisher code when we have only 1 publisher is:

//Socket to receive sync request
zmq::socket_t syncservice (context, ZMQ_REP);
syncservice.bind("tcp://*:5562");

//  Get synchronization from subscribers
int subscribers = 0;
while (subscribers < SUBSCRIBERS_EXPECTED) {

    //  - wait for synchronization request
    s_recv (syncservice);

    //  - send synchronization reply
    s_send (syncservice, "");

    subscribers++;
}

The subscriber code when we have only 1 subscriber is:

zmq::socket_t syncclient (context, ZMQ_REQ);
syncclient.connect("tcp://localhost:5562");

//  - send a synchronization request
s_send (syncclient, "");

//  - wait for synchronization reply
s_recv (syncclient);

Now that I have multiple subscribers, does each subscriber need to send a request to every publisher?

The publishers in my use case come and go. Their number is not fixed.

So, a subscriber won't have any knowledge about how many nodes to connect to and which publishers are present or not.

Please suggest logic for synchronizing an extended PUB/SUB setup.

Solution

Given that an XPUB/XSUB intermediary node is present,

the actual PUB-node discovery can be completely effortless for the XSUB side of the intermediary ( in fact, the problem is avoided as such ).

Just reverse the usual roles: let the intermediary's XSUB socket .bind() to a stable, well-known endpoint and have each PUB node .connect() to it. The discovery problem then ceases to exist at all.

Smart, isn't it?

PUB nodes may come and go, yet the XSUB side of the policy-mediator node need not bother ( beyond a few initial .setsockopt( { LINGER, IMMEDIATE, CONFLATE, RCVHWM, MAXSIZE } ) performance-tuning and robustness-increasing settings ). It keeps the still valid and working composition of the actual topic filters, set in service via .setsockopt( zmq.SUBSCRIBE, ** ), can maintain that composition centrally, and remains principally agnostic about the state and dynamics of the semi-temporal group of PUB-side agent nodes that are now or later .connect()-ed, live or dysfunctional.

Even better, isn't it?
