ZeroMQ:PUSH 上的 HWM 不起作用 [英] ZeroMQ: HWM on PUSH does not work

查看:34
本文介绍了ZeroMQ:PUSH 上的 HWM 不起作用的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试编写一个服务器/客户端脚本,其中有一个服务器来释放任务,以及多个执行它的工作人员.问题是我的呼吸机有很多任务,它会在心跳中填满内存.我试图在绑定之前设置 HWM,但没有成功.只要工作人员连接,它就会继续发送消息,完全无视设置的 HWM.我还有一个记录已完成任务的接收器.

I am trying to write a server/client script with a server that vents the tasks, and multiple workers that execute it. The problem is that my ventilator has so many tasks that it would fill up the memory in a heartbeat. I tried to set the HWM before it binds, but with no success. It just keeps on sending messages as soon as a worker connects, completely disregarding the HWM that was set. I also have a sink that keeps record of the tasks that were done.

server.py

import zmq

def ventilate():
    context = zmq.Context()

    # Socket to send messages on
    sender = context.socket(zmq.PUSH)
    sender.setsockopt(zmq.SNDHWM, 30) #Big messages, so I don't want to keep too many in queue
    sender.bind("tcp://*:5557")


    # Socket with direct access to the sink: used to syncronize start of batch
    sink = context.socket(zmq.PUSH)
    sink.connect("tcp://localhost:5558")

    print "Sending tasks to workers…"

    # The first message is "0" and signals start of batch
    sink.send('0')
    print "Sent starting signal"

    while True:
        sender.send("Message")



if __name__=="__main__":
    ventilate()

worker.py

import zmq
from multiprocessing import Process

def work():
    context = zmq.Context()

    # Socket to receive messages on
    receiver = context.socket(zmq.PULL)
    receiver.connect("tcp://localhost:5557")

    # Socket to send messages to
    sender = context.socket(zmq.PUSH)
    sender.connect("tcp://localhost:5558")

    # Process t asks forever
    while True:
        msg = receiver.recv_msg()
        print "Doing sth with msg %s"%(msg)     
        sender.send("Message %s done"%(msg))

if __name__ == "__main__":
    for worker in range(10):        
        Process(target=work).start()

sink.py

import zmq

def sink():
    context = zmq.Context()

    # Socket to receive messages on
    receiver = context.socket(zmq.PULL)
    receiver.bind("tcp://*:5558")

    # Wait for start of batch
    s = receiver.recv()
    print "Received start signal"
    while True:
        msg = receiver.recv_msg()
        print msg


if __name__=="__main__":
    sink()

推荐答案

好吧,我玩了一下,我不认为问题出在 PUSH HWM 上,而是您无法为 PULL 设置 HWM.如果您查看 本文档,您会看到它说 N/对 HWM 采取的行动.

Ok, I had a play around, I don't think the issue is with the PUSH HWM, but rather that you can't set a HWM for PULL. If you look at this documentation, you can see there it says N/A for action on HWM.

PULL 套接字似乎每个都接收数百条消息(我确实尝试设置 HWM,以防万一它在 PULL 套接字上执行任何操作.它没有.).我通过改变呼吸机来发送带有递增整数的消息,并改变池中的每个工作人员在调用 recv() 之间等待 2 秒来证明这一点.工作人员打印出他们正在处理具有截然不同整数的消息.例如,一个工作人员将处理消息 10,而下一个工作人员处理消息 400.随着时间的推移,您会看到正在处理消息 10 的工作人员现在正在处理消息 11、12、13 等.而其他是处理401、402等

The PULL sockets seem to be taking hundreds of messages each (and I did try setting a HWM just in case it did anything on the PULL socket. It didn't.). I evidenced this by changing the ventilator to send messages with an incrementing integer, and changing each worker in the pool to wait 2 seconds between calls to recv(). The workers print out that they are processing messages with vastly different integers. For instance, one worker will be working on message 10, while the next is working on message 400. As time goes on, you see the worker who was processing message 10, is now processing message 11, 12, 13, etc. while the other is processing 401, 402, etc.

这向我表明 ZMQ_PULL 套接字正在某处缓冲消息.因此,虽然 ZMQ_PUSH 套接字确实具有 HWM,但 PULL 套接字正在快速请求消息,尽管它们实际上并未通过对 recv() 的调用进行访问.因此,如果连接了 PULL 套接字,则会导致 PUSH HWM 有效地被忽略.据我所知,您无法控制 PULL 套接字的缓冲区长度(我希望 RCVHWM 套接字选项可以控制它,但它似乎没有).

This indicates to me that the ZMQ_PULL socket is buffering the messages somewhere. So while the ZMQ_PUSH socket does have a HWM, the PULL socket is requesting messages quickly, despite them not actually being accessed by a call to recv(). So that results in the PUSH HWM effectively being ignored if a PULL socket is connected. As far as I can see, you can't control the length of the buffer of the PULL socket (I would expect the RCVHWM socket option to control this but it doesn't appear to).

这种行为当然引出了一个问题,ZMQ_PULL HWM 选项有什么意义,只有在您还可以控制接收套接字 HWM 时才有意义.

This behaviour of course begs the question what is the point of the ZMQ_PULL HWM option, which only makes sense to have if you can also control the receiving sockets HWM.

此时,我会开始询问 0MQ 人员您是否遗漏了一些明显的东西,或者如果这被认为是一个错误.

At this point, I'd start asking the 0MQ people whether you are missing something obvious, or if this is considered a bug.

抱歉,帮不上忙!

这篇关于ZeroMQ:PUSH 上的 HWM 不起作用的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆