如何在python中限制ZMQ(ZeroMQ - PyZMQ)队列缓冲区大小? [英] How to have limited ZMQ (ZeroMQ - PyZMQ) queue buffer size in python?
问题描述
我正在使用具有 pub/sub 模式的 pyzmq
库.我通过 .connect()
方法一些快速的 ZMQ 发布者和 .bind()
的一个较慢的 ZMQ 订阅者方法.然后几分钟后,我的订阅者从发布者那里得到旧的发布数据(由于 ZMQ 缓冲区).
我的问题:
有没有办法管理ZMQ队列缓冲区大小?(设置一个有限的缓冲区)
[注意]:
- 我不想使用 ZMQ 推/拉.
- 我读过这篇文章,但这种方法只清除缓冲区:清除ZMQ缓冲区
- 我也尝试了
high watermark
选项,但没有用:
socket.setsockopt(zmq.RCVHWM, 10) # 不工作socket.setsockopt(zmq.SNDHWM, 10) # 不工作
出版商:
导入zmq导入时间端口 = 5556"上下文 = zmq.Context()socket = context.socket(zmq.PUB)socket.bind("tcp://*:%s"% 端口)socket.setsockopt(zmq.SNDHWM, 10) # 不工作为真:数据 = 时间.时间()打印(%d"% 数据)socket.send(%d"%数据)时间.睡眠(1)
订阅者:
导入zmq导入时间端口 = 5556"上下文 = zmq.Context()socket = context.socket(zmq.SUB)socket.connect("tcp://localhost:%s"% 端口)socket.setsockopt(zmq.SUBSCRIBE, '')socket.setsockopt(zmq.RCVHWM, 10) # 不工作而 1:time.sleep(2) # 减速器之类的.数据 = socket.recv()打印(数据)
即使使用这些选项,队列大小仍然超过 10(配置了发送/接收高水印
).
我找到了一种获取仅最后一条消息"的方法ZMQ
订阅套接字中的选项(使用 CONFLATE
选项).
但是首先您应该在连接之前设置CONFLATE
选项:
导入zmq导入时间端口 = 5556"上下文 = zmq.Context()socket = context.socket(zmq.SUB)socket.setsockopt(zmq.SUBSCRIBE, '')socket.setsockopt(zmq.CONFLATE, 1) # 仅最后一条消息.socket.connect("tcp://localhost:%s" % port) # 必须放在上述选项之后.而 1:time.sleep(2) # 虚拟延迟数据 = socket.recv()打印(数据)
换句话说,我删除了订阅者代码中的所有缓冲队列.
[注意]:
此外,使用 zmq.SNDBUF
和 zmq.RCVBUF
选项,我们可以设置 ZMQ 缓冲区大小的限制.(更完整和一个例子)
I'm using pyzmq
library with pub/sub pattern. I have some quick ZMQ publisher by .connect()
method and a slower ZMQ subscriber by .bind()
method.
Then after a few minutes, my subscriber gets old published data from the publisher (due to ZMQ buffer).
My Question:
Is there any approach to manage ZMQ queue buffer size? (set a limited buffer)
[NOTE]:
- I don't want to use ZMQ PUSH/PULL.
- I've read this post, but this approach clear buffer only: clear ZMQ buffer
- I tried with
high watermark
options too, but it didn't work:
socket.setsockopt(zmq.RCVHWM, 10) # not working socket.setsockopt(zmq.SNDHWM, 10) # not working
Publisher:
import zmq
import time
port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)
socket.setsockopt(zmq.SNDHWM, 10) # not working
while True:
data = time.time()
print("%d" % data)
socket.send("%d" % data)
time.sleep(1)
Subscriber:
import zmq
import time
port = "5556"
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:%s" % port)
socket.setsockopt(zmq.SUBSCRIBE, '')
socket.setsockopt(zmq.RCVHWM, 10) # not working
while 1:
time.sleep(2) # A speed reducer like.
data = socket.recv()
print(data)
Even with these options, the queue size is more than 10 yet (with configured send/receive high watermark
).
I found a way to get "Last message only" option in ZMQ
Subscribe socket (using CONFLATE
option).
But first you should set the CONFLATE
option before you connect:
import zmq
import time
port = "5556"
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.SUBSCRIBE, '')
socket.setsockopt(zmq.CONFLATE, 1) # last msg only.
socket.connect("tcp://localhost:%s" % port) # must be placed after above options.
while 1:
time.sleep(2) # Dummy delay
data = socket.recv()
print(data)
On the other word, I removed any buffered queue in subscriber code.
[NOTE]:
In addition, with the zmq.SNDBUF
and zmq.RCVBUF
options we could set a limit on ZMQ buffer size. (More complete and an example)
这篇关于如何在python中限制ZMQ(ZeroMQ - PyZMQ)队列缓冲区大小?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!