RX - 组/批可观察序列的元素连发 [英] RX - Group/Batch bursts of elements in an observable sequence
问题描述
我有一个观察的序列。当插入的第一要素,我想定时器的时间跨度期间启动一个定时器和批量插入以后的元素。然后,计时器会不会再次,直到另一个元素插入的顺序开始
I have an observable sequence. When the first element is inserted, I would like to start a timer and batch subsequent inserted elements during the timespan of the timer. Then, the timer wouldn't start again until another element is inserted in the sequence.
因此,像这样:
--------|=====timespan====|---------------|=====timespan====|-------------->
1 2 3 4 5 6 7 8
会产生:
would produce:
[1,2,3,4,5], [6,7,8]
我试着用Observable.Buffer()和时间跨度但是从我的实验,我可以看到计时器只要我们订阅可观察序列的开始,是只要先前的定时器完成重新启动。
I tried with Observable.Buffer() and a timespan but from my experimentation, I can see that the timer is started as soon as we subscribe to the observable sequence and is restarted as soon as the previous timer is completed.
所以具有相同的序列与前面的例子,并使用缓冲液()与入库时间,我会像这样的:
So having the same sequence as the previous example and using the Buffer() with a timespan, I would have something like this:
|=====timespan====|=====timespan====|=====timespan====|=====timespan====|-->
1 2 3 4 5 6 7 8
这将产生这样的:
which would produce this:
[1,2,3,4], [5], [6,7], [8]
下面是我测试过的缓冲这种行为:
Here is how I tested this behavior with the Buffer:
var source = Observable.Concat(Observable.Timer(TimeSpan.FromSeconds(6)).Select(o => 1),
Observable.Timer(TimeSpan.FromSeconds(1)).Select(o => 2),
Observable.Timer(TimeSpan.FromSeconds(3)).Select(o => 3),
Observable.Never<int>());
Console.WriteLine("{0} => Started", DateTime.Now);
source.Buffer(TimeSpan.FromSeconds(4))
.Subscribe(i => Console.WriteLine("{0} => [{1}]", DateTime.Now, string.Join(",", i)));
通过输出:
4/24/2015 7:01:09 PM => Started
4/24/2015 7:01:13 PM => []
4/24/2015 7:01:17 PM => [1,2]
4/24/2015 7:01:21 PM => [3]
4/24/2015 7:01:25 PM => []
4/24/2015 7:01:29 PM => []
4/24/2015 7:01:33 PM => []
任何人有关于如何做到这一点的想法? !在此先感谢
Anyone has an idea on how to do this? Thanks in advance!
推荐答案
给这个一展身手:
var source = Observable.Concat(Observable.Timer(TimeSpan.FromSeconds(6)).Select(o => 1),
Observable.Timer(TimeSpan.FromSeconds(1)).Select(o => 2),
Observable.Timer(TimeSpan.FromSeconds(4)).Select(o => 3),
Observable.Never<int>());
Console.WriteLine("{0} => Started", DateTime.Now);
source
.GroupByUntil(x => 1, g => Observable.Timer(TimeSpan.FromSeconds(4)))
.Select(x => x.ToArray())
.Switch()
.Subscribe(i => Console.WriteLine("{0} => [{1}]", DateTime.Now, string.Join(",", i)));
我不得不改变你的测试代码的持续时间为第三计时器以确保价值的外分组定时器。
I had to change your test code duration for the third timer to make sure the value was outside of the grouped timer.
这篇关于RX - 组/批可观察序列的元素连发的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!