rx.net相关内容
我注意到RxMerge运算符接受一个可选的maxConcurrent参数。这可用于通过并发订阅有限数量的子序列来限制最大并发性。当推送新的子序列的速度慢于订阅的子序列的完成速度时,它工作得很好,但当推送新的子序列的速度快于此速度时,它就会变得有问题。发生的情况是,子序列被缓冲在大小不断增加的内部缓冲区中,并且当前订阅的子序列也变得越来越陈旧。以下是此问题的演示: await Observab
..
我有一个IObservable序列,它在前9次订阅时发出单个项目,在进一步订阅时不发出任何内容并立即完成: int counter = 0; IObservable source = Observable.Defer(() => { if (++counter
..
最近我意识到RxFinally运算符的行为方式至少对我来说是意想不到的。我的预期是,finallyAction抛出的任何错误都将传播到操作员的观察者下游。遗憾的是,情况并非如此。实际上,操作符首先将Antecedent序列的完成(或失败)传播给它的观察者,然后然后在无法传播该操作抛出的潜在错误的时间点调用action。因此,它在ThreadPool上抛出错误,并使进程崩溃。这不仅出乎意料,而且非常
..
我正在尝试创建一个rx.net运算符,该运算符接受Observable和: 如果第一个元素"a" ,则转发每个元素时保持不变 仅发出完成信号,否则 例如: -a-b-c-d-|- --> -a-b-c-d-|- -b-c-d-|- --> -|- 如何执行此操作? 推荐答案 以下版本完全没有竞争条件: public static I
..
在各种情况下,我都希望RxReplay操作符能够缓冲传入的通知,在第一次订阅时同步重放其缓冲区,并在第一次订阅之后停止缓冲。这个轻量级Replay运算符应该只能为一个订阅者提供服务。可以找到这样一个操作符的一个用例here,在第一次订阅之后继续缓冲只是浪费资源。出于演示目的,我将在这里展示一个我希望可以避免的有问题的行为的人为示例: var observable = Observable
..
是否可以对实现IAsyncDisposable而不是IDisposable的资源使用rx.net中的Using操作符?如果没有,是否有我可以使用的解决方法? 推荐答案 这里有一个Using方法,可以处理IAsyncDisposable对象: /// /// Constructs an observable sequence that depends on a
..
我希望合并异步开始和结束的流(可观察对象): -1----1----1----1---|->-2----2---|->[ optional_zip(sum) ]-1----3----3----1---|-> 我需要它做什么:将音频流添加在一起.它们是音频“块"流,但我将在这里用整数表示它们.所以第一个剪辑正在播放: -1----1----1----1---|-> 然后第二个开始,稍后:
..
我想在 Rx 订阅中回调异步函数. 例如像这样: 公共类消费者{私有只读服务_服务=新服务();public ReplaySubject结果 = 新的 ReplaySubject();公共无效触发器(){Observable.Timer(TimeSpan.FromMilliseconds(100)).Subscribe(async _ => await RunAsync());}公共任务
..
最近我遇到了一种情况,将异步操作同时表示为 Task 和 IObservable 将是有利的.任务表示维护操作的状态(IsCompleted、IsFaulted 等),而可观察的表示允许以有趣的方式组合多个操作(Concatcode>、Merge、Switch 等),自动处理取消任何沿途已取消订阅的操作,解决这种方式的fire-and-forgotten asynchronous问题操作.所以我对
..
我有一个可观察的元素序列,这些元素具有 char Key 属性,其值范围从 'A' 到 'E'.我想根据这个键对这些元素进行分组.对它们进行分组后,我希望通过一组可观察到的结果来显示结果,以便我可以分别处理每个组.我的问题是我找不到一种很好的方法来保存最终可观察对象中每个组的键.这是我正在尝试做的一个例子: var observable = Observable.Interval(TimeSpa
..
内置Merge的行为两个源都完成后,操作符才完成.我正在寻找这个运算符的一个变体,它产生一个在两个源 observable 中的 any 完成时完成的 observable.例如,如果第一个 observable 成功完成,稍后第二个 observable 完成并出现异常,我希望忽略此异常. 我想出了一个实现,将一个特殊的哨兵异常连接到两个可枚举项,然后合并的序列捕获并抑制这个异常.我想知道
..
如何使用 System.Reactive 实现下一个逻辑? IObservable 当至少有一个订阅者存在时,它会在计时器上产生新项目(整数),并且当新订阅者订阅时,它会为新订阅者重复最后 X 个项目.例子:令 X = 5; SubscriberA 订阅并且 Observable 发出 1, 2, 3, 4, 5, 6, 7 SubscriberB 订阅并获得 3、4、5、6、7(最
..
我注意到 Rx Merge 运算符接受一个可选的 maxConcurrent 参数.这可用于通过同时订阅有限数量的子序列来限制最大并发.当新子序列的推送速度低于订阅子序列的完成速度时,它可以完美地工作,但是当新子序列的推送速度比这个速度快时,它就会出现问题.发生的情况是子序列被缓冲在一个内部缓冲区中,其大小永远增加,而且当前订阅的子序列变得越来越老.下面是这个问题的演示: await Obser
..
您是否希望下面的程序打印 False? 使用系统;使用 System.Threading;使用 System.Reactive.Linq;使用 System.Reactive.Concurrency;公共静态类程序{public static void Main(){可观察的.返回(1).ObserveOn(ThreadPoolScheduler.Instance).Do(x => Consol
..
我正在尝试枚举一个大的IEnumerable 一次,观察附有各种运算符的枚举(Count、Sum、Average 等).显而易见的方法是使用 ToObservable,然后订阅一个观察者.我注意到这比其他方法慢得多,比如做一个简单的循环并在每次迭代时通知观察者,或者使用 Observable.Create 方法而不是 ToObservable.差异很大:它慢了 20-30 倍.就是这样,还是我做错
..
最近我偶然发现了一个 有趣的声明 Enigmativity 关于发布 和 RefCount 运算符: 您正在使用危险的 .Publish().RefCount() 运算符对,它创建了一个在完成后无法订阅的序列. 此声明似乎反对 Lee Campbell 对这些运营商的评估.引用他的书 Rx 介绍: Publish/RefCount 对对于获取冷 observable 并将其作为热
..
给定一个 IObservable 有没有一种方法可以使用 Throttle 行为(在添加项目时重置计时器,但让它返回所有在那段时间内添加的项目? Buffer 提供了类似的功能,它在每个时间跨度或计数上将数据分块到 IList 中.但是每次添加项目时我都需要时间来重置. 我在这里看到了一个类似的问题,反应式扩展是否支持滚动缓冲区?,但答案似乎并不理想,而且它有点旧,所以我想知道 Rx-
..
给定一个可观察的形式: var fetchTimer = Observable.Timer(TimeSpan.FromSeconds(1));var stateFetcher =Observable.FromAsync(async () => await _robotsClient.GetRobotsStateAsync(new GetRobotsStateRequest()));var del
..
在 Rx.NET 中,是否可以在按 key 分组的热门 observable 中采样最新项目? 例如,如果我有一个 IObservable,其中 Price 是: 价格- 钥匙- 出价- 提供 让我们假设 IObservable 链接到外部价格馈送. 我是否能够检索所有最新的 Price,按 Key 分组,使用 Rx 每 1 秒采样一次? 解决方案 假设有一些可观察的 so
..
我有以下代码 string dataDirectory = _settingsProvider.DataSettings.BaseDirectory;_solverManagementService.MergedPointCloudProducer(数据目录,取消令牌).Subscribe(PointCloudMergerCompleted); 其中 SolverManagementServi
..