ZMQ latency with PUB-SUB (slow subscriber)


Problem description

I have found a lot of questions on similar topics, but they didn't help me solve my problem.

Using:

  • Linux Ubuntu 14.04
  • python 3.4
  • zmq: 4.0.4 / pyZMQ: 14.3.1

The receive queue on a ZMQ SUB socket grows indefinitely, even after the HWM is set. This happens when the subscriber is slower than the publisher. What can I do to prevent it?

I work in the human-computer interaction field. We have a huge code base to control the mouse cursor and that kind of thing. I wanted to "break it" into several modules communicating with ZMQ. It must have as little latency as possible, but dropping (losing) messages is not that important.

Another interesting aspect is the possibility to add "spies" between the nodes. Thus the PUB/SUB sockets seem to be the most adequate.

Like this:

+----------+                +-----------+                 +------------+
|          | PUB            |           |  PUB            |            |
|  Input   | +----+------>  |  Filter   |  +----+------>  |   Output   |
|          |      |     SUB |           |       |     SUB |            |
+----------+      v         +-----------+       v         +------------+
               +-----+                       +-----+                   
               |Spy 1|                       |Spy 2|                   
               +-----+                       +-----+       
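
For a sense of what such a "spy" looks like: it can simply be one more SUB socket connected to the same PUB endpoint. A minimal sketch (the endpoint matches the example further below; everything else is illustrative, not from the original post):

import zmq

# Minimal illustrative "spy": one more SUB socket tapping the same PUB stream.
ctx = zmq.Context()
spy = ctx.socket(zmq.SUB)
spy.setsockopt_string(zmq.SUBSCRIBE, '')  # subscribe to everything
spy.connect('tcp://127.0.0.1:1500')       # same endpoint the PUB side binds

while True:
    print("spy saw: {}".format(spy.recv_string()))  # passive tap; the publisher is unaffected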

Problem

Everything works fine, except when we add the spies. If we add a spy doing "heavy stuff" like real-time visualisation with matplotlib, we notice an increasing latency in the plots. I.e.: in the diagram above, Filter and Output are fast and no latency is seen, but on Spy 2 the latency can reach 10 minutes after a 20-minute run (!!)

It looks like the queue on the receiver grows indefinitely. We investigated the High Water Mark (HWM) functionality of ZMQ and set it low to drop older messages, but nothing changed.

+------------+                +-------------+
|            |  PUB           |             |
|   sender   | -------------> |  receiver   |
|            |             SUB|             |
+------------+                +-------------+

The receiver is a slow receiver (acting as a spy in the first diagram).

sender.py:

import time
import zmq

ctx = zmq.Context()

sender = ctx.socket(zmq.PUB)
sender.setsockopt(zmq.SNDBUF, 256)
sender.set_hwm(10)
sender.bind('tcp://127.0.0.1:1500')

print(zmq.zmq_version()) ## 4.0.4
print(zmq.__version__) ## 14.3.1
print(sender.get_hwm()) ## 10

i = 0
while True:
    mess = "{} {}".format(i, time.time())
    sender.send_string(mess)
    print("Send : {}".format(mess))
    i += 1

receiver.py:

import time
import zmq

ctx = zmq.Context()
front_end = ctx.socket(zmq.SUB)

front_end.set_hwm(1)
front_end.setsockopt(zmq.RCVBUF, 8)

front_end.setsockopt_string(zmq.SUBSCRIBE, '')
front_end.connect('tcp://127.0.0.1:1500')

print(zmq.zmq_version()) ## 4.0.4
print(zmq.__version__) ## 14.3.1
print(front_end.get_hwm()) ## 1

while True:
    mess = front_end.recv_string()
    i, t = mess.split(" ")
    mess = "{} {}".format(i, time.time() - float(t))
    print("received : {}".format(mess))
    time.sleep(1)  # slow

I don't think this is normal behaviour for ZMQ Pub/Sub. I tried to set the HWM in the sender, in the receiver, and in both, but nothing changed.

I don't think I was clear when I explained my problem. I made an implementation moving the mouse cursor. The input was the mouse cursor position, sent over ZMQ at 200 Hz (with a .sleep( 1.0 / 200 )); some processing was done and the mouse cursor position was updated. (I don't have this sleep in my minimal example.)

Everything was smooth, even when I launched the spies. The spies nevertheless had a growing latency (because of their slow processing). The latency doesn't appear in the cursor at the end of the "pipeline".

I think the problem comes from the slow subscriber queuing the messages.

In my example, if we kill the sender and leave the receiver alive, messages continue to be displayed until all (?) the submitted messages have been shown.

The spy is plotting the position of the cursor to provide some feedback; it is still very inconvenient to have such a lag... I just want to get the last message sent, which is why I tried to lower the HWM.
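
(Editorial sketch, not from the original question.) Two workarounds commonly used when only the freshest message matters: the ZMQ_CONFLATE socket option (assumed available in libzmq >= 4.0; single-part messages only; must be set before connect()), or manually draining the queue with non-blocking reads and keeping only the last message:

import zmq

ctx = zmq.Context()
sub = ctx.socket(zmq.SUB)
# Alternative (assumption to verify on libzmq 4.0.x): keep only the newest message.
# sub.setsockopt(zmq.CONFLATE, 1)   # must precede connect(); single-part messages only
sub.setsockopt_string(zmq.SUBSCRIBE, '')
sub.connect('tcp://127.0.0.1:1500')

while True:
    mess = sub.recv_string()                      # block until something arrives,
    while True:                                   # then drain the backlog,
        try:
            mess = sub.recv_string(zmq.NOBLOCK)   # keeping only the newest item
        except zmq.Again:
            break
    # ... do the slow processing on the freshest message only ...
    print("freshest: {}".format(mess))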

Answer

A better real-time design / validation is missing

ZeroMQ is a powerful messaging layer.

That said, check how many messages it really sends per second in the original while True: killer-loop.

Measure it. Design on facts, not on feelings.

start_CLK = time.time()                                     # .SET _CLK
time.sleep( 0.001 )                                         # .NOP avoid DIV/0!
i = 0                                                       # .SET CTR
while True:                                                 # .LOOP
    sender.send_string( "{} {}".format( i, time.time() ) )  # .SND ZMQ-PUB
    print( i / ( time.time() - start_CLK ) )                # .GUI perf [msg/sec]
    i += 1                                                  # .INC CTR

ZeroMQ does its best to push that avalanche down through the scheme.

And it is very good at that.

Your [Filter] + [Spy1] + [Output] + [Spy2] pipeline processing has to be, end-to-end, either:

  • faster, incl. both the .send() + .recv_string() overheads, than the [Input]-sender, or

  • it becomes the principal blocking, pathological element, causing the internal PUB/SUB queues to grow, grow, grow

This chain-of-queues problem can be solved by another architecture design.

Things to re-think:

  1. Sub-sample the [Filter].send() cadency (the interleave factor depends on the stability issues of the real-time process under your control -- be it 1 msec (btw, an O/S timer resolution, so no quantum-physics experiments are possible with COTS O/S timer controls :o)), 10 msec for bidirectional voice-streaming, 50 msec for TV/GUI streaming, or 300 msec for a keyboard event-stream et al); see the sub-sampling sketch after this list.

  2. Online vs. offline post-processing / visualisation (you noticed a heavy matplotlib processing step; there you typically bear about 800 - 1600 - 3600 msec overheads, even on simple 2D graphing -- measure it before deciding about a change in the PUB/SUB-<proc1>-PUB/SUB-<proc2> processing architecture (you already noticed that <spy2> causes problems by growing <proc2>'s PUB-feeding & sending overheads)).

  3. Number of threads vs. number of localhost cores that execute them -- as seen from the localhost ip, all the processes reside on the same localhost. Plus add one thread per ZMQ.Context used, plus review the Python GIL locking overhead if all threads were instantiated from the same Python interpreter... Blocking grows. Blocking hurts. A better distributed architecture can improve these performance aspects. However, review [1] and [2] first.
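
As an illustration of point 1., a hedged sketch of sub-sampling a .send() cadency: samples are produced at full rate, but published at most once per CADENCE interval (the 50 msec value and all names are assumptions, not from the answer):

import time
import zmq

CADENCE = 0.050                # assumed: GUI-stream class, at most one msg per 50 msec

ctx = zmq.Context()
pub = ctx.socket(zmq.PUB)
pub.bind('tcp://127.0.0.1:1500')

i = 0
last_sent = 0.0
while True:
    sample = "{} {}".format(i, time.time())  # samples are produced at full rate ...
    i += 1
    now = time.time()
    if now - last_sent >= CADENCE:           # ... but published sub-sampled
        pub.send_string(sample)
        last_sent = now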

N.b.: calling a 20-minute processing-pipeline delay (a real-time system TimeDOMAIN skew) a latency is rather a euphemism.
