如何在 zmq 的推/拉模式中设置 hwm? [英] How could I set hwm in the push/pull pattern of zmq?

查看:42
本文介绍了如何在 zmq 的推/拉模式中设置 hwm?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我发现了一个类似的问题,ZeroMQ: HWM on PUSH 不起作用,但它无法解决我的问题.

I have found a similar question, ZeroMQ: HWM on PUSH does not work, but it couldn't solve my problem.

我想控制推送套接字排队的消息数量,但它不起作用并且仍然排队 1000 条消息.
所以我想知道如何设置push socket的hwm.提前致谢.

I want to control the number of messages that the push socket queues, but it doesn't work and still queues 1000 messages.
So I want to know how to set the hwm of the push socket. Thanks in advance.

我的环境是:libzmq 4.0.4、pyzmq 14.1.0、python 3.3

My environment is: libzmq 4.0.4, pyzmq 14.1.0, python 3.3

这是我的代码:

服务器.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import random

import zmq


class TestPush(object):

    def __init__(self):
        self.ctx = zmq.Context()

        random.seed()

    def run(self):
        task_snd = self.ctx.socket(zmq.PUSH)
        task_snd.setsockopt(zmq.SNDHWM, 10)
        task_snd.bind('tcp://*:53000')        

        while True:
            workload = str(random.randint(1, 100))
            task_snd.send(workload.encode('utf-8'))
            print('Send {0}'.format(workload))


if __name__ == '__main__':
    test_push = TestPush()
    test_push.run()

客户端.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import time
import random

import zmq


class TestPull(object):

    def __init__(self):
        self.ctx = zmq.Context()

    def run(self):
        task_rcv = self.ctx.socket(zmq.PULL)
        task_rcv.setsockopt(zmq.RCVHWM, 1)
        task_rcv.connect('tcp://localhost:53000')

        while True:
            msg = task_rcv.recv()
            print('Receive msg: {0}'.format(msg))

            time.sleep(random.randint(2, 3))


if __name__ == '__main__':
    test_pull = TestPull()
    test_pull.run()

推荐答案

当我尝试在推拉式套接字上设置 HWM(高水位线)时,我遇到了与 ZeroMQ 类似的问题.甚至,它也不适用于 pub 和 sub 套接字.我想解释一下发生了什么以及它是如何解决的.

I faced similar problem with ZeroMQ when I tried to set HWM (High Water Mark) on push and pull socket. Even, it was not working with pub and sub socket. I would like to explain what happened and how it got resolved.

我制作了 2 个脚本,第一个作为带有推式套接字的发送者,另一个作为带有拉式套接字的接收者.将两个套接字上的 HWM 设置为 10.在接收器脚本中,我在收到每条消息后延迟 5 秒.然后我用 100 条消息的循环运行发送者脚本(在保持接收器运行以接收之后).

I made 2 scripts, first as a sender with push socket and another as a receiver with pull socket. Set HWM as 10 on both the sockets. And inside receiver script, I put a delay of 5 seconds after each message received. Then I ran the sender script with loop of 100 messages (after keeping the receiver as running to receive).

接收方队列和发送方队列将到达 hwm.之后,发件人将停止发送更多消息.

The receiver queue and then the sender's queue will reach the hwm. After that, the sender will stop sending more messages.

发件人发送了所有 100 条消息并退出.但是接收者一直在一个接一个地处理消息,直到它接收到所有消息.

The sender sends all the 100 messages and exited. But the receiver has kept processing message one-after for long time till it receives all the messages.

有一种叫做内核套接字缓冲区的东西,它位于发送方套接字和接收方套接字之间.每当进程打开套接字时,内核都会为 tcp 套接字分配内存空间,默认为 128KB.内核套接字缓冲区适用于发送方和接收方套接字(因此总缓冲区将为 128KB + 128KB).我的消息大小以字节为单位(带有一些字符的 int).因此,总的消息缓冲如下:

There is something called as Kernel Socket Buffer which seats between both the sender socket and the receiver sockets. Whenever, a process opens a socket, the kernel allots memory space to the tcp sockets, which is by default 128KB. The kernel socket buffer is applicable to the sender and receiver socket (so total buffer will be 128KB + 128KB). My message size was in bytes (an int with some characters). Thus, the total message buffering will be as follows:

总缓冲区消息 =发件人套接字 hwm+ 发送者套接字的内核套接字缓冲区 (128KB)+ 接收器插座 hwm+ 发送方套接字的内核套接字缓冲区 (128KB)

Total buffer message = sender socket hwm + kernel socket buffer for the sender socket (128KB) + receiver socket hwm + kernel socket buffer for the sender socket (128KB)

现在,我将消息长度更改为略大于 1KB.然后再次执行测试,发现发送了大约 260 条消息(正如预期的那样),此后发送方在间隔中停止,直到接收方收到一些消息并重新开始.

Now, I changed my message length to little more than a 1KB. Then perform the test again and found approx 260 messages sent (as expected), after this the sender stops in interval till the receiver receives some message and starts again.

为了使push socket即使在接收者无法接收的情况下也能继续发送消息,我们可以在发送例程中使用NOBLOCK选项,但是接收者丢失的消息数量会增加得非常高.因此,更好的选择是使用 Poll 或 timeout,然后使用 NOBLOCK 选项调用发送例程.

In order for push socket to keep sending messages even when receiver is not able receive, we can use NOBLOCK option in send routine, but then the number of lost messages by receiver will increase very high. So, better option is to use Poll or timeout and then call send routine with NOBLOCK option.

请注意,您可以使用 zeromq 的 SNDBUFF/RCVBUFF,但操作系统可能不遵守它(就我而言,它不起作用).

Please note that you may use SNDBUFF/RCVBUFF of zeromq but OS may not obey it (as in my case it was not working).

这篇关于如何在 zmq 的推/拉模式中设置 hwm?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆