ZeroMQ,我们可以使用 inproc:transport 和 pub/sub 消息传递模式吗 [英] ZeroMQ, can we use inproc: transport along with pub/sub messaging pattern

查看:79
本文介绍了ZeroMQ,我们可以使用 inproc:transport 和 pub/sub 消息传递模式吗的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

场景:

我们正在评估事件驱动机制的 ZeroMQ(特别是 jeroMq).

We were evaluating ZeroMQ (specifically jeroMq) for an event driven mechanism.

应用程序是分布式的,其中多个服务(发布者和订阅者都是服务)可以存在于同一个 jvm 中,也可以存在于不同的节点中,这取决于部署架构.

The application is distributed where multiple services (both publishers and subscribers are services) can exist either in the same jvm or in distinct nodes, which depends on the deployment architecture.

观察

为了玩耍,我使用 jero 创建了一个 pub/sub 模式,使用 inproc: 作为传输mq(版本:0.3.5)

For playing around I created a pub/sub pattern with inproc: as the transport , using jero mq (version :0.3.5)

  1. 线程发布是可以发布的(看起来要发布了,至少没有错误)
  2. 另一个线程中的订阅者没有收到任何消息.

问题

使用 inproc:pub/sub 是否可行?

Is using inproc: along with pub/sub feasible?

尝试使用谷歌搜索但找不到任何具体的信息,任何见解?

Tried googling but couldn't find anything specific, any insights?

pub/subinproc:

Code sample for pub/sub with inproc:

使用 jero mq(版本:0.3.5)的 inproc pub sub 的工作代码示例,对以后访问这篇文章的人很有用.一个发布者发布主题 AB,两个订阅者接收 AB 分开

The working code sample for inproc pub sub using jero mq (version :0.3.5), would be useful for someone later visiting this post. One publisher publishing topics A and B, and two subscribers receiving A and B separately

/**
 * @param args
 */
public static void main(String[] args) {

    // The single ZMQ instance
    final Context context = ZMQ.context(1);

    ExecutorService executorService = Executors.newFixedThreadPool(3);
    //Publisher
    executorService.execute(new Runnable() {

        @Override
        public void run() {
            startPublishing(context);
        }
    });
    //Subscriber for topic "A"
    executorService.execute(new Runnable() {

        @Override
        public void run() {
            startFirstSubscriber(context);
        }
    });
    // Subscriber for topic "B"
    executorService.execute(new Runnable() {

        @Override
        public void run() {
            startSecondSubscriber(context);
        }
    });

}

/**
 * Prepare the publisher and publish
 * 
 * @param context
 */
private static void startPublishing(Context context) {

    Socket publisher = context.socket(ZMQ.PUB);
    publisher.bind("inproc://test");
    while (!Thread.currentThread().isInterrupted()) {
        // Write two messages, each with an envelope and content
        try {
            publisher.sendMore("A");
            publisher.send("We don't want to see this");
            LockSupport.parkNanos(1000);
            publisher.sendMore("B");
            publisher.send("We would like to see this");
        } catch (Throwable e) {

            e.printStackTrace();
        }
    }
    publisher.close();
    context.term();
}

/**
 * Prepare and receive through the subscriber
 * 
 * @param context
 */
private static void startFirstSubscriber(Context context) {

    Socket subscriber = context.socket(ZMQ.SUB);

    subscriber.connect("inproc://test");

    subscriber.subscribe("B".getBytes());
    while (!Thread.currentThread().isInterrupted()) {
        // Read envelope with address
        String address = subscriber.recvStr();
        // Read message contents
        String contents = subscriber.recvStr();
        System.out.println("Subscriber1 " + address + " : " + contents);
    }
    subscriber.close();
    context.term();

}

/**
 * Prepare and receive though the subscriber
 * 
 * @param context
 */
private static void startSecondSubscriber(Context context) {
    // Prepare our context and subscriber

    Socket subscriber = context.socket(ZMQ.SUB);

    subscriber.connect("inproc://test");
    subscriber.subscribe("A".getBytes());
    while (!Thread.currentThread().isInterrupted()) {
        // Read envelope with address
        String address = subscriber.recvStr();
        // Read message contents
        String contents = subscriber.recvStr();
        System.out.println("Subscriber2 " + address + " : " + contents);
    }
    subscriber.close();
    context.term();

}

推荐答案

ZMQ inproc transport 旨在用于单个进程内、不同线程之间.当你说可以存在于同一个 jvm 或不同的节点"(强调我的)时,我认为你的意思是你将多个进程作为分布式服务而不是单个进程中的多个线程.

The ZMQ inproc transport is intended for use within a single process, between different threads. When you say "can exist either in the same jvm or in distinct nodes" (emphasis mine) I assume you mean that you're spinning up multiple processes as distributed services rather than multiple threads within a single process.

如果是这样,那么不,您尝试执行的操作不适用于 inproc.PUB-SUB/inproc 可以在多个线程之间的单个进程中正常工作.

If that's the case, then no, what you're trying to do won't work with inproc. PUB-SUB/inproc would work fine within a single process between multiple threads.

编辑以解决评论中的其他问题:

Edit to address further questions in the comments:

使用 inprocipc 之类的传输的原因是因为当您处于正确的上下文中时,它比 tcp 传输更有效(更快)使用它们.可以想象,您可以混合使用多种传输方式,但您始终必须在同一传输方式上进行绑定和连接才能使其工作.

The reason to use a transport like inproc or ipc is because it's a little more efficient (faster) than the tcp transport when you're in the right context to use them. You could conceivably use a mixture of transports, but you'll always have to bind and connect on the same transport to make it work.

这意味着每个节点最多需要三个 PUBSUB 套接字 - 一个 tcp 发布者与远程主机上的节点通信,ipc 发布者与同一主机上不同进程上的节点进行对话,而 inproc 发布者与同一进程中不同线程中的节点进行对话.

This means that each node would need up to three PUB or SUB sockets - a tcp publisher to talk to nodes on remote hosts, an ipc publisher to talk to nodes on different processes on the same host, and an inproc publisher to talk to nodes in different threads in the same process.

实际上,在大多数情况下,您只需使用 tcp 传输,并且只为所有内容启动一个套接字 - tcp 可以在任何地方使用.如果每个套接字负责一种特定的种类信息,那么启动多个套接字可能是有意义的.

Practically, in most cases you'd just use the tcp transport and only spin up one socket for everything - tcp works everywhere. It could make sense to spin up multiple sockets if each socket is responsible for a particular kind of information.

如果您总是向其他线程发送一种消息类型而向其他主机发送一种不同的消息类型是有原因的,那么多个套接字是有道理的,但在您的情况下,从一个节点的角度来看,这听起来像,所有其他节点都是平等的.在那种情况下,我会在任何地方使用 tcp 并完成它.

If there's a reason that you'll always be sending one message type to other threads and a different message type to other hosts, then multiple sockets makes sense, but in your case it sounds like, from the perspective of one node, all other nodes are equal. In that case I would use tcp everywhere and be done with it.

这篇关于ZeroMQ,我们可以使用 inproc:transport 和 pub/sub 消息传递模式吗的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆