rx.net相关内容

如何将多个可观察变量与订单保存和最大并发合并?

我搜索了一个重复项,但没有找到任何重复项.我所拥有的是嵌套的可观察的 IObservable> ,我想将其展平为 IObservable .我不想使用 Merge 运算符,因为它会弄乱发射值的顺序.下面的大理石图显示了 Merge 运算符有问题的行为(就我而言),以及合意的合并行为. 可观察流:+-1 --- 2 --- 3 - ..
发布时间:2021-04-23 20:35:23 C#/.NET

如何将非一次性对象与每次冷观察对象绑定在一起?

很抱歉,以前是否有人问过这个问题,但我找不到重复的问题.也很抱歉最近提出了太多问题!我可能正在搜索自定义不是线程安全的. 我试图解决此问题的方法是使用 Using 运算符,该运算符具有 FuncresourceFactory 参数: 静态IObservableGetRandomNumbers(){返回Observable.Using(()=> new R ..
发布时间:2021-04-18 19:19:17 C#/.NET

如何清除ReplaySubject上的缓冲区?

如何清除 ReplaySubject 上的缓冲区? 我定期需要清除缓冲区(对于我来说,这是一天结束的事件),以防止 ReplaySubject 持续增长并最终耗尽所有内存. 理想情况下,我希望保持相同的 ReplaySubject ,因为客户端订阅仍然有效. 解决方案 ReplaySubject 没有提供清除缓冲区的方法,但是有一些重载以不同的方式限制其缓冲区: 保留项目 ..
发布时间:2021-04-18 18:44:55 C#/.NET

在单个并发执行限制下,使用Rx运行定期任务的好方法是什么?

我要运行定期任务,但要限制在任何给定时间最多只能运行一种方法的执行. 我正在尝试使用Rx,但是我不确定如何强加一次并发限制. var timer = Observable.Interval(TimeSpan.FromMilliseconds(100));timer.Subscribe(tick => DoSomething()); 此外,如果任务仍在运行,我希望随后的时间表过去.即我 ..
发布时间:2021-04-18 18:36:02 C#/.NET

如何在Rx.Net中实现exhaustMap处理程序?

我正在从 rxjs 寻找类似于 exhaustMap 运算符的东西,但是 RX.NET 似乎没有这样的运算符. 我需要实现的是,在源流的每个元素上,我需要启动一个 async 处理程序,直到完成,我想从源代码中删除任何元素.处理程序完成后,立即恢复使用元素. 我不希望在每个元素上启动异步处理程序-在处理程序运行时,我要删除源元素. 我还怀疑我需要在这里巧妙地使用defer运算符吗 ..
发布时间:2021-04-01 19:47:36 C#/.NET

如何在rx.net中组合GroupedObservables?

我可以观察到,我使用GroupBy来获取许多流.我实际上想要每个子流上的Scan结果.假设可观察到的是产品价格之上,而扫描结果是每种产品类型的平均价格. 我还有另一个与那些“产品"有关的事件(比如说“显示产品价格"事件),我想将其与上一类产品的最新产品价格结合起来.因此,需要将每个组的Scan输出与事件流的每个元素结合起来,以获取该事件产品的最新平均价格. 由于某种原因,我无法获得正确 ..
发布时间:2020-07-06 04:45:33 其他开发

Rx.Net:在SelectMany中调用多个IObservable

请注意:这是问题较早发布,但感兴趣的解决方案却有不同的情况. 我试图多次调用各自返回IObservable的方法,但在SelectMany语句中返回的值是一个Task,因此下面的Subscribe语句无法编译. 这是代码段 var myWorkList = new List { new My ..
发布时间:2020-07-06 04:45:29 其他开发

Rx.Net:链接订户-替代方法?

如何重新编写此代码,这样我就不必像下面那样链接订户?询问的原因是,由于代码的样式,这种样式将限制在一个可观察的范围内,这取决于另一个可观察的范围,这可能会造成混淆. var results = myService .GetData(accountId) // returns IObservable .Subscribe(data => ..
发布时间:2020-07-06 04:45:25 其他开发

Observable.Repeat是不可阻挡的,它是错误还是功能?

我注意到 Repeat 运算符的行为有些奇怪源可观察者的通知是同步的.后续的 TakeWhile 运算符无法停止产生的可观测值,并且显然继续运行永远.为了演示,我创建了一个可观察的源,该源产生一个值,该值在每次订阅时都会递增.第一个订户获得值1,第二个订户获得值2等等: int incrementalValue = 0; var incremental = Observable.Create ..
发布时间:2020-07-06 04:45:22 C#/.NET

如何在rx.net中实现我自己的运算符

我需要RX中具有迟滞滤波器的功能.仅当先前发出的值和当前输入值相差一定量时,才应从源流中发出一个值.作为通用扩展方法,它可以具有以下签名: public static IObservable HysteresisFilter(this IObservable source, Func ..
发布时间:2020-07-06 04:45:14 C#/.NET

节气门,但如果来不及,则放弃结果

我正在编写一个UI,用户可以在其中输入搜索字词,并且列表会不断更新,并提供建议. 尽管我的第一个想法是Rx原语Throttle是完美的匹配,但是却让我半途而废. 获取这些建议需要一些时间,因此我可以在非UI线程上异步获取它们. 问题是,如果用户再次输入油门时间跨度,我想丢弃/跳过/丢弃结果. 例如: 时间开始并且用户按下一个键:0ms 油门设置为100ms. 提取 ..
发布时间:2020-07-06 04:45:13 C#/.NET

将最后一项推入“可观察的"(序列)

我在一个类中有一个IObservable,我想公开一个只读属性,该属性提供在给定时间推送到可观察对象的最后一项.因此它将提供单个值Item. 如果未推送任何值,则必须返回默认值. 我如何做到这一点而不必订阅可观察的内容并拥有“后备字段"? 解决方案 仅在这里补充@Asti的答案,也许可以帮助您避免沮丧: 可观察的事物不是物理的“事物",而是更具逻辑性的概念.通 ..
发布时间:2020-07-06 04:44:11 C#/.NET

RX.Net:使用重试,但记录任何异常

我是RX的新手,一直在研究错误处理和Retry的使用;我有以下内容(是的,我知道这不是一个“真正的"单元测试,但是它给了我一些摆弄的地方!),想知道如何继续重试,但能够记录任何异常? [Test] public void Test() { var scheduler = new TestScheduler(); var source ..
发布时间:2020-07-06 04:44:07 其他开发

使用AsObservable观察TPL数据流块而无需消耗消息

我有一连串的TPL Dataflow块,想观察系统内部的进度. 我知道我可以将TransformBlock塞入要观察的网格中,将其发布到各种进度更新器中,然后将消息原封不动地返回到下一个块.我不喜欢这种解决方案,因为该块纯粹是因为它的副作用而存在,而且我还必须在我想观察的任何地方更改块链接逻辑. 所以我想知道是否可以使用ISourceBlock.AsObservable观察消息在 ..
发布时间:2020-07-06 04:44:03 C#/.NET

利用反应性扩展(RX),可以添加“暂停"消息.命令?

我有一个可以接收事件流并推出另一事件流的类. 所有事件都使用反应性扩展(RX).使用.OnNext将事件的传入流从外部源推入IObserver,使用IObservable和.Subscribe将事件的传出流推出.我正在使用Subject在后台进行管理. 我想知道RX中有什么技术可以暂时暂停输出.这意味着传入事件将在内部队列中累积,并且在未暂停时,事件将再次流出. ..
发布时间:2020-07-06 04:43:55 C#/.NET

在Rx中实现滑动窗口的问题

我为响应式扩展创建了一个SlidingWindow运算符,因为我想轻松监视滚动平均值之类的事情.作为一个简单的示例,我想订阅以听取鼠标事件,但是每次有一个事件我都想接收最后三个(而不是等待每三个事件接收最后三个).这就是为什么我发现Window重载似乎无法为我提供开箱即用的东西的原因. 这是我想出的.考虑到它频繁的List操作,我担心它可能不是最有效的解决方案: public stat ..
发布时间:2020-07-06 04:43:51 C#/.NET