ZeroMQ (clrzmq4) 轮询问题 [英] ZeroMQ (clrzmq4) polling issue

查看:20
本文介绍了ZeroMQ (clrzmq4) 轮询问题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想要完成的是实现从两个套接字之一读取消息,无论它首先到达何处.据我所知,轮询 (zmq_poll) 是正确的做法(如 指南中的mspoller).在这里,我将提供小的伪代码片段:

TimeSpan timeout = TimeSpan.FromMilliseconds(50);使用 (var receiver1 = new ZSocket(ZContext.Current, ZSocketType.DEALER))使用 (var receiver2 = new ZSocket(ZContext.Current, ZSocketType.PAIR)){receiver1.Bind("tcp://someaddress");//注意 PAIR 套接字是 inproc:receiver2.Connect("inproc://otheraddress");var poll = ZPollItem.CreateReceiver();ZError 错误;ZMessage msg;而(真){if (receiver1.PollIn(poll, out msg, out error, timeout)){//...}if (receiver2.PollIn(poll, out msg, out error, timeout)){//...}}}

如您所见,它实际上与指南中的mspoller中的实现完全相同.

在我的情况下 receiver2(PAIR 套接字)应该接收大量消息.事实上,我创建了一个测试,其中发送给它的消息数量总是大于它能够接收的消息数量(至少在演示的实现中).

我已经运行了 2 秒钟的测试,结果让我感到非常惊讶:

  • 发送到 receiver2 的消息数:180(发送"我的意思是它们被分发到另一个未在前面的代码片段中显示的 PAIR 套接字);
  • receiver2 收到的消息数:21 ???2 秒内只有 21 条消息???每秒 10 条消息???

然后我尝试使用不同的 timeout 值,我发现它会显着影响收到的消息数量.持续时间(2 秒)和发送的消息数量(180)保持不变.结果是:

  • timeout 值为 200 毫秒 - 收到的消息数量下降到 10(每秒 5 条);
  • timeout 值为 10 毫秒 - 收到的消息数增加到 120(每秒 60 条).

结果告诉我,轮询根本不起作用.如果轮询正常工作,据我了解该机制,timeout 在这种情况下应该没有任何影响.无论我们将超时设置为 1 小时还是 5 毫秒 - 因为总是有消息要接收,所以没有什么可等待的,所以循环应该以相同的速度工作.

我的另一个大问题是,即使 timeout 值很小,receiver2 也无法接收所有 180 条消息.我在这里努力实现每秒 100 条消息的接收率,尽管我选择了应该非常快的 ZeroMQ(基准测试中提到的数字为每秒 600 万条消息).

所以我的问题很明显:我在这里做错了吗?有没有更好的方法来实现轮询?

通过浏览 clrzmq4 代码,我注意到还可以在枚举套接字时调用 pollIn 方法 ZPollItems.cs,第 151 行,但我在任何地方都没有找到任何示例!

这是正确的方法吗?任何地方的任何文件?

谢谢

解决方案

我已经找到了这个问题/解决方案.而不是在每个套接字上分别使用 PollIn 方法,我们应该在套接字数组上使用 PollIn 方法.显然指南中的示例误导性很大.这是正确的方法:

TimeSpan timeout = TimeSpan.FromMilliseconds(50);使用 (var receiver1 = new ZSocket(ZContext.Current, ZSocketType.DEALER))使用 (var receiver2 = new ZSocket(ZContext.Current, ZSocketType.PAIR)){receiver1.Bind("tcp://someaddress");receiver2.Connect("inproc://otheraddress");//我们应该记住"数组中套接字的顺序//因为接收到的数组中的消息顺序将与之对应.ZSocket[] 套接字 = {receiver1,receiver2};//请注意,我们应该使用两个 ZPollItem 实例:ZPollItem[] pollItems = { ZPollItem.CreateReceiver(), ZPollItem.CreateReceiver() };ZError 错误;ZMessage[] 味精;而(真){if (sockets.PollIn(pollItems, out msg, out error, timeout)){如果(味精 [0] != 空){//从receiver1 得到的第一条消息}如果(味精 [1] != 空){//从receiver2 得到的第二条消息}}}}

现在 receiver2 达到每秒 15,000 条接收消息,无论 timeout 值如何,也无论 receiver1 接收多少条消息.>

更新:clrzmq4 的人已经承认这个问题,所以这个例子可能是很快更正.

What I'm trying to accomplish is to implement reading a message from one of two sockets, wherever it arrives first. As far as I understand polling (zmq_poll) is the right thing to do (as demonstrated in mspoller in guide). Here I'll provide small pseudo-code snippet:

TimeSpan timeout = TimeSpan.FromMilliseconds(50);

using (var receiver1 = new ZSocket(ZContext.Current, ZSocketType.DEALER))
using (var receiver2 = new ZSocket(ZContext.Current, ZSocketType.PAIR))
{
    receiver1.Bind("tcp://someaddress");
    // Note that PAIR socket is inproc:
    receiver2.Connect("inproc://otheraddress");

    var poll = ZPollItem.CreateReceiver();

    ZError error;
    ZMessage msg;

    while (true)
    {
        if (receiver1.PollIn(poll, out msg, out error, timeout))
        {
            // ...
        }

        if (receiver2.PollIn(poll, out msg, out error, timeout))
        {
            // ...
        }
    }
}

As you can see it is actually the same exact implementation as in mspoller in guide.

In my case receiver2 (PAIR socket) should receive a large number of messages. In fact I've created a test in which number of messages sent to it is always greater than the number of messages it is capable to receive (at least in demonstrated implementation).

I've run the test for 2 seconds, and I was very surprised with results:

  • Number of messages sent to receiver2: 180 (by "sent" I mean that they are handed out to another PAIR socket not shown in the previous snippet);
  • Number of messages received by receiver2: 21 ??? Only 21 messages in 2 seconds??? 10 messages per second???

Then I've tried to play with different timeout values and I've found out that it significantly influences the number of messages received. Duration (2 seconds) and number of messages sent (180) remain the same. The results are:

  • timeout value of 200 milliseconds - number of messages received drops to 10 (5 per second);
  • timeout value of 10 milliseconds - number of messages received rises to 120 (60 per second).

The results are telling me that polling simply does not work. If polling were working properly, as far as I understand the mechanism, timeout should not have any influence in this scenario. No matter if we set timeout to 1 hour or 5 milliseconds - since there are always messages to receive there's nothing to wait for, so the loop should work with the same speed.

My another big concern is the fact that even with very small timeout value receiver2 is not capable to receive all 180 messages. I'm struggling here to accomplish receiving rate of 100 messages per second, although I've selected ZeroMQ which should be very fast (benchmarks are mentioning numbers as 6 million messages per second).

So my question is obvious: am I doing something wrong here? Is there a better way to implement polling?

By browsing clrzmq4 code I've noticed that there's also possibility to call pollIn method on enumeration of sockets ZPollItems.cs, line 151, but I haven't found any example anywhere!

Can this be the right approach? Any documentation anywhere?

Thanks

解决方案

I've found the problem / solution for this. Instead using PollIn method on each socket separately we should use PollIn method on array of sockets. Obviously the example from the guide is HUGELY MISLEADING. Here's the correct approach:

TimeSpan timeout = TimeSpan.FromMilliseconds(50);

using (var receiver1 = new ZSocket(ZContext.Current, ZSocketType.DEALER))
using (var receiver2 = new ZSocket(ZContext.Current, ZSocketType.PAIR))
{
    receiver1.Bind("tcp://someaddress");
    receiver2.Connect("inproc://otheraddress");

    // We should "remember" the order of sockets within the array
    // because order of messages in the received array will correspond to it.
    ZSocket[] sockets = { receiver1, receiver2 };

    // Note that we should use two ZPollItem instances:
    ZPollItem[] pollItems = { ZPollItem.CreateReceiver(), ZPollItem.CreateReceiver() };

    ZError error;
    ZMessage[] msg;

    while (true)
    {
        if (sockets.PollIn(pollItems, out msg, out error, timeout))
        {
            if (msg[0] != null)
            {
                // The first message gotten from receiver1
            }

            if (msg[1] != null)
            {
                // The second message gotten from receiver2
            }
        }
    }
}

Now receiver2 reaches 15,000 received messages per second, no matter timeout value, and no matter number of messages received by receiver1.

UPDATE: Folks from clrzmq4 have acknowledged this issue, so probably the example will be corrected soon.

这篇关于ZeroMQ (clrzmq4) 轮询问题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆