system.reactive相关内容

系统中的并发订阅服务器执行。反应

我正在编写一个批处理管道,它每Y秒处理X个未完成的操作。我觉得System.Reactive很适合这一点,但我无法让订阅者并行执行。我的代码如下所示: var subject = new Subject(); var concurrentCount = 0; using var reader = subject .Buffer(TimeSpan.FromSeconds( ..
发布时间:2022-08-14 20:22:39 C#/.NET

如何合并并发和缓冲区容量有限的嵌套可观测IObservable&;lt;IObservable&;lt;T&;gt;&;gt;?

我注意到RxMerge运算符接受一个可选的maxConcurrent参数。这可用于通过并发订阅有限数量的子序列来限制最大并发性。当推送新的子序列的速度慢于订阅的子序列的完成速度时,它工作得很好,但当推送新的子序列的速度快于此速度时,它就会变得有问题。发生的情况是,子序列被缓冲在大小不断增加的内部缓冲区中,并且当前订阅的子序列也变得越来越陈旧。以下是此问题的演示: await Observab ..
发布时间:2022-08-14 20:18:15 C#/.NET

可观察:在源完成之前按时间间隔获取最新值

我正在寻找签名类似于以下内容的可观察选择符: static IObservable TakeLatest(this IObservable input, TimeSpan interval) 哪一项应该: 一旦输入发出第一个项目,就立即发出第一个项目 从那时起,以固定的时间间隔发出输入产生的最新项 每当输入完成(或失败)时完成(或失败) 以大理石为单位,如下所 ..
发布时间:2022-07-18 09:10:37 C#/.NET

如何实现更好的最终处方运算符?

最近我意识到RxFinally运算符的行为方式至少对我来说是意想不到的。我的预期是,finallyAction抛出的任何错误都将传播到操作员的观察者下游。遗憾的是,情况并非如此。实际上,操作符首先将Antecedent序列的完成(或失败)传播给它的观察者,然后然后在无法传播该操作抛出的潜在错误的时间点调用action。因此,它在ThreadPool上抛出错误,并使进程崩溃。这不仅出乎意料,而且非常 ..
发布时间:2022-04-19 16:06:58 C#/.NET

为什么这段代码不是异步运行的?

据我所知,订阅方法应该是异步的,而运行方法应该是同步的。但这段代码是以同步方式工作的。有人能修好吗? using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Reactive.Linq; namespace RxExtensionsDemo { ..
发布时间:2022-04-19 15:51:21 C#/.NET

.NET中的反应式Rx Zip队列

我对反应式编程的概念相当陌生。我使用的是Bonsai,它通过c#公开了一些但不是所有的.Net RX命令。 我正在尝试获得如下大理石图所示的行为: input1: ---1--------2--------3--------4--------5--------6--------7 input2: -------abc----------------------------------d ..
发布时间:2022-04-19 15:47:32 C#/.NET

合并不断变化的可观测数据集合

我们有一个实现IObservable的类Thing。在另一个类中,有一个Thing的集合,该类需要以统一的方式对来自所有这些可观测对象的更新做出反应。最明显的方法是Observable.Merge(),这通常是有效的;然而,当集合发生变化时,我们还需要订阅合并订阅中的任何新的Thing(理论上还需要取消订阅所有已删除的订阅,但这似乎没什么问题--它们只是不再生成任何更新)。 我 ..
发布时间:2022-04-19 15:43:33 其他开发

如何制作一个只能订阅一次的轻量级`Replay`运算符?

在各种情况下,我都希望RxReplay操作符能够缓冲传入的通知,在第一次订阅时同步重放其缓冲区,并在第一次订阅之后停止缓冲。这个轻量级Replay运算符应该只能为一个订阅者提供服务。可以找到这样一个操作符的一个用例here,在第一次订阅之后继续缓冲只是浪费资源。出于演示目的,我将在这里展示一个我希望可以避免的有问题的行为的人为示例: var observable = Observable ..
发布时间:2022-04-19 15:38:35 C#/.NET

一个Rx可观测对象,将作为ReplaySubject,但仅针对第一个订阅者?

合成类似ReplaySubject但仅向第一个订阅者(当该订阅者连接时)发出一次累积序列的Rx可观测对象的优雅方法是什么?第一次订阅后,它应该与常规Subject一样。 这是.NET项目的答案,但我同样希望得到JavaScript/RxJS的回答。 我在谷歌上寻找了潜在的解决方案,最终我将推出我自己的解决方案,类似于how I approachedDistinctSubject。 ..
发布时间:2022-03-31 16:38:30 C#/.NET

术语:什么是函数式反应式编程中的故障/RX?

在函数式反应式编程的上下文中,“故障”的定义是什么? 我知道在某些FRP框架中可能会出现“故障”,而在其他框架中则不会。例如,RX并非没有毛刺,而ReactFX也不是没有毛刺[1]。 谁能给出一个非常简单的示例,演示在使用RX时如何以及何时会出现故障,并在同一示例中显示相应的ReactFX解决方案如何以及为什么没有故障。 感谢阅读。 推荐答案 定义 我(自己)最 ..

rx.net中是否有功能类似于BehaviorSubject的Subject实现,但仅在值发生更改时才发出?

rx.net中是否有Subject实现在功能上类似于BehaviorSubject,但仅在其发生更改时才发出下一个值? 我对反应式扩展相当陌生,似乎找不到任何类似的东西,尽管此模式感觉像是INotifyPropertyChanged的自然替代品。 我的简单实现是封装BehaviorSubject,如下所示。与使用Observable.DistinctUntilChanged创建可组合 ..
发布时间:2022-02-26 09:51:20 C#/.NET

如何在不使用 Rx 框架的情况下限制事件的速度

我想限制事件的速度,如何在不使用 Microsoft Rx 框架的情况下实现这一点.我在 Rx 的帮助下完成了这项工作.但我正在尝试的是,我需要根据时隙限制 Map 的 View changed 事件.是否可以在不使用 Rx 的情况下实现相同的功能. 我不允许使用 Rx,我必须保持二进制大小尽可能小. 解决方案 例如,如果您的事件是 EventHandler 类型,则此方法有效.它为 ..
发布时间:2022-01-21 13:57:22 C#/.NET

填充DataSet时更新WPF进度条,全部使用Rx

我对 Rx 有点陌生,所以如果这看起来很傻或很明显,请原谅... 我有一个应用程序,它在某个时间扫描选定的文件夹并递归检索所有文件,之后它需要将它们存储在数据库中.我想在该过程中显示一个进度条,同时保持 UI 响应当然.取消按钮在稍后阶段也会很好. 我已经使用 Rx 实现了这一点,如下所示: //获取所有文件的列表var enumeratedFiles = Directory.Enu ..
发布时间:2022-01-21 13:27:16 C#/.NET

可观察的<>缺少 .Subscribe 扩展方法

我正在使用 RX 扩展和 WF4 创建一个工作流,该工作流对可观察到的消息做出反应以推进工作流.为此,我引入了一个包含 IObservable 的对象(ModuleMessage 是我的抽象类).我遇到的问题是 .Subscribe 无法识别其任何扩展方法,即用于 lambda 扩展/方法组的扩展方法.在下面的代码中,我有引用: 使用 System.Activity;使用 System.Acti ..
发布时间:2022-01-20 09:12:46 其他开发