Pyzmq high-water mark not working on pub socket
Question
According to the ZeroMQ documentation, a pub socket is supposed to drop messages once the number of queued messages reaches the high-water mark.
This doesn't seem to work in the following example (and yes, I do set the hwm before bind/connect):
import time
import pickle
from threading import Thread
import zmq

ctx = zmq.Context()

def pub_thread():
    pub = ctx.socket(zmq.PUB)
    pub.set_hwm(2)
    pub.bind('tcp://*:5555')
    i = 0
    while True:
        # Send message every 100ms
        time.sleep(0.1)
        pub.send_string("test", zmq.SNDMORE)
        pub.send_pyobj(i)
        i += 1

def sub_thread():
    sub = ctx.socket(zmq.SUB)
    sub.subscribe("test")
    sub.connect('tcp://localhost:5555')
    while True:
        # Receive messages only every second
        time.sleep(1)
        msg = sub.recv_multipart()
        print("Sub: %d" % pickle.loads(msg[1]))

t_pub = Thread(target=pub_thread)
t_sub = Thread(target=sub_thread)
t_pub.start()
t_sub.start()

while True:
    pass
I'm sending messages on the pub socket 10 times faster than I'm reading them on the sub socket, and hwm is set to 2. I would expect to receive only about every 10th message. Instead, I see the following output:
Sub: 0
Sub: 1
Sub: 2
Sub: 3
Sub: 4
Sub: 5
Sub: 6
Sub: 7
Sub: 8
Sub: 9
Sub: 10
Sub: 11
Sub: 12
Sub: 13
Sub: 14
...
So I see all messages arriving, which means they are held in some queue until I read them. The same holds true when I also set hwm=2 on the sub socket before connect.
What am I doing wrong, or am I misunderstanding hwm?
I'm using pyzmq version 17.1.2.
Recommended answer
Borrowing from an answer to the issue I opened on GitHub, I've updated my answer as follows:
Messages are held in the operating system's network buffers. I have found HWMs to be not that useful because of that. Here is modified code in which the subscriber misses messages:
import time
import pickle
import zmq
from threading import Thread
import os

ctx = zmq.Context()

def pub_thread():
    pub = ctx.socket(zmq.PUB)
    pub.setsockopt(zmq.SNDHWM, 2)
    pub.setsockopt(zmq.SNDBUF, 2*1024)  # See: http://api.zeromq.org/4-2:zmq-setsockopt
    pub.bind('tcp://*:5555')
    i = 0
    while True:
        time.sleep(0.001)
        pub.send_string(str(i), zmq.SNDMORE)
        pub.send(os.urandom(1024))
        i += 1

def sub_thread():
    sub = ctx.socket(zmq.SUB)
    sub.setsockopt(zmq.SUBSCRIBE, b'')
    sub.setsockopt(zmq.RCVHWM, 2)
    sub.setsockopt(zmq.RCVBUF, 2*1024)
    sub.connect('tcp://localhost:5555')
    while True:
        time.sleep(0.1)
        msg, _ = sub.recv_multipart()
        print("Received:", msg.decode())

t_pub = Thread(target=pub_thread)
t_pub.start()
sub_thread()
Output looks something like this:
Received: 0
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
Received: 6
Received: 47
Received: 48
Received: 64
Received: 65
Received: 84
Received: 85
Received: 159
Received: 160
Received: 270
Messages are lost because all queues/buffers are full and the publisher starts dropping messages (see the documentation for ZMQ_PUB: http://api.zeromq.org/4-2:zmq-socket).
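To make the "all queues/buffers are full" condition more concrete, here is a toy model in plain Python. It is not ZeroMQ code and does not reproduce libzmq's internals. The `simulate` function, its parameters, and the stage capacities are hypothetical and chosen only for illustration. Messages pass through a chain of bounded FIFO stages (think of the publisher's queue followed by everything downstream of it), and a message is dropped only when the first stage is already full.

```python
from collections import deque

def simulate(total_msgs, consume_every, capacities):
    """Toy model (not ZeroMQ): messages flow through bounded FIFO stages.

    capacities[0] plays the role of the publisher-side queue; the remaining
    entries stand in for downstream buffers. Returns the message numbers
    the subscriber actually reads.
    """
    stages = [deque() for _ in capacities]
    received = []
    for tick in range(total_msgs):
        # The subscriber reads one message from the last stage every
        # `consume_every` ticks, if one is available.
        if tick % consume_every == 0 and stages[-1]:
            received.append(stages[-1].popleft())
        # Move messages downstream wherever the next stage has room,
        # working from the last stage back towards the first.
        for i in range(len(stages) - 1, 0, -1):
            while stages[i - 1] and len(stages[i]) < capacities[i]:
                stages[i].append(stages[i - 1].popleft())
        # The publisher sends message number `tick`; it is dropped
        # if the first stage is already full.
        if len(stages[0]) < capacities[0]:
            stages[0].append(tick)
    return received

# Small downstream buffer: an initial consecutive run, then gaps.
print(simulate(30, 3, [2, 2]))
# Large downstream buffer: nothing is dropped within this run.
print(simulate(30, 3, [2, 1000]))
```

With a small downstream capacity the first call yields `[0, 1, 2, 3, 4, 6, 9, 12, 15]`, an initial consecutive run followed by gaps, much like the output above. With a large downstream capacity the second call yields `[0, 1, 2, 3, 4, 5, 6, 7, 8]`, which resembles the behaviour in the question, where small messages fit comfortably in the default buffers.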
[NOTE]:
- You should set the high-water mark option on both the listener/subscriber and the advertiser/publisher.
- These posts are also relevant (Post1 - Post2)
- sock.setsockopt(zmq.CONFLATE, 1) is another option, set on the subscriber side, for getting only the last message.
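As a rough sketch of the CONFLATE option mentioned in the notes, the snippet below sets it on a SUB socket before connecting and then reads once after several messages have been published. It assumes pyzmq is installed. The function name `latest_only_demo`, the loopback address, the random port, and the sleep durations are all illustrative choices rather than anything from the original answer. Note that, according to the libzmq documentation, ZMQ_CONFLATE does not support multi-part messages, so this sketch sends single-frame messages, unlike the examples above.

```python
import time
import zmq

def latest_only_demo():
    """Publish five single-frame messages, then read once from a conflating SUB."""
    ctx = zmq.Context()

    pub = ctx.socket(zmq.PUB)
    port = pub.bind_to_random_port("tcp://127.0.0.1")

    sub = ctx.socket(zmq.SUB)
    sub.setsockopt(zmq.CONFLATE, 1)     # must be set before connect
    sub.setsockopt(zmq.SUBSCRIBE, b"")  # subscribe to everything
    sub.connect("tcp://127.0.0.1:%d" % port)

    # Give the subscription time to reach the publisher (slow-joiner effect).
    time.sleep(0.5)

    for i in range(5):
        pub.send(str(i).encode())

    # Let all messages arrive before reading anything.
    time.sleep(0.5)

    # Wait up to one second for a message; None means nothing arrived.
    latest = sub.recv() if sub.poll(1000) else None

    sub.close(linger=0)
    pub.close(linger=0)
    ctx.term()
    return latest

print(latest_only_demo())
```

Because the subscriber keeps only the most recent message, the single read should return the last message published rather than the first. The fixed sleeps are a simplification, and real code would want a more robust way to wait for the connection.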