推在均匀的时间间隔缓冲的事件的一种方式 [英] A way to push buffered events in even intervals

查看:117
本文介绍了推在均匀的时间间隔缓冲的事件的一种方式的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我试图做到的,是缓冲一些的IObservable(他们在突发)进入的事件和进一步释放他们,但一个接一个,甚至在间隔。
这样的:

  -oo噢噢面向对象------------- -----呜-OO-O --------------> 

-o - ○ - ○ - ○ - ○ - ○ - ○--------○ - ○ - ○ - ○ - 邻-o - Ø---->



由于我很新的接收,我不知道如果已经有一个主题,或者只是做这个操作。 ?也许它可以通过组合来完成。



更新:



感谢
理查德·绍洛伊以指出在排水运营商,我发现了另一个<一个HREF =htt​​p://social.msdn.microsoft.com/Forums/en-US/rx/thread/749771d5-544b-4619-adff-6e79b2906152>例如詹姆斯万里排水运营商的使用。以下是我设法得到它在一个WPF应用程序工作:

  .Drain(X => {
过程(X);
返回Observable.Return(新单位())
.Delay(TimeSpan.FromSeconds(1),Scheduler.Dispatcher);
})订阅();



我有一些有趣的,因为省略调度参数使应用程序在调试模式下崩溃没有任何异常显示出来(我需要学习如何在接收处理异常)。
工艺方法直接​​修改UI的状态,但我想这是很简单的作出的IObservable出来的(使用ISubject?)。



更新:



在我一直在尝试与ISubject同时,下面的类做什么,我想要的 - 它可以让出及时缓冲TS

 公共类StepSubject< T> :ISubject< T> 
{
IObserver< T>订户;
问答LT; T>队列=新队列< T>();
MutableDisposable取消=新MutableDisposable();
时间跨度区间;
IScheduler调度;
布尔闲置= TRUE;

公共StepSubject(时间跨度区间,IScheduler调度)
{
this.interval =区间;
this.scheduler =调度;
}

无效步()
{
下一笔;
锁(队列)
{
闲置= queue.Count == 0;
如果
下一= queue.Dequeue()(空闲!);
}

如果
{
cancel.Disposable = scheduler.Schedule(步骤,间隔)(空闲!);
subscriber.OnNext(下);
}
}

公共无效OnNext(T值)
{
锁(队列)
queue.Enqueue(值);

如果(空闲)
cancel.Disposable = scheduler.Schedule(步骤);
}

公共IDisposable的订阅(IObserver< T>观察员)
{
用户=观测;
返回取消;
}
}

这幼稚的做法是从OnCompleted和的OnError剥去清晰也只有单一的订阅允许的。


解决方案

它实际上tricker比它的声音。



使用延迟不起作用,因为该值将散装还是发生了,只是稍微延迟了。



使用间隔与任 CombineLatest 邮编不工作中,因为前者会导致源值被跳过,而后者将缓冲时间间隔值。



我认为新的排水操作符(在1.0.2787.0 添加),结合延迟应该做的伎俩:

  source.Drain(X => Observable.Empty&下; INT方式>()延迟(TimeSpan.FromSeconds(1))StartWith(X))。 



排水操作符如的SelectMany ,但等到前面的输出调用带有下一个值选择前完成。 <击>现在还没有的究竟的你是什么之后(在一个块中的第一个值也将被推迟),但它很接近:的使用上面,现在符合您的大理石图

编辑:显然,排水在框架不喜欢<$ C $工作C>的SelectMany 。我会要求在官方论坛上一些建议。在此期间,这里的排水的实现,做什么你是后:



修改09/11:实施固定的错误和更新。使用以满足您的要求的大理石图

 公共静态类ObservableDrainExtensions 
{
公共静态的IObservable<吹捧> ;漏< TSource,吹捧>(此的IObservable< TSource>源,
Func键< TSource,的IObservable<吹捧>>选择器)
{
返回Observable.Defer(()=>
{
BehaviorSubject<单元>队列=新BehaviorSubject<单元>(新单位());

返回源
.ZIP(队列,(v,q) => 5)
.SelectMany(v =>选择器(ⅴ)
。做(_ => {},()=> queue.OnNext(新单元()))
);
});
}
}


What I'm trying to achieve is to buffer incoming events from some IObservable ( they come in bursts) and release them further, but one by one, in even intervals. Like this:

-oo-ooo-oo------------------oooo-oo-o-------------->

-o--o--o--o--o--o--o--------o--o--o--o--o--o--o---->

Since I'm quite new to Rx, I'm not sure if there already is a Subject or an operator that does just this. Maybe It can be done by composition?

update:

Thanks to Richard Szalay for pointing out the Drain operator, I found another example by James Miles of Drain operator usage. Here's how I managed to get it to work in a WPF app:

    .Drain(x => {
        Process(x);
        return Observable.Return(new Unit())
            .Delay(TimeSpan.FromSeconds(1), Scheduler.Dispatcher );
    }).Subscribe();

I had some fun, because omitting the scheduler parameter causes the app to crash in debug mode without any exception showing up ( I need to learn how to deal with exceptions in Rx). The Process method modifies the UI state directly, but I guess it's quite simple to make an IObservable out of it (using a ISubject?).

update:

In the meantime I've been experimenting with ISubject, the class below does what I wanted - it lets out buffered Ts in a timely manner:

public class StepSubject<T> : ISubject<T>
{
    IObserver<T> subscriber;
    Queue<T> queue = new Queue<T>();
    MutableDisposable cancel = new MutableDisposable();
    TimeSpan interval;
    IScheduler scheduler;
    bool idle = true;

    public StepSubject(TimeSpan interval, IScheduler scheduler)
    {
        this.interval = interval;
        this.scheduler = scheduler;
    }

    void Step()
    {
        T next;
        lock (queue)
        {
            idle = queue.Count == 0;
            if (!idle)
                next = queue.Dequeue();
        }

        if (!idle)
        {
            cancel.Disposable = scheduler.Schedule(Step, interval);
            subscriber.OnNext(next);
        }
    }

    public void OnNext(T value)
    {
        lock (queue)
            queue.Enqueue(value);

        if (idle)
            cancel.Disposable = scheduler.Schedule(Step);
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        subscriber = observer;
        return cancel;
    }
}

This naive implementation is stripped from OnCompleted and OnError for clarity, also only single subscription allowed.

解决方案

It's actually tricker than it sounds.

Using Delay doesn't work because the values will still happen in bulk, only slightly delayed.

Using Interval with either CombineLatest or Zip doesn't work, since the former will cause source values to be skipped and the latter will buffer interval values.

I think the new Drain operator (added in 1.0.2787.0), combined with Delay should do the trick:

source.Drain(x => Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1)).StartWith(x));

The Drain operator works like SelectMany, but waits until the previous output completes before calling the selector with the next value. It's still not exactly what you are after (the first value in a block will also be delayed), but it's close: The usage above matches your marble diagram now.

Edit: Apparently the Drain in the framework doesn't work like SelectMany. I'll ask for some advice in the official forums. In the meantime, here's an implementation of Drain that does what you're after:

Edit 09/11: Fixed errors in implementation and updated usage to match your requested marble diagram.

public static class ObservableDrainExtensions
{
    public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source, 
        Func<TSource, IObservable<TOut>> selector)
    {
        return Observable.Defer(() =>
        {
            BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());

            return source
                .Zip(queue, (v, q) => v)
                .SelectMany(v => selector(v)
                    .Do(_ => { }, () => queue.OnNext(new Unit()))
                );
        });
    }
}

这篇关于推在均匀的时间间隔缓冲的事件的一种方式的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆