RX:如何处理序列中的 n 个缓冲项,然后在处理下 n 个项之前等待 t 秒? [英] RX: How to process n buffered items from a sequence then wait t seconds before processing the next n items?
本文介绍了RX:如何处理序列中的 n 个缓冲项,然后在处理下 n 个项之前等待 t 秒?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我想弄清楚如何处理序列中的 n 个缓冲项,然后等待 t 秒再处理下 n 个项?
I'm trying to figure out how to process n buffered items from a sequence then wait t seconds before processing the next n items?
这是我尝试使用 Thread.Sleep() 的粗略形式.我想避免 Thread.Sleep() 并正确执行.
Here's a crude form of what I'm trying to do, using Thread.Sleep(). I want to avoid Thread.Sleep() and do it properly.
static void Main(string[] args)
{
var t = Observable.Range(0, 100000);
var query = t.Buffer(20);
query.ObserveOn(NewThreadScheduler.Default)
.Subscribe(x => DoStuff(x));
Console.WriteLine("Press ENTER to exit");
Console.ReadLine();
}
static void DoStuff(IList<int> list)
{
Console.WriteLine(DateTime.Now);
foreach (var value in list)
{
Console.WriteLine(value);
}
Thread.Sleep(TimeSpan.FromSeconds(10));
}
有人能帮我找到一种更 RX 的方式吗?
Can any one help me find a more RX way of doing this?
谢谢
闪光
推荐答案
// Instantiate this once, we'll use it in a closure multiple times.
var delay = Observable.Empty<int>().Delay(TimeSpan.FromMilliseconds(10));
// start with a source of individual items to be worked.
Observable.Range(0, 100000)
// Create batches of work.
.Buffer(20)
// Select an observable for the batch of work, and concat a delay.
.Select(batch => batch.ToObservable().Concat(delay))
// Concat those together and form a "process, delay, repeat" observable.
.Concat()
// Subscribe!
.Subscribe(Console.WriteLine);
// Make sure we wait for our work to be done.
// There are other ways to sync up, like async / await.
Console.ReadLine();
或者,您也可以使用 async/await 进行同步:
Alternatively, you could also sync up using async/await:
static IObservable<int> delay = Observable.Empty<int>().Delay(TimeSpan.FromMilliseconds(100));
static async Task Run()
{
await Observable.Range(0, 1000)
.Buffer(20)
.Select(batch => batch.ToObservable().Concat(delay))
.Concat()
.Do(Console.WriteLine)
.LastOrDefaultAsync();
}
delay
难道不是一个可以观察到的绝妙技巧吗?之所以有效,是因为 OnCompleted 和 OnNext 一样被延迟了!
Isn't that delay
observable a nifty trick? It works because OnCompleted is delayed just like OnNext!
这篇关于RX:如何处理序列中的 n 个缓冲项,然后在处理下 n 个项之前等待 t 秒?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文