ZMQ经销商-路由器通讯 [英] ZMQ DEALER - ROUTER Communication

查看:95
本文介绍了ZMQ经销商-路由器通讯的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我目前正在从事一个项目,该项目需要通过网络与分布式系统的某些实体使用不同数据类型的通信,并且我正在使用ZMQ。



该项目的主要目标是拥有一个中央节点,为可以随时连接的客户端提供服务。对于连接的每个客户端,中央节点应管理两者之间的消息通信。



当前,目前,所有通信都通过TCP进行。



客户端需要发送和随时接收消息,因此它们是 ZMQ_DEALER 类型的套接字,中央节点是 ZMQ_ROUTER



最初,目标是某个客户端发出一条消息,而该消息到达其他客户端。这意味着其他客户端可以全部看到相同的数据。



我已经使用



这是因为身份框架的开头是ROUTER接收时间,消息通过路由器发送回;在其中删除身份并使用该值将消息路由回相关的经销商,



此处 PUSH - PULL 用于将更新发送到服务器。如果不需要来自服务器的回复消息,则这很有意义。如果不需要该示例中的状态请求,则可以省去 ROUTER - DEALER 部分。为了简洁起见,这里是使用Python的示例实现。服务器监听 PULL 套接字,并通过 PUB 套接字发送所有内容:

  import zmq 

def main():
#上下文和套接字
ctx = zmq.Context()
发布者= ctx.socket(zmq.PUB)
Publisher.bind( tcp:// *:5557)
收藏者= ctx.socket(zmq.PULL)
collector.bind( tcp:// *:5558)

而True:
message = collector.recv()
print I:发布更新%s%消息
Publisher.send(消息)

如果__name__ =='__main__':
main()

客户端收听 PUB 套接字一段时间。如果收到消息,则将其记录下来。如果达到超时,则会以十分之一的几率生成一条消息:

 导入随机
导入时间

导入zmq

def main():

#准备上下文和订户
ctx = zmq.Context()
订户= ctx.socket(zmq.SUB)
订户.setsockopt(zmq.SUBSCRIBE,'')
订户.connect( tcp:// localhost:5557)
发行人= ctx.socket(zmq.PUSH)
Publisher.connect( tcp:// localhost:5558)

random.seed(time.time())
而True :$$$ b,如果subscriber.poll(100)& zmq.POLLIN:
消息= Subscriber.recv()
打印 I:收到的消息%s%消息
否则:
rand = random.randint(1,100)如果rand<
10:
Publisher.send(%d%rand)
打印 I:发送消息%d%rand

如果__name__ =='__main__':
main()


I am currently working on a project that requires some communication over the network of a different data types from some entities of a distributed system and I am using ZMQ.

The main goal of the project is to have a central node which services clients which can connect at any time. For each client connected, the central node should manage the message communication between the two.

Currently, and by the moment, all communication is happening over TCP.

The clients need to send and receive messages at any time so they are ZMQ_DEALER type sockets and the central node is ZMQ_ROUTER

Initially, the goal is that one message from some client, this message arrive at other clients. This means that the other clients can see the same data all.

I have using the Asynchronous Client/Server pattern because I am interested in having several clients talking to each other in a collaborative way, having maybe a server broker or middleware.

I have a ZMQ_DEALER socket client which connect to ZMQ_ROUTER socket server

#include <zmq.hpp>
#include "zhelpers.hpp"
using namespace std;

int main(int argc, char *argv[])
{

    zmq::context_t context(1);
    zmq::socket_t client(context, ZMQ_DEALER);

    const string endpoint = "tcp://localhost:5559";

    client.setsockopt(ZMQ_IDENTITY, "PEER1", 5);
    cout << "Connecting to ZMQ Network Manager " << endpoint << "..." << endl;
    client.connect(endpoint);
    for (int request = 0; request < 10; request++)
    {

        s_sendmore(client, "");
        s_send(client, "Testing sending some data");

        std::string string = s_recv(client);

        std::cout << "Received reply " << request
                  << " [" << string << "]" << std::endl;
    }
}

On my server code, I have a ZMQ_ROUTER which receive and manage the messages is, making bind it to a well port. This server is made in Python

import zmq
context = zmq.Context()
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5559")

# Initialize a poll set
poller = zmq.Poller()
poller.register(frontend, zmq.POLLIN)

print("Creating Server Network Manager Router")

while True:
    socks = dict(poller.poll())

    if socks.get(frontend) == zmq.POLLIN:
        message = frontend.recv_multipart()
        print(message)
        frontend.send_multipart(message)

On my other peer/client I have the following:

#include <zmq.hpp>
#include "zhelpers.hpp"
using namespace std;

int main (int argc, char *argv[])
{

    zmq::context_t context(1);
    zmq::socket_t peer2(context, ZMQ_DEALER);

    const string endpoint = "tcp://localhost:5559";

    peer2.setsockopt(ZMQ_IDENTITY, "PEER2", 5);
    cout << "Connecting to ZMQ Network Manager " << endpoint << "..." << endl;
    peer2.connect(endpoint);
    //s_sendmore(peer2, "");
    //s_send(peer2, "Probando");

    //std::string string = s_recv(peer2);

    //std::cout << "Received reply " << " [" << string << "]" << std::endl;

    for (int request = 0; request < 10; request++)
    {

        s_sendmore(peer2, "");
        s_send(peer2, "Probando");

        std::string string = s_recv(peer2);

        std::cout << "Received reply " << request
                  << " [" << string << "]" << std::endl;
    }

}

UPDATE

But each that I execute some client, their respective messages do not arrive at another peer client. The messages arrive at ZMQ_ROUTER, and are returned to the ZMQ_DEALER sender origin.

This is because the identity frame was preceded by the ROUTER at the time of reception and the message is sent back through the ROUTER; where it removes the identity and uses the value to route the message back to the relevant DEALER, according to the ZMQ_ROUTER section to the end page here.

And this is logic, I am sending the identity of my DEALER to the ROUTER, the ROUTER take that identity frame and return to my DEALER the message

In the first instance, to starting in my implementation, I need that some message sent by any DEALER, this will be visualized by any another DEALER without matter how many DEALERS (one or many) are connected to the ZMQ_ROUTER. In this sense ... Is necessary meet about of the identity frame of other DEALER or other DEALERS?

If I have DEALER A, DEALER B, and DEALER C, and ROUTER

then:

DEALER A send a message ... And I want that message from DEALER A to arrive at DEALER B and DEALER C and so other DEALERS that can be joined to my session conversation ...

In this ideas order, is necessary met the identity frame of DEALER B and DEALER C previously on the DEALER A side so that this message to arrive him?

How to know the identity frames of each DEALER existent on my implementation? This is made on the ROUTER side? I haven't clear this

解决方案

You could have all clients send a "I am here" message at start-up. The central server could then store all the IDs, c.f. the initial communication between worker and router in here: http://zguide.zeromq.org/page:all#A-Load-Balancing-Message-Broker. The server would send out any received message to all currently known clients. You should add some heart beating in order to detect disconnected clients, c.f. http://zguide.zeromq.org/page:all#Heartbeating.

However, ZeroMQ already comes with such a communication pattern: PUBSUB. In essence every client would have a DEALER and a SUB socket connected to the servers ROUTER and PUB sockets. The server simply sends out any received message via the PUB socket to all clients. If this would be a problem for the originating client, you can include the client ID in the message so that each client can filter out messages with their own ID. See also this example from the guide http://zguide.zeromq.org/page:all#Getting-an-Out-of-Band-Snapshot

Another interesting pattern would be Republishing Updates from Clients:

Here PUSH--PULL is used to send the updates to the server. This makes sense if there is no need for a reply message from the server. If you do not need the state request from that example, you can leave out the ROUTER--DEALER part. Here a sample implementation using Python for brevity. The server listens to the PULL socket and sends out everything via the PUB socket:

import zmq

def main():
    # context and sockets
    ctx = zmq.Context()
    publisher = ctx.socket(zmq.PUB)
    publisher.bind("tcp://*:5557")
    collector = ctx.socket(zmq.PULL)
    collector.bind("tcp://*:5558")

    while True:
        message = collector.recv()
        print "I: publishing update %s" % message
        publisher.send(message)

if __name__ == '__main__':
    main()

The client listens to the PUB socket for some time. If a message is received it is logged. If the timeout is reached, a message is generated with a 1 in 10 chance:

import random
import time

import zmq

def main():

    # Prepare our context and subscriber
    ctx = zmq.Context()
    subscriber = ctx.socket(zmq.SUB)
    subscriber.setsockopt(zmq.SUBSCRIBE, '')
    subscriber.connect("tcp://localhost:5557")
    publisher = ctx.socket(zmq.PUSH)
    publisher.connect("tcp://localhost:5558")

    random.seed(time.time())
    while True:
        if subscriber.poll(100) & zmq.POLLIN:
            message = subscriber.recv()
            print "I: received message %s" % message
        else:
            rand = random.randint(1, 100)
            if rand < 10:
                publisher.send("%d" % rand)
                print "I: sending message %d" % rand

if __name__ == '__main__':
    main()

这篇关于ZMQ经销商-路由器通讯的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆