system.reactive相关内容
IScheduler 接口提供 public static IDisposable Schedule(this IScheduler scheduler, Action action) 和 public static IDisposable ScheduleAsync(this IScheduler scheduler, Func> action) ScheduleAsync 的方法说明:
..
内置Merge的行为两个源都完成后,操作符才完成.我正在寻找这个运算符的一个变体,它产生一个在两个源 observable 中的 any 完成时完成的 observable.例如,如果第一个 observable 成功完成,稍后第二个 observable 完成并出现异常,我希望忽略此异常. 我想出了一个实现,将一个特殊的哨兵异常连接到两个可枚举项,然后合并的序列捕获并抑制这个异常.我想知道
..
我在我的程序中使用 Rx 并希望为 observable 创建订阅,该订阅在一分钟的时间间隔内需要 5 个第一个元素并忽略其他元素.例如, 序列:-1---2--3--4-5---6---7-8--------------间隔:|-------------------|-------------------|结果:|1---2---3--4-5-----|-7-8--------------|
..
我有类似的代码. IPruduceDemUpdates.Subscribe(update => DoUpdate(update)); 但我想做的就是这样. IPruduceDemUpdates.Subscribe(update => if(NoDoUpadteIsRunning) DoUpdate(update)); 所以它会忽略传入的更新,如果更新方法已经在运行.此外,它应该始终执行最后
..
我正在寻找一个实现 IObservable> and IList>. 就是这样,我希望能够写: var list = new MyReactiveList();var item = new new Subject();list.Subscribe(values => Console.WriteLine($"[{string.Join(", ", values)}]"));list.Add(
..
我希望 IObservable.Subscribe 是异步的:无论内部发生什么,我都希望恢复订阅. 但是 Retry 似乎有不同的语义: var retry = Observable.Throw(new Exception(), 0).Retry();Console.WriteLine("到这里");var subscription = retry.Subscribe();Console.
..
如果我有三个 observable,我该如何组合它们,这样当第一个发出新值时,我可以将它与其他两个发出的最新值组合起来.重要的是我们只产生组合的新值observables 当第一个 observable 有新值时,不管其他两个是否发出了新值. 谢谢. 解决方案 您可以这样做(请注意,如果源 2 和 3 尚未发出,您将获得它们的默认值,并且我允许您有选择地提供这些默认值): 公共静态
..
这对于有专业知识的人来说可能非常简单,但是我如何直接向给定的 observable 提供新数据,每当调用我的方法时? IObservable_myObservable;void ThingsCallMe(int someImportantNumber){//当前的伪代码试图用可以编译的东西替换?_myObservable.Add(someImportantNumber);}void ISpyO
..
我正在尝试使用 StartWith 方法将初始值注入 RX 流: public async Task) Stream(Instrument instrumentDetails){var initialPrice = await _svc.GetSomeInitialPrice();var 流 = _priceObserver.Stream.Where(o => o.Symbol == instr
..
FWIW - 在 就元数据寻求建议 我有一个包含配置数据的网络服务.我想定期调用它 Tok 以刷新使用它的应用程序中的配置数据.如果服务出错(超时、停机等),我想保留上一次调用中的数据,并在不同的时间间隔 Tnotok 后再次调用该服务.最后,我希望行为是可测试的. 因为管理时间序列和可测试性似乎是 Reactive Extensions 的一个强项,所以我开始使用一个由生成的序列提供
..
我得到了 KeyValuePair 的 observable: --(7|45.2)--(3|11.1)--(5|13.2)--(6|36.2)--(3|57.4) 我得到了一个在运行时定义的消费者列表.他们只对为单个键 (myKey) 生成的值感兴趣. 例如: 消费者 7 只对值 45.2 感兴趣. 消费者 3 只对值 11.1 和 57.4 感兴趣 消费者 1,只对 myK
..
如何使用 System.Reactive 实现下一个逻辑? IObservable 当至少有一个订阅者存在时,它会在计时器上产生新项目(整数),并且当新订阅者订阅时,它会为新订阅者重复最后 X 个项目.例子:令 X = 5; SubscriberA 订阅并且 Observable 发出 1, 2, 3, 4, 5, 6, 7 SubscriberB 订阅并获得 3、4、5、6、7(最
..
我不明白需要打开或关闭边界的 Buffer 运算符的重载.我所指的重载是: public static IObservable>缓冲区(这个 IObservable 源,FuncbufferClosingSelector)公共静态 IObservable>缓冲区(这个 IObservable 源,IObservable缓冲区边界)公共静态 IObservable>缓冲区(这个IObservabl
..
示例代码: public static IObservableObserveOrders(这个 IProxy 代理,IEqualityComparer distinctPerDayComparer){return Observable.FromEvent(ev => proxy.OrderUpdated += ev,ev => proxy.OrderUpdated -= ev).Distinc
..
我不理解Finally 方法.在这种情况下它不会触发. [测试方法]public void finallyHappensOnError(){bool finallyActionHappened = false;尝试{可观察的.Throw(new DivideByZeroException()).Finally(() => finallyActionHappened = true).订阅();}抓
..
我是 RX 的新手,开始了解一些概念……我在网上找不到的概念. 我在代码中使用了计时器: Observable.Timer(TimeSpan.FromSeconds(2), schedulerProvider.CurrentThread);//其中 schedulerProvider.CurrentThread 实际上是 Scheduler.CurrentThread 我知道 Timer
..
如我的原始问题所述(参见 将相互依赖的事件流与RX.Net) 我有一个 RX.net 事件流,只要没有触发某个其他事件,它就只会调用观察者的 OnNext 方法(基本上,只要系统连接,就可以处理更改-* 事件,断开连接时暂停并在系统重新连接后重新开始处理 Change-* 事件). 然而,虽然这对新事件很顺利,但我如何取消/向正在进行 .OnNext() 调用发出取消信号? 解决方案
..
我不理解Finally 方法.在这种情况下它不会触发. [测试方法]public void finallyHappensOnError(){bool finallyActionHappened = false;尝试{可观察的.Throw(new DivideByZeroException()).Finally(() => finallyActionHappened = true).订阅();}抓
..
我有以下代码尝试重播序列;但 PostWorkItem 总是为空;我不明白为什么.下面的代码给出了堆栈溢出错误. Task>WorkItems = DbUtil.GetWorkItems(new List() { WorkItemStatus.NEW, WorkItemStatus.PROCESSING });项目 = WorkItems.Result;重放 = Observable.Gene
..
我正在尝试组合两个值共享某个键的 observable. 每当第一个 observable 产生一个新值时,我想产生一个新值,结合第二个 observable 的最新值,哪个选择取决于第一个 observable 的最新值. 伪代码示例: var obs1 = Observable.Interval(TimeSpan.FromSeconds(1)).Select(x => Tuple
..