限制事件和锁定方法 [英] Throttling Events and Locking Methods

查看:27
本文介绍了限制事件和锁定方法的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

让我们假设我有这样的事情:

Let's pretend I have something like this:

<TextBox Text="{Binding Text, Mode=TwoWay}" />

像这样:

public class MyViewModel : INotifyPropertyChanged
{
    public MyViewModel()
    {
        // run DoWork() when this.Text changes
        Observable.FromEventPattern<PropertyChangedEventArgs>(this, "PropertyChanged")
            .Where(x => x.EventArgs.PropertyName.Equals("Text"))
            .Subscribe(async x => await DoWork());
    }

    private async Task DoWork()
    {
        await Task.Delay(this.Text.Length * 100);
    }

    public event PropertyChangedEventHandler PropertyChanged;

    private string _Text = "Hello World";
    public string Text
    {
        get { return _Text; }
        set
        {
            _Text = value;
            if (this.PropertyChanged != null)
                this.PropertyChanged(this, new PropertyChangedEventArgs("Text"));
        }
    }
}

在这种情况下,用户可能会非常快速地打字.我需要:

In this scenario, the user could be typing very quickly. I need:

  1. DoWork() 不能在 DoWork() 已经运行时运行

  1. DoWork() must not run while DoWork() is already running

用户可能会输入突发、一些变化、暂停、一些变化

The user may type in spurts, some changes, pause, some changes

DoWork() 不需要每次更改,只需要最后一次更改

DoWork() is NOT required for every change, only the last change

DoWork() 的调用频率不需要超过 1 秒

There is no need for DoWork() to be called more frequently than 1 second

DoWork() 不能等到最后一次更改,如果突增 > 1 秒

DoWork() cannot wait until the last change, if the spurt is > 1 second

DoWork() 不应在系统空闲时调用

DoWork() should not be called while the system is idle

DoWork() 的持续时间因 this.Text 的长度而异

The duration of DoWork() varies based on the length of this.Text

问题不在于 Rx 是否可以做到这一点.我知道可以.正确的语法是什么?

The question isn't if Rx can do this. I know it can. What's the proper syntax?

推荐答案

作为纯 RX 解决方案,您可能会感到惊讶.它与提交限制搜索以响应文本框更改的类似(以及典型的 Rx 101 示例)略有不同 - 在这种情况下,可以触发并发搜索,取消除最新搜索之外的所有搜索.

You may be surprised how hard this is as a pure RX solution. It's subtly different to the similar (and typical Rx 101 example) of submitting a throttled search in response to textbox changes - in that case, it's ok to fire off concurrent searches, cancelling all but the latest one.

在这种情况下,一旦 DoWork() 关闭并运行,它就不能被替换或中断.

In this case, once DoWork() is off and running it can't be replaced or interrupted.

问题是 Rx 流向一个方向流动并且不能向后交谈" - 因此事件排队等待缓慢的消费者.在 Rx 中,由于消费者缓慢而丢弃事件非常困难.

The problem is that Rx streams flow in one direction and can't "talk backwards" - so events queue up against slow consumers. To drop events due to slow consumers is quite hard in Rx.

在一个新的(可能受到限制的)事件到来时可以取消和替换 DoWork() 的世界里,这要容易得多.

It's much easier in a world where DoWork() can be cancelled and replaced when a new (probably throttled) event arrives.

首先,我提出了一个纯 Rx 解决方案.最后,一个更简单的方法是通过 Rx 之外的调度机制来处理慢速消费者.

First I present a pure Rx solution. Then at the end, a simpler approach where the slow consumer is dealt with by a dispatching mechanism outside of Rx.

对于纯方法,您将需要此辅助扩展方法来删除针对缓慢消费者排队的事件 你可以在这里阅读:

For the pure approach, you'll need this helper extension method to drop events queued against a slow consumer which you can read about here:

public static IObservable<T> ObserveLatestOn<T>(
    this IObservable<T> source, IScheduler scheduler)
{
    return Observable.Create<T>(observer =>
    {
        Notification<T> outsideNotification = null;
        var gate = new object();
        bool active = false;

        var cancelable = new MultipleAssignmentDisposable();
        var disposable = source.Materialize().Subscribe(thisNotification =>
        {
            bool wasNotAlreadyActive;
            lock (gate)
            {
                wasNotAlreadyActive = !active;
                active = true;
                outsideNotification = thisNotification;
            }

            if (wasNotAlreadyActive)
            {
                cancelable.Disposable = scheduler.Schedule(self =>
                {
                    Notification<T> localNotification = null;
                    lock (gate)
                    {
                        localNotification = outsideNotification;
                        outsideNotification = null;
                    }
                    localNotification.Accept(observer);
                    bool hasPendingNotification = false;
                    lock (gate)
                    {
                        hasPendingNotification = active = (outsideNotification != null);
                    }
                    if (hasPendingNotification)
                    {
                        self();
                    }
                });
            }
        });
        return new CompositeDisposable(disposable, cancelable);
    });
}

有了这个,您可以执行以下操作:

With this available you can then do something like:

// run DoWork() when this.Text changes
Observable.FromEventPattern<PropertyChangedEventArgs>(this, "PropertyChanged")
          .Where(x => x.EventArgs.PropertyName.Equals("Text"))
          .Sample(TimeSpan.FromSeconds(1)) // get the latest event in each second
          .ObservableLatestOn(Scheduler.Default) // drop all but the latest event
          .Subscribe(x => DoWork().Wait()); // block to avoid overlap

备注

老实说,您最好避免在这里使用纯 Rx 解决方案,而不要直接从订阅者那里调用 DoWork().我会用一个从 Subscribe 方法调用的中间调度机制来包装它,如果它已经在运行,它会处理不调用它 - 代码维护起来会更简单.

Remarks

To be honest, you are probably better off avoiding the pure Rx solution here, and instead DON'T call DoWork() directly from a subscriber. I would wrap it with an intermediate dispatching mechanism called from the Subscribe method that handles not calling it if it's already running - the code would be way simpler to maintain.

在思考了几天之后,我没有比这里的其他一些答案做得更好 - 我会留下上面的兴趣,但我认为我最喜欢 Filip Skakun 方法.

After thinking on this for a few days, I didn't do any better than some of the other answers here - I'll leave the above for interest, but I think I like Filip Skakun approach the best.

这篇关于限制事件和锁定方法的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆