缓冲区在处理项目 [英] buffer while processing items

查看:145
本文介绍了缓冲区在处理项目的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有定期触发一个事件。让我们假设处理事件,大约需要1秒。与其等待1秒对每个接收到的事件我希望积累事件
,直到最后的处理完成。当处理完成我想要PROCES我最后的处理过程中接收到的事件数据:

  E1 E2 E3 E4 E5 E6 E7事件发生
--------------------------------------------- -------------------------------------------------- -------------------------------------------------- - >时间
1S 2S,3S 4S 5S 6S
P(E1)P(E2,E3)P(E4)P(E5,E6)P(E7)
[------- ----------------] [-----------------------] [------- ----------------] [-----------------------] [------- ----------------]项的处理

在上面的例子中,处理一旦发生E1开始。虽然处理需要的地方2个事件已经到达。它们应该被如此存储当p(E1) - 这意味着el的加工 -
为完成了加工的事件的E2和E3进行。

这PROCES类似于滚动构建:一个修改检查中,该buildserver开始建设,一旦构建完成已经
所有的变更在生成过程中检查了后会有处理。



我应该怎么做,使用接收?



我已经使用了缓冲带开口合并和关闭选择尝试,但我无法得到它的权利。任何实例或方向感激!



让我们假设一个主题作为输入流。



我已经试过的东西像这样的,但我完全失去了。

  VAR observer1 =输入
.Buffer(bc.Where(开放=> ;打开),_ => bc.Where(开放=>!打开))
.Subscribe(EV =>
{
bc.OnNext(真);
的String.Format(处理项目{0}的string.join(,,ev.Select(E =方式> e.ToString()))转储());
的Thread.Sleep (300);
bc.OnNext(假);
});


解决方案

这是不繁琐。幸运的是@DaveSexton已经做好了所有的辛勤工作。你想 BufferIntrospective 从RXX库。 。这里退房源



之所以这样是很难,因为 IObserver< T> 没有内置的手段信号背压 - 比OnXXX调用的阻塞的精妙之处等。可观察的需要,要注意观察,你需要引入并发来管理缓冲。



另外请注意,如果您有多个用户,他们将获得不同的数据因为他们所接收的取决于源事件发生率和消费率都。



另一种方法是只需添加所有事件以一个线程安全的队列在OnNext处理程序,并有一个独立的任务清空队列在循环中。 BufferIntrospective 可能是清洁,但。



有一个小游戏,并且这个玩具实施似乎工作。但是RXX将更加强劲,所以这只是教学真正显示什么东西排序参与。 ,关键是通过调度引进并发

 公共静态的IObservable< IList的< TSource>> BufferIntrospective< TSource>(
本的IObservable< TSource>源,
IScheduler调度= NULL)
{
调度=调度? Scheduler.Default;
返回Observable.Create< IList的< TSource>>(O => {
受试对象;单元>反馈=新的受试对象;单元>();
VAR sourcePub = source.Publish( ).RefCount();
无功分= sourcePub.Buffer(
()=>反馈).ObserveOn(调度).Subscribe(@event =>
{
o.OnNext(@event);
feedback.OnNext(Unit.Default);
},
o.OnError,
o.OnCompleted);
变种开始= sourcePub.Take(1).Subscribe(_ => feedback.OnNext(Unit.Default));
返回新CompositeDisposable(分,开始);
});
}

此示例代码显示了使用和两个不同节奏的用户如何获得不同的缓冲事件5一个接收批次,10批次其他



我使用的 LINQPad 导出来显示每个缓冲区轻松的内容。

 < 。code> VAR XS = Observable.Interval(TimeSpan.FromSeconds(0.2))以(30); 

VAR缓冲= xs.BufferIntrospective();

buffered.Subscribe(X => {
x.Dump();
Task.Delay(TimeSpan.FromSeconds(1))等待();
});

buffered.Subscribe(X => {
x.Dump();
Task.Delay(TimeSpan.FromSeconds(2))等待();
});


I have an event that fires regularly. Let's assume that processing the event takes ~1s. Instead of waiting 1s for each received event I want to accumulate events until the last processing is done. When processing is done I want to proces the event data I received during the last processing:

e1   e2   e3                                                            e4   e5   e6                 e7                                              events happening   
---------------------------------------------------------------------------------------------------------------------------------------------------> time
                         1s                      2s                     3s                       4s                       5s                      6s
p(e1)                    p(e2, e3)                                      p(e4)                    p(e5, e6)                p(e7)
[-----------------------][-----------------------]                      [-----------------------][-----------------------][-----------------------]  processing of items                        

In above example, processing start as soon as e1 happens. While the processing takes places 2 more events have arrived. They should be stored so when p(e1) - which means the processing of e1 - 
is finished the processing of the events e2 and e3 takes place. 

This proces is similar to a rolling build: a changeset is checked in, the buildserver starts building and once the build is finished all changesets that have been 
checked in during the build will then be processed.

How should I do that using Rx?

I have tried using the Buffer combined with an opening and closing selector but I can't get it right. Any examples or direction are appreciated!

Let's assume a Subject as the input stream.

I have tried something like this but I am totally lost.

var observer1 = input
.Buffer(bc.Where(open => open), _ => bc.Where(open => !open))
.Subscribe(ev =>
{
    bc.OnNext(true);
    String.Format("Processing items {0}.", string.Join(", ", ev.Select(e => e.ToString())).Dump());
    Thread.Sleep(300);
    bc.OnNext(false);
});

解决方案

This is non-trival. Fortunately @DaveSexton has already done all the hard work. You want BufferIntrospective from the Rxx library. Check out the source here.

The reason why this is hard is because IObserver<T> doesn't have built-in means to signal back-pressure - other than the subtlety of the blocking of OnXXX invocations. The Observable needs to pay attention to the Observer, and you need to introduce concurrency to manage the buffering.

Also note that if you have multiple subscribers, they will get different data as what they receive depends on both the source event rate and their consumption rate.

Another approach is to just add all the events to a thread-safe queue in your OnNext handler, and have a separate task that empties the queue in a loop. BufferIntrospective is probably cleaner though.

Had a little play, and this toy implementation seems to work. But Rxx will be more robust, so this is just pedagogical really to show what sort of thing is involved. The key is the introduction of concurrency via the scheduler.

public static IObservable<IList<TSource>> BufferIntrospective<TSource>(
    this IObservable<TSource> source,
    IScheduler scheduler = null)
{
    scheduler = scheduler ?? Scheduler.Default;
    return Observable.Create<IList<TSource>>(o => {
        Subject<Unit> feedback = new Subject<Unit>();
        var sourcePub = source.Publish().RefCount();
        var sub = sourcePub.Buffer(
            () => feedback).ObserveOn(scheduler).Subscribe(@event =>
            {                
                o.OnNext(@event);
                feedback.OnNext(Unit.Default);
            },
            o.OnError,
            o.OnCompleted);
        var start = sourcePub.Take(1).Subscribe(_ => feedback.OnNext(Unit.Default));
        return new CompositeDisposable(sub, start);
    });        
}

This sample code shows the usage and how two differently paced subscribers get different buffering of events, one receiving batches of 5, the other batches of 10.

I am using LINQPad's Dump to show the contents of each buffer easily.

var xs = Observable.Interval(TimeSpan.FromSeconds(0.2)).Take(30);

var buffered = xs.BufferIntrospective();

buffered.Subscribe(x => {
    x.Dump();
    Task.Delay(TimeSpan.FromSeconds(1)).Wait();
});

buffered.Subscribe(x => {
    x.Dump();
    Task.Delay(TimeSpan.FromSeconds(2)).Wait();
});

这篇关于缓冲区在处理项目的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆