RabbitMQ,带有NodeJS的EasyNetQ(amqplib)-订阅 [英] RabbitMQ, EasyNetQ With NodeJS (amqplib) - Subscribing

查看:305
本文介绍了RabbitMQ,带有NodeJS的EasyNetQ(amqplib)-订阅的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我已经问过有关从amqplib-> EasyNetQ发布的问题,并在EasyNetQ的作者的帮助下进行了工作.

I'd already asked this question regarding publishing from amqplib --> EasyNetQ, and got it working with help from the author of EasyNetQ.

现在,我在尝试其他方法时遇到了麻烦.

Now, I'm having trouble going the other way.

它确实做了短暂的工作",但是后来我回去清理了我创建的所有队列,现在-它不起作用(从amqplib到ENQ的发布仍然有效,但是从ENq到amqplib的发布却不起作用).

It did briefly "work", but then i went back and cleaned-out all the queues I'd created and now - it won't work (publishing from amqplib to ENQ still works, but ENQ to amqplib doesn't).

如果我有此代码:

Bus.SubscribeAsync<BusManifestHolla>(HollaSubID_1,
    msg => Task.Factory.StartNew(() => {
        Console.WriteLine("LOOK===> Received Manifest Holla ID {0}", msg.ManifestID.ToString());
        Console.WriteLine("LOOK===> Responding with Manifest Yo ID {0}", HollaSubID_1);
        Bus.PublishAsync(new BusManifestYo { ManifestID = msg.ManifestID, ServiceName = HollaSubID_1 });
    })
);

要订阅/使用它,我需要在下面的Node/amqplib中插入什么?

What do I need to plug-in in the Node/amqplib below to subscribe/consume it?

Play.AMPQ.then((connection) => {
    return connection.createChannel().then((channel) => {
        return channel.assertExchange(dto.BusManifestYo.Type, 'topic', { durable: true, autoDelete: false }).then((okExchangeReply) => {
            return channel.assertQueue(dto.BusManifestYo.Type).then((ok) => {
                return channel.consume(ok.queue, (msg) => {
                    console.log(util.format('Received message: %s', msg.content.toString()));
                    var bmy: dto.interfaces.IBusManifestYo = JSON.parse(msg.content.toString());

                    channel.ack(msg);
                });
            });
        });
    });
});

更新

如果我有EasyNetQ,则首先创建Queue(它将发布到该队列),然后在Node中删除"assertQueue"调用(这样它就不会使队列变得虚假),然后遵循命名约定-它可以工作.当然,这不是一个真正的解决方案,但是也许可以帮助某人指出解决方案?

If I have EasyNetQ first create the Queue (that it will publish to), and then remove the 'assertQueue' call in Node (so it doesn't bork the queue), and then follow the naming conventions - it works. Of course, this is not a real solution, but maybe it'll help someone point to a solution?

更新#2

好吧,显然我需要将队列绑定到交换机.这是新的工作代码:

Well, apparently I needed to bind the queue to the exchange. Here's the new working code:

Play.AMPQ.then((connection) => {
    return connection.createChannel().then((channel) => {
        channel.on('error', Play.handleChannelError);

        return channel.assertQueue(dto.BusManifestYo.Type + '_Node', { durable: true, exclusive: false, autoDelete: false }).then((okQueueReply) => {
            return channel.assertExchange(dto.BusManifestYo.Type, 'topic', { durable: true, autoDelete: false }).then((okExchangeReply) => {
                return channel.bindQueue(dto.BusManifestYo.Type + '_Node', dto.BusManifestYo.Type, '#').then((okBindReply) => {
                    return channel.consume(dto.BusManifestYo.Type + '_Node', (msg) => {
                        console.log(util.format('Received message: %s', msg.content.toString()));
                        var bmy: dto.interfaces.IBusManifestYo = JSON.parse(msg.content.toString());

                        channel.ack(msg);
                    });
                });
            });
        });
    });
});

我不清楚的一件事是我在绑定上将模式设置为#".它正在工作,但是我只说了这一点,因为我看到ENQ使用了它,而其他值似乎不起作用...

One thing that's not clear to me is where I set the pattern on the bind to '#'. It's working, but I only put that because I saw ENQ use that, and other values don't seem to work so...

推荐答案

EasyNetQ具有自己的交换绑定队列约定.订阅后,它会按照您发现的方式进行操作:

EasyNetQ has it's own exchange-binding-queue conventions. When it subscribes, it does as you've discovered:

  1. 创建以消息类型命名的主题交换.
  2. 创建一个以消息类型加订阅ID命名的队列.
  3. 使用#"绑定键绑定它们.

它使用主题交换(而不是直接交换)的原因是为了支持主题路由,这就是为什么我们使用#"(给我所有东西)绑定来进行绑定的原因.如果您不发布主题,则该消息将使用空白路由键发布,并且只会被路由到#"绑定.

The reason it uses a topic exchange (rather than direct) is to support topic routing, that's why we bind with '#' (give me everything) binding. If you don't publish with a topic, then the message is published with a blank routing key, and that will only get routed to '#' bindings.

我希望这会有所帮助.

这篇关于RabbitMQ,带有NodeJS的EasyNetQ(amqplib)-订阅的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆