随着RX,我怎么忽略所有 - 除了最最新值时,我的订阅方式运行 [英] With Rx, how do I ignore all-except-the-latest value when my Subscribe method is running

查看:100
本文介绍了随着RX,我怎么忽略所有 - 除了最最新值时,我的订阅方式运行的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

使用无扩展的,我想忽略从我的事件陆续发生,而消息我的订阅方法正在运行。即有时需要更长的我来处理比消息之间的时间的消息,所以我要删除的邮件,我没有时间来处理。

Using Reactive Extensions, I want to ignore messages coming from my event stream that occur while my Subscribe method is running. I.e. it sometimes takes me longer to process a message than the time between message, so I want to drop the messages I don't have time to process.

不过,我的订阅方法完成时,如果任何消息真的来过我要处理的最后一个。所以我总是处理最新的消息。

However, when my Subscribe method completes, if any messages did come through I want to process the last one. So I always process the most recent message.

所以,如果我有一些code这确实

So, if I have some code which does:

messages.OnNext(100);
messages.OnNext(1);
messages.OnNext(2);

和如果我们假设该'100'需要较长的时间来处理。然后我想在'100'完成时要处理的2。 1应该被忽略,因为它被取代了'2',而'100'是仍在处理中。

and if we assume the '100' takes a long time to process. Then I want the '2' to be processed when the '100' completes. The '1' should be ignored because it was superseded by the '2' while the '100' was still being processed.

下面是我想用一个后台任务和结果的一个例子最新()

Here's an example of the result I want using a background task and Latest()

var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));

Task.Factory.StartNew(() =>
{
    foreach(var n in messages.Latest())
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    }
});

不过,最新的()是一个阻塞调用,我会preFER不要有一个线程等待坐着像这样下一个值(有有时会信息之间很长的差距)。

However, Latest() is a blocking call and I'd prefer not to have a thread sitting waiting for the next value like this (there will sometimes be very long gaps between messages).

我也能得到我想要通过使用 BroadcastBlock 来自的 TPL数据流,的是这样的:

I can also get the result I want by using a BroadcastBlock from TPL Dataflow, like this:

var buffer = new BroadcastBlock<long>(n => n);
Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(n => buffer.Post(n));

buffer.AsObservable()
    .Subscribe(n =>
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    });

但这种感觉像它应该直接在接收成为可能。什么是去这样做的最佳方式?

but this feels like it should be possible directly in Rx. What's the best way to go about doing it?

推荐答案

感谢李·坎贝尔(介绍到接收名声),我现在使用这个扩展方法有一个工作的解决方案:

Thanks to Lee Campbell (of Intro To Rx fame), I now have a working solution using this extension method:

public static IObservable<T> ObserveLatestOn<T>(this IObservable<T> source, IScheduler scheduler)
{
    return Observable.Create<T>(observer =>
    {
        Notification<T> outsideNotification = null;
        var gate = new object();
        bool active = false;
        var cancelable = new MultipleAssignmentDisposable();
        var disposable = source.Materialize().Subscribe(thisNotification =>
        {
            bool alreadyActive;
            lock (gate)
            {
                alreadyActive = active;
                active = true;
                outsideNotification = thisNotification;
            }

            if (!alreadyActive)
            {
                cancelable.Disposable = scheduler.Schedule(self =>
                {
                    Notification<T> localNotification = null;
                    lock (gate)
                    {
                        localNotification = outsideNotification;
                        outsideNotification = null;
                    }
                    localNotification.Accept(observer);
                    bool hasPendingNotification = false;
                    lock (gate)
                    {
                        hasPendingNotification = active = (outsideNotification != null);
                    }
                    if (hasPendingNotification)
                    {
                        self();
                    }
                });
            }
        });
        return new CompositeDisposable(disposable, cancelable);
    });
}

这篇关于随着RX,我怎么忽略所有 - 除了最最新值时,我的订阅方式运行的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆