system.reactive相关内容
我正在编写一个批处理管道,它每Y秒处理X个未完成的操作。我觉得System.Reactive很适合这一点,但我无法让订阅者并行执行。我的代码如下所示: var subject = new Subject(); var concurrentCount = 0; using var reader = subject .Buffer(TimeSpan.FromSeconds(
..
我注意到RxMerge运算符接受一个可选的maxConcurrent参数。这可用于通过并发订阅有限数量的子序列来限制最大并发性。当推送新的子序列的速度慢于订阅的子序列的完成速度时,它工作得很好,但当推送新的子序列的速度快于此速度时,它就会变得有问题。发生的情况是,子序列被缓冲在大小不断增加的内部缓冲区中,并且当前订阅的子序列也变得越来越陈旧。以下是此问题的演示: await Observab
..
我的理解是async void,should be avoided,async () => 和Action一起使用时,只是变相的async void。 因此,应避免使用the Rx.NET Finally operator与async () => 异步,因为最终接受Action作为参数: IObservable.Finally(async () => { await So
..
我有一个IObservable序列,它在前9次订阅时发出单个项目,在进一步订阅时不发出任何内容并立即完成: int counter = 0; IObservable source = Observable.Defer(() => { if (++counter
..
我正在寻找签名类似于以下内容的可观察选择符: static IObservable TakeLatest(this IObservable input, TimeSpan interval) 哪一项应该: 一旦输入发出第一个项目,就立即发出第一个项目 从那时起,以固定的时间间隔发出输入产生的最新项 每当输入完成(或失败)时完成(或失败) 以大理石为单位,如下所
..
最近我意识到RxFinally运算符的行为方式至少对我来说是意想不到的。我的预期是,finallyAction抛出的任何错误都将传播到操作员的观察者下游。遗憾的是,情况并非如此。实际上,操作符首先将Antecedent序列的完成(或失败)传播给它的观察者,然后然后在无法传播该操作抛出的潜在错误的时间点调用action。因此,它在ThreadPool上抛出错误,并使进程崩溃。这不仅出乎意料,而且非常
..
我觉得我在试着重新发明轮子,所以我最好问问。 已给定 我有一个Observable source 和Task LoadAsync(T value)方法 时间 我使用选择/切换模式在源发出时调用LoadMethod observable .Select(value => Observable.FromAsync(cancellationToken =
..
我正在尝试创建一个rx.net运算符,该运算符接受Observable和: 如果第一个元素"a" ,则转发每个元素时保持不变 仅发出完成信号,否则 例如: -a-b-c-d-|- --> -a-b-c-d-|- -b-c-d-|- --> -|- 如何执行此操作? 推荐答案 以下版本完全没有竞争条件: public static I
..
据我所知,订阅方法应该是异步的,而运行方法应该是同步的。但这段代码是以同步方式工作的。有人能修好吗? using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Reactive.Linq; namespace RxExtensionsDemo {
..
我对反应式编程的概念相当陌生。我使用的是Bonsai,它通过c#公开了一些但不是所有的.Net RX命令。 我正在尝试获得如下大理石图所示的行为: input1: ---1--------2--------3--------4--------5--------6--------7 input2: -------abc----------------------------------d
..
我们有一个实现IObservable的类Thing。在另一个类中,有一个Thing的集合,该类需要以统一的方式对来自所有这些可观测对象的更新做出反应。最明显的方法是Observable.Merge(),这通常是有效的;然而,当集合发生变化时,我们还需要订阅合并订阅中的任何新的Thing(理论上还需要取消订阅所有已删除的订阅,但这似乎没什么问题--它们只是不再生成任何更新)。 我
..
在各种情况下,我都希望RxReplay操作符能够缓冲传入的通知,在第一次订阅时同步重放其缓冲区,并在第一次订阅之后停止缓冲。这个轻量级Replay运算符应该只能为一个订阅者提供服务。可以找到这样一个操作符的一个用例here,在第一次订阅之后继续缓冲只是浪费资源。出于演示目的,我将在这里展示一个我希望可以避免的有问题的行为的人为示例: var observable = Observable
..
合成类似ReplaySubject但仅向第一个订阅者(当该订阅者连接时)发出一次累积序列的Rx可观测对象的优雅方法是什么?第一次订阅后,它应该与常规Subject一样。 这是.NET项目的答案,但我同样希望得到JavaScript/RxJS的回答。 我在谷歌上寻找了潜在的解决方案,最终我将推出我自己的解决方案,类似于how I approachedDistinctSubject。
..
在函数式反应式编程的上下文中,“故障”的定义是什么? 我知道在某些FRP框架中可能会出现“故障”,而在其他框架中则不会。例如,RX并非没有毛刺,而ReactFX也不是没有毛刺[1]。 谁能给出一个非常简单的示例,演示在使用RX时如何以及何时会出现故障,并在同一示例中显示相应的ReactFX解决方案如何以及为什么没有故障。 感谢阅读。 推荐答案 定义 我(自己)最
..
是否可以对实现IAsyncDisposable而不是IDisposable的资源使用rx.net中的Using操作符?如果没有,是否有我可以使用的解决方法? 推荐答案 这里有一个Using方法,可以处理IAsyncDisposable对象: /// /// Constructs an observable sequence that depends on a
..
rx.net中是否有Subject实现在功能上类似于BehaviorSubject,但仅在其发生更改时才发出下一个值? 我对反应式扩展相当陌生,似乎找不到任何类似的东西,尽管此模式感觉像是INotifyPropertyChanged的自然替代品。 我的简单实现是封装BehaviorSubject,如下所示。与使用Observable.DistinctUntilChanged创建可组合
..
我要为定义如下的事件创建可观测对象: public event Func Closed; 我当前的代码是这样的: Observable.FromEvent, Unit>(h => hub.Closed += h, h=> hub.Closed -= h); 编译正常,但引发此运行时异常: Sy
..
我想限制事件的速度,如何在不使用 Microsoft Rx 框架的情况下实现这一点.我在 Rx 的帮助下完成了这项工作.但我正在尝试的是,我需要根据时隙限制 Map 的 View changed 事件.是否可以在不使用 Rx 的情况下实现相同的功能. 我不允许使用 Rx,我必须保持二进制大小尽可能小. 解决方案 例如,如果您的事件是 EventHandler 类型,则此方法有效.它为
..
我对 Rx 有点陌生,所以如果这看起来很傻或很明显,请原谅... 我有一个应用程序,它在某个时间扫描选定的文件夹并递归检索所有文件,之后它需要将它们存储在数据库中.我想在该过程中显示一个进度条,同时保持 UI 响应当然.取消按钮在稍后阶段也会很好. 我已经使用 Rx 实现了这一点,如下所示: //获取所有文件的列表var enumeratedFiles = Directory.Enu
..
我正在使用 RX 扩展和 WF4 创建一个工作流,该工作流对可观察到的消息做出反应以推进工作流.为此,我引入了一个包含 IObservable 的对象(ModuleMessage 是我的抽象类).我遇到的问题是 .Subscribe 无法识别其任何扩展方法,即用于 lambda 扩展/方法组的扩展方法.在下面的代码中,我有引用: 使用 System.Activity;使用 System.Acti
..