如何在python中限制ZMQ(ZeroMQ - PyZMQ)队列缓冲区大小? [英] How to have limited ZMQ (ZeroMQ - PyZMQ) queue buffer size in python?

查看:128
本文介绍了如何在python中限制ZMQ(ZeroMQ - PyZMQ)队列缓冲区大小?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用具有 pub/sub 模式的 pyzmq 库.我通过 .connect() 方法一些快速的 ZMQ 发布者.bind()一个较慢的 ZMQ 订阅者方法.然后几分钟后,我的订阅者从发布者那里得到旧的发布数据(由于 ZMQ 缓冲区).


我的问题:

有没有办法管理ZMQ队列缓冲区大小?(设置一个有限的缓冲区)

[注意]:

  • 我不想使用 ZMQ 推/拉.
  • 我读过这篇文章,但这种方法只清除缓冲区:清除ZMQ缓冲区
  • 我也尝试了 high watermark 选项,但没有用:
<块引用>

socket.setsockopt(zmq.RCVHWM, 10) # 不工作socket.setsockopt(zmq.SNDHWM, 10) # 不工作


出版商:

导入zmq导入时间端口 = 5556"上下文 = zmq.Context()socket = context.socket(zmq.PUB)socket.bind("tcp://*:%s"% 端口)socket.setsockopt(zmq.SNDHWM, 10) # 不工作为真:数据 = 时间.时间()打印(%d"% 数据)socket.send(%d"%数据)时间.睡眠(1)

订阅者:

导入zmq导入时间端口 = 5556"上下文 = zmq.Context()socket = context.socket(zmq.SUB)socket.connect("tcp://localhost:%s"% 端口)socket.setsockopt(zmq.SUBSCRIBE, '')socket.setsockopt(zmq.RCVHWM, 10) # 不工作而 1:time.sleep(2) # 减速器之类的.数据 = socket.recv()打印(数据)

即使使用这些选项,队列大小仍然超过 10(配置了发送/接收高水印).

解决方案

我找到了一种获取仅最后一条消息"的方法ZMQ 订阅套接字中的选项(使用 CONFLATE 选项).

但是首先您应该在连接之前设置CONFLATE选项:

导入zmq导入时间端口 = 5556"上下文 = zmq.Context()socket = context.socket(zmq.SUB)socket.setsockopt(zmq.SUBSCRIBE, '')socket.setsockopt(zmq.CONFLATE, 1) # 仅最后一条消息.socket.connect("tcp://localhost:%s" % port) # 必须放在上述选项之后.而 1:time.sleep(2) # 虚拟延迟数据 = socket.recv()打印(数据)

换句话说,我删除了订阅者代码中的所有缓冲队列.


[注意]:

此外,使用 zmq.SNDBUFzmq.RCVBUF 选项,我们可以设置 ZMQ 缓冲区大小的限制.(更完整和一个例子)


I'm using pyzmq library with pub/sub pattern. I have some quick ZMQ publisher by .connect() method and a slower ZMQ subscriber by .bind() method. Then after a few minutes, my subscriber gets old published data from the publisher (due to ZMQ buffer).


My Question:

Is there any approach to manage ZMQ queue buffer size? (set a limited buffer)

[NOTE]:

  • I don't want to use ZMQ PUSH/PULL.
  • I've read this post, but this approach clear buffer only: clear ZMQ buffer
  • I tried with high watermark options too, but it didn't work:

socket.setsockopt(zmq.RCVHWM, 10)  # not working
socket.setsockopt(zmq.SNDHWM, 10)  # not working


Publisher:

import zmq
import time

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)
socket.setsockopt(zmq.SNDHWM, 10)  # not working

while True:
    data = time.time()
    print("%d" % data)
    socket.send("%d" % data)
    time.sleep(1)

Subscriber:

import zmq
import time

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:%s" % port)
socket.setsockopt(zmq.SUBSCRIBE, '')
socket.setsockopt(zmq.RCVHWM, 10)  # not working

while 1:
    time.sleep(2)  # A speed reducer like.
    data = socket.recv()
    print(data)

Even with these options, the queue size is more than 10 yet (with configured send/receive high watermark).

解决方案

I found a way to get "Last message only" option in ZMQ Subscribe socket (using CONFLATE option).

But first you should set the CONFLATE option before you connect:

import zmq
import time

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.SUB)

socket.setsockopt(zmq.SUBSCRIBE, '')
socket.setsockopt(zmq.CONFLATE, 1)  # last msg only.
socket.connect("tcp://localhost:%s" % port)  # must be placed after above options.

while 1:
    time.sleep(2)  # Dummy delay
    data = socket.recv()
    print(data)

On the other word, I removed any buffered queue in subscriber code.


[NOTE]:

In addition, with the zmq.SNDBUF and zmq.RCVBUF options we could set a limit on ZMQ buffer size. (More complete and an example)


这篇关于如何在python中限制ZMQ(ZeroMQ - PyZMQ)队列缓冲区大小?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆