当两者同时改变时合并可观察对象 [英] Combining Observables when both change simultaneously

查看:115
本文介绍了当两者同时改变时合并可观察对象的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用RxPY将ReactiveX集成到我的GUI中.这是一个常见的ReactiveX问题.

I am trying to integrate ReactiveX into my GUI using RxPY. This is a general ReactiveX question.

说我有一个可视化,依赖于使用combine_latest(stream1, stream2, plot_function)的多个Observable流.当一个Observable更改时(例如用户修改值时),该方法非常有用.可视化使用新值进行更新.但是,有时两个Observable会同时更新,例如当用户从单个文件加载两个流的数据时.从技术上讲,一个Observable将在另一个Observable之前更新(以导入功能中的第一个为准).结果,该图将被更新两次,但是出于所有意图和目的,它只需要更新一次.

Say I have a visualization that depends on multiple Observable streams using combine_latest(stream1, stream2, plot_function). This works great when one Observable changes, like when the user modifies a value; the visualization updates using the new values. However, sometimes both Observables are updated simultaneously, like when the user loads data for both streams from a single file. Technically, one Observable will be updated before the other (whichever comes first in the import function). As a result, the plot will be updated twice, but for all intents and purposes it need only be updated once.

我拥有的某些可视化文件的计算成本很高,因此我想确保如果两个流同时更新 ,那么合并后的流只会发出一个值.我可以想到一些选择:

Some of the visualizations I have are expensive to compute, so I want to make sure that if both streams are updated simultaneously, then the combined stream only emits one value. I can think of a few options:

  1. 对合并的流使用debounce(),超时时间短(例如50ms).这种方法对我来说似乎很肮脏.

  1. Use debounce() with a small timeout (like 50ms) on the combined stream. This approach seems dirty to me.

请勿直接使用combine_latest.将两个流包装在也具有某种updating标志的新对象中.如果将updating标志设置为True,则在将updating标志设置为False之前不发出任何东西.这种方法让人感觉有状态,并且破坏了流的可组合性.

Don't use combine_latest directly. Wrap the two streams in a new object that also has some sort of updating flag. If I set the updating flag to True, then don't emit anything until I set the updating flag to False. This approach feels to stateful, and it ruins the composability of the streams.

告诉所有可视化文件,直到所有流都更新后才更新.再次,这破坏了封装,因为可视化不应理会上游发生的事情.它应该只从合并后的流中接收新值并做出漂亮的图片.

Tell all visualizations not to update until all the streams are updated. Again, this breaks encapsulation because the visualization shouldn't care what is happening upstream. It should just receive the new values from the combined stream and make a pretty picture.

使可视化效果具有足够的细粒度,以至于一次流更新首先只会带来很小的性能损失.对于某些可视化,例如基于点和网格尺寸计算网格的可视化,这是不可能的.如果点的网格大小发生变化,则需要重新计算整个网格.

Make the visualizations fine-grained enough that one stream updating first only introduces a small performance penalty. This is impossible for some visualizations, like a visualization that computes a mesh based on points and a mesh size. If the points or the mesh size changes, then the whole mesh needs to be recomputed.

Rx中是否有一些工具可以同时"处理多个流的更新?我觉得我要的是魔术.

Is there some facility in Rx to handle "simultaneously" updating multiple streams? I feel like what I'm asking for is magic.

对于使用Rx编写GUI程序的任何人:除了通过流发送新值之外,我是否还应该使用一些更好的体系结构用于模型?

For anyone who has made GUI programs using Rx: is there some better architecture I should be using for models besides sending new values through streams?

如果这个问题不清楚,请在评论中告诉我,我将尝试举一个更具体的例子.

If this question is unclear, please tell me in a comment and I will try to make a more concrete example.

这是一个示例Python RxPY程序:

Here is a sample Python RxPY program:

import rx

stream1 = rx.subjects.BehaviorSubject(1)
stream2 = rx.subjects.BehaviorSubject(2)

rx.Observable\
  .combine_latest(stream1, stream2, lambda x, y: (x, y))\
  .subscribe(print)

stream1.on_next(3)
stream2.on_next(4)

此打印:

(1, 2)
(3, 2)
(3, 4)

我该如何同时更新stream1stream2的值,以便以下结果成为结果?

How could I update the values of stream1 and stream2 simultaneously so that the following becomes the result?

(1, 2)
(3, 4)

换句话说,如何修改combine_latest以便可以在下游告诉它嘿,请等一下我更新其他流,然后再发出下一个值"?

In other words, how could I modify combine_latest in such a way that I can tell it downstream "hey, wait a second while I update the other streams before you emit your next value"?

推荐答案

在C#的Rx中有效:

var throttled = source.Publish(hot => hot.Buffer(() => hot.Throttle(dueTime));

此处的dueTime值是.NET中的TimeSpan.它只是说您希望在其产生值之前不活动的时间范围是什么.这基本上吞噬了在一定时间范围内同时"产生的值.

The dueTime value here is a TimeSpan in .NET. It merely says what the window of time is that you want to have inactivity before a value it produced. This basically gobbles up values produced "simultaneously" within a margin of time.

在这种情况下,source将是您的.combine_latest(...)可观察到的.

The source in this case would be your .combine_latest(...) observable.

这篇关于当两者同时改变时合并可观察对象的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆