等待 fanout 交换上的所有 rabbitmq 响应? [英] Waiting for all rabbitmq responses on a fanout exchange?
问题描述
我为 RPC 工作负载配置了一个名为ex_foo
"的rabbitmq 扇出交换.当客户端连接到服务器时,它们会创建自己的非持久 RPC 接收队列并使用 BasicConsumer 连接到它.应用程序侦听消息/命令并响应请求的 reply_to
部分中定义的队列.
I've configured a rabbitmq fanout exchange called "ex_foo
" for a RPC workload. When clients connect to the server, they create their own non-durable RPC receive queue and connect to it with a BasicConsumer. The apps listen for messages/commands and respond to the queue defined in the reply_to
part of the request.
我发送扇出交换(因此,连接到它的每个应用程序/客户端)的一个简单消息/命令是一种 ping 请求消息,我的问题是我不知道我会得到多少 ping 响应(或应该预期),因为我不知道任何时候有多少客户端连接到 fanout 交换.所有连接到扇出交换的客户端都应该回复.
One of the simple messages/commands I'm sending out the the fanout exchange (and thus, every application/client connected to it) is a type of ping request message, and my problem is that I don't know how many ping responses I will get (or should expect), because I don't know how many clients are connected to the fanout exchange at any one time. All clients connected to the fanout exchange should reply.
如果在扇出交换器上被传递到 10 个队列(即:连接了 10 个客户端),我如何知道预期有多少响应?为了知道这一点,我是否必须知道它交付了多少次?有没有更复杂的睡眠定时器?简单地说,我的管理工具不能无限期地等待,需要在收到所有 ping 后退出(或超时).
If gets delivered to 10 queues on the fanout exchange (ie: 10 clients are connected), how do I know how many responses to expect? In order to know that, would I have to know how many times it was delivered? Is there anything more sophisticated and a sleep timer? Simply, my admin tool can't just wait indefinitely and needs to quit after it has recveived all pings (or a time-out has elapsed).
推荐答案
您正在寻找的是类似 Scatter-Gather (http://www.eaipatterns.com/BroadcastAggregate.html) 模式,不是吗?
What you are looking for is something like a Scatter-Gather (http://www.eaipatterns.com/BroadcastAggregate.html) pattern, isn’t it?
您不知道绑定到扇出的消费者,因此您可以:
You don’t know the consumers bound to the fan-out, so you can:
- 使用例如绑定生产者的队列来实现消费者的保持活动状态.每个消费者每秒发送一次keep-alive,如果您没有收到消息,您可以考虑消费者离线.
- 使用注册消费者的内存数据库(始终保持活动状态).
- 使用 HTTP API 以这种方式了解绑定到扇出的消费者列表:
- implement an keep-alive from the consumer(s) using for example an queue where the producer is bound. Each consumer sends a keep-alive each one second, if you don’t receive a message you can considerer the consumer off-line.
- Use an in-memory database where the consumer are registered (always with a keep-alive).
- Use the HTTP API to know the consumers list bound to the fan-out, in this way:
http://rabbitmqip/vhost/yourfanout/bindings/source
结果是这样的:
[{"source":"yourfanout","vhost":"/","destination":"amq.gen-xOpYc8m10Qy1s4KCNFCgFw","destination_type":"queue","routing_key":"","arguments":{},"properties_key":"~"},{"source":" yourfanout","vhost":"/","destination":"myqueue","destination_type":"queue","routing_key":"","arguments":{},"properties_key":"~"}]
一旦计算了消费者,您就知道回复很重要.
Once count the consumers you know the replies count.
在发送请求之前调用 API.
Call the API before send a request.
注意最后一个只有在您使用绑定到消费者的临时队列时才能工作.
NOTE the last-one can works only if you use a temporary queue bound to the consumers.
我发现这个资源可以帮助你(http://geekswithblogs.net/michaelstephenson/archive/2012/08/06/150373.aspx)
I found this resource that could help you (http://geekswithblogs.net/michaelstephenson/archive/2012/08/06/150373.aspx)
我不确切知道您的最终范围,但是通过保持活动,您最多可以等待一秒钟,然后再决定消费者是否还活着.
I don't know exactly your final scope, but with a keep-alive you can wait max one second before decide if the consumer is alive.
这篇关于等待 fanout 交换上的所有 rabbitmq 响应?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!