system.reactive相关内容

如何在早期完成时合并两个可观察对象

内置Merge的行为两个源都完成后,操作符才完成.我正在寻找这个运算符的一个变体,它产生一个在两个源 observable 中的 any 完成时完成的 observable.例如,如果第一个 observable 成功完成,稍后第二个 observable 完成并出现异常,我希望忽略此异常. 我想出了一个实现,将一个特殊的哨兵异常连接到两个可枚举项,然后合并的序列捕获并抑制这个异常.我想知道 ..
发布时间:2021-09-04 18:39:21 C#/.NET

如果上次回调尚未完成,则忽略传入的流更新

我有类似的代码. IPruduceDemUpdates.Subscribe(update => DoUpdate(update)); 但我想做的就是这样. IPruduceDemUpdates.Subscribe(update => if(NoDoUpadteIsRunning) DoUpdate(update)); 所以它会忽略传入的更新,如果更新方法已经在运行.此外,它应该始终执行最后 ..
发布时间:2021-09-04 18:39:16 C#/.NET

订阅 Observable.Retry() 直到完成才返回?

我希望 IObservable.Subscribe 是异步的:无论内部发生什么,我都希望恢复订阅. 但是 Retry 似乎有不同的语义: var retry = Observable.Throw(new Exception(), 0).Retry();Console.WriteLine("到这里");var subscription = retry.Subscribe();Console. ..
发布时间:2021-09-04 18:39:10 其他开发

我如何组合三个可观察量,使得

如果我有三个 observable,我该如何组合它们,这样当第一个发出新值时,我可以将它与其他两个发出的最新值组合起来.重要的是我们只产生组合的新值observables 当第一个 observable 有新值时,不管其他两个是否发出了新值. 谢谢. 解决方案 您可以这样做(请注意,如果源 2 和 3 尚未发出,您将获得它们的默认值,并且我允许您有选择地提供这些默认值): 公共静态 ..
发布时间:2021-09-04 18:39:07 其他开发

每次方法调用都可观察到的新数据

这对于有专业知识的人来说可能非常简单,但是我如何直接向给定的 observable 提供新数据,每当调用我的方法时? IObservable_myObservable;void ThingsCallMe(int someImportantNumber){//当前的伪代码试图用可以编译的东西替换?_myObservable.Add(someImportantNumber);}void ISpyO ..
发布时间:2021-09-04 18:39:04 C#/.NET

将生成的事件序列创建为冷序列

FWIW - 在 就元数据寻求建议 我有一个包含配置数据的网络服务.我想定期调用它 Tok 以刷新使用它的应用程序中的配置数据.如果服务出错(超时、停机等),我想保留上一次调用中的数据,并在不同的时间间隔 Tnotok 后再次调用该服务.最后,我希望行为是可测试的. 因为管理时间序列和可测试性似乎是 Reactive Extensions 的一个强项,所以我开始使用一个由生成的序列提供 ..
发布时间:2021-09-04 18:38:59 C#/.NET

RX - 按密钥订阅复杂性的多消费者

我得到了 KeyValuePair 的 observable: --(7|45.2)--(3|11.1)--(5|13.2)--(6|36.2)--(3|57.4) 我得到了一个在运行时定义的消费者列表.他们只对为单个键 (myKey) 生成的值感兴趣. 例如: 消费者 7 只对值 45.2 感兴趣. 消费者 3 只对值 11.1 和 57.4 感兴趣 消费者 1,只对 myK ..
发布时间:2021-09-04 18:38:56 C#/.NET

使 System.Reactive 在新订阅时重复最后 X 项

如何使用 System.Reactive 实现下一个逻辑? IObservable 当至少有一个订阅者存在时,它会在计时器上产生新项目(整数),并且当新订阅者订阅时,它会为新订阅者重复最后 X 个项目.例子:令 X = 5; SubscriberA 订阅并且 Observable 发出 1, 2, 3, 4, 5, 6, 7 SubscriberB 订阅并获得 3、4、5、6、7(最 ..
发布时间:2021-09-04 18:38:53 C#/.NET

为什么不是 Observable.Finally 被调用?

我不理解Finally 方法.在这种情况下它不会触发. [测试方法]public void finallyHappensOnError(){bool finallyActionHappened = false;尝试{可观察的.Throw(new DivideByZeroException()).Finally(() => finallyActionHappened = true).订阅();}抓 ..
发布时间:2021-09-04 18:38:44 C#/.NET

取消 RX.Net Observer 正在进行的 OnNext 方法

如我的原始问题所述(参见 将相互依赖的事件流与RX.Net) 我有一个 RX.net 事件流,只要没有触发某个其他事件,它就只会调用观察者的 OnNext 方法(基本上,只要系统连接,就可以处理更改-* 事件,断开连接时暂停并在系统重新连接后重新开始处理 Change-* 事件). 然而,虽然这对新事件很顺利,但我如何取消/向正在进行 .OnNext() 调用发出取消信号? 解决方案 ..
发布时间:2021-09-04 18:38:39 C#/.NET

为什么没有 Observable.Finally 被调用?

我不理解Finally 方法.在这种情况下它不会触发. [测试方法]public void finallyHappensOnError(){bool finallyActionHappened = false;尝试{可观察的.Throw(new DivideByZeroException()).Finally(() => finallyActionHappened = true).订阅();}抓 ..
发布时间:2021-09-04 18:38:34 C#/.NET

Rx 中的历史调度器

我有以下代码尝试重播序列;但 PostWorkItem 总是为空;我不明白为什么.下面的代码给出了堆栈溢出错误. Task>WorkItems = DbUtil.GetWorkItems(new List() { WorkItemStatus.NEW, WorkItemStatus.PROCESSING });项目 = WorkItems.Result;重放 = Observable.Gene ..
发布时间:2021-09-04 18:38:31 其他开发

将一个 observable 与另一个 observable 的最新相结合

我正在尝试组合两个值共享某个键的 observable. 每当第一个 observable 产生一个新值时,我想产生一个新值,结合第二个 observable 的最新值,哪个选择取决于第一个 observable 的最新值. 伪代码示例: var obs1 = Observable.Interval(TimeSpan.FromSeconds(1)).Select(x => Tuple ..
发布时间:2021-09-04 18:38:28 C#/.NET