接收:我怎样才能立刻响应,油门后续请求 [英] Rx: How can I respond immediately, and throttle subsequent requests

查看:118
本文介绍了接收:我怎样才能立刻响应,油门后续请求的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想建立一个接收订阅,可以对事件作出响应向右走,然后忽略指定的冷却期内发生的后续事件。

的开箱即用油门/缓冲方法只有一次的超时时间响应,这是不太我所需要的。

下面是一些code,设置了场景,并使用油门(这不是我想要的解决方案):

 类节目
{
    静态秒表SW =新的秒表();    静态无效的主要(字串[] args)
    {
        无功受=新的受试对象;诠释>();
        VAR超时= TimeSpan.FromMilliseconds(500);        学科
            .Throttle(超时)
            .Subscribe(DoStuff);        VAR厂=新TaskFactory();        sw.Start();        factory.StartNew(()=>
        {
            Console.WriteLine(批1(无延迟));
            subject.OnNext(1);
        });        factory.StartNewDelayed(1000,()=>
        {
            Console.WriteLine(批2(1秒延迟));
            subject.OnNext(2);
        });        factory.StartNewDelayed(1300,()=>
        {
            Console.WriteLine(批3(1.3s延迟));
            subject.OnNext(3);
        });        factory.StartNewDelayed(1600,()=>
        {
            Console.WriteLine(批4(1.6秒延迟));
            subject.OnNext(4);
        });        Console.ReadKey();
        sw.Stop();
    }    私有静态无效DoStuff(int i)以
    {
        Console.WriteLine(处理{0} {1}毫秒,我,sw.ElapsedMilliseconds);
    }
}

现在运行这个权利的输出是:


  

1批(无延时)


  
  

在508ms处理1


  
  

2批(1秒延时)


  
  

3批(1.3s延迟)


  
  

第4批(1.6秒延时)


  
  

在2114ms处理4


需要注意的是第2批不处理(这是很好的!),因为我们等待500ms的请求之间间隔由于油门的性质。第3批也没有处理,(这是少没关系,因为它发生500ms以上,从第2批),由于它接近4批。

我正在寻找的是更多的东西是这样的:


  

1批(无延时)


  
  

在〜0毫秒处理1


  
  

2批(1秒延时)


  
  

在1000〜处理2


  
  

3批(1.3s延迟)


  
  

第4批(1.6秒延时)


  
  

在1600年代〜4处理


请注意那批3将不会在这种情况下进行处理(这是很好的!),因为它在第2批500毫秒发生。

修改

下面是我用StartNewDelayed扩展方法的实现:

  ///<总结>创建一个任务在指定延迟后,将完成< /总结>
///< PARAM NAME =工厂>该TaskFactory< /参数>
///< PARAM NAME =millisecondsDelay方式>在此之后,任务应该过渡到RanToCompletion延迟< /参数>
///<退货和GT;将在指定的时间后完成的任务和LT; /回报>
公共静态任务StartNewDelayed(
    这TaskFactory工厂,诠释millisecondsDelay)
{
    返回StartNewDelayed(工厂,millisecondsDelay,CancellationToken.None);
}///<总结>创建一个任务在指定延迟后,将完成< /总结>
///< PARAM NAME =工厂>该TaskFactory< /参数>
///< PARAM NAME =millisecondsDelay方式>在此之后,任务应该过渡到RanToCompletion延迟< /参数>
///&所述; PARAM NAME =的CancellationToken方式>可以被用于取消定时任务取消令牌下; /参数>
///<收益方式>将在指定的时间后完成,这就是取消与指定令牌的任务< /回报>
公共静态任务StartNewDelayed(这TaskFactory工厂,诠释millisecondsDelay,的CancellationToken的CancellationToken)
{
    //验证参数
    如果(出厂== NULL)抛出新的ArgumentNullException(工厂);
    如果(millisecondsDelay℃下)抛出新ArgumentOutOfRangeException(millisecondsDelay);    //创建定时任务
    VAR TCS =新TaskCompletionSource<对象>(factory.CreationOptions);
    VAR CTR =默认(CancellationTokenRegistration);    //创建计时器但不启动它。如果我们现在开始吧,
    //前CTR已被设置为正确的注册就可能被激活。
    VAR定时器=新定时器(自拍=>
    {
        //清理都取消标记,定时器,并尝试过渡到完成
        ctr.Dispose();
        ((定时器)个体经营).Dispose();
        tcs.TrySetResult(NULL);
    });    //使用取消标记注册。
    如果(cancellationToken.CanBeCanceled)
    {
        //当取消发生,取消计时器并尝试过渡到取消。
        //有可能是一场比赛,但它是良性的。
        CTR = cancellationToken.Register(()=>
        {
            timer.Dispose();
            tcs.TrySetCanceled();
        });
    }    如果(millisecondsDelay大于0)
    {
        //启动计时器,并交还任务...
        timer.Change(millisecondsDelay,Timeout.Infinite);
    }
    其他
    {
        //只是完成任务,并保持执行当前线程上。
        ctr.Dispose();
        tcs.TrySetResult(NULL);
        timer.Dispose();
    }    返回tcs.Task;
}


解决方案

我张贴的最初的回答也有缺陷,即在窗口方法,与使用时 Observable.Interval 来表示窗口的结束,设置了一个无穷级数的500ms的窗口。我真正需要的是,当第一个结果被泵入主题启动一个窗口,500毫秒后结束。

由于数据坏了能装进那个已经将要创建的窗口我的样本数据掩盖了这个问题。 (即0-500ms,501-1000ms,1001-1500ms等)

而不是考虑这个时机:

  factory.StartNewDelayed(300,()=>
{
    Console.WriteLine(批1(300毫秒延迟));
    subject.OnNext(1);
});factory.StartNewDelayed(700,()=>
{
    Console.WriteLine(批2(700毫秒延迟));
    subject.OnNext(2);
});factory.StartNewDelayed(1300,()=>
{
    Console.WriteLine(批3(1.3s延迟));
    subject.OnNext(3);
});factory.StartNewDelayed(1600,()=>
{
    Console.WriteLine(批4(1.6秒延迟));
    subject.OnNext(4);
});

我得到的是:


  

1批(300ms的延迟)


  
  

在356ms处理1


  
  

2批(700毫秒延迟)


  
  

在750ms之间处理2


  
  

3批(1.3s延迟)


  
  

在1346ms处理3


  
  

第4批(1.6秒延时)


  
  

在1644ms处理4


这是因为窗户0毫秒,500毫秒,1000毫秒,以及1500毫秒开始,所以每个 Subject.OnNext 能装进自己的窗口。

我要的是:


  

1批(300ms的延迟)


  
  

在300ms以内〜1处理


  
  

2批(700毫秒延迟)


  
  

3批(1.3s延迟)


  
  

在〜1300ms处理3


  
  

第4批(1.6秒延时)


很多挣扎并在其上一个小时敲打了同事后,我们到达了一个更好的解决方案采用纯Rx和一个局部变量:

 布尔isCoolingDown = FALSE;学科
    。凡(_ =>!isCoolingDown)
    。订阅(
    I =>
    {
        DoStuff(ⅰ);        isCoolingDown = TRUE;        可观察
            .Interval(cooldownInterval)
            。取(1)
            .Subscribe(_ => isCoolingDown = FALSE);
    });

我们的假设是调用认购方式是同步的。如果他们没有,那么可以引入一个简单的锁。

I would like to set up an Rx subscription that can respond to an event right away, and then ignore subsequent events that happen within a specified "cooldown" period.

The out of the box Throttle/Buffer methods respond only once the timeout has elapsed, which is not quite what I need.

Here is some code that sets up the scenario, and uses a Throttle (which isn't the solution I want):

class Program
{
    static Stopwatch sw = new Stopwatch();

    static void Main(string[] args)
    {
        var subject = new Subject<int>();
        var timeout = TimeSpan.FromMilliseconds(500);

        subject
            .Throttle(timeout)
            .Subscribe(DoStuff);

        var factory = new TaskFactory();

        sw.Start();

        factory.StartNew(() =>
        {
            Console.WriteLine("Batch 1 (no delay)");
            subject.OnNext(1);
        });

        factory.StartNewDelayed(1000, () =>
        {
            Console.WriteLine("Batch 2 (1s delay)");
            subject.OnNext(2);
        });

        factory.StartNewDelayed(1300, () =>
        {
            Console.WriteLine("Batch 3 (1.3s delay)");
            subject.OnNext(3);
        });

        factory.StartNewDelayed(1600, () =>
        {
            Console.WriteLine("Batch 4 (1.6s delay)");
            subject.OnNext(4);
        });

        Console.ReadKey();
        sw.Stop();
    }

    private static void DoStuff(int i)
    {
        Console.WriteLine("Handling {0} at {1}ms", i, sw.ElapsedMilliseconds);
    }
}

The output of running this right now is:

Batch 1 (no delay)

Handling 1 at 508ms

Batch 2 (1s delay)

Batch 3 (1.3s delay)

Batch 4 (1.6s delay)

Handling 4 at 2114ms

Note that batch 2 isn't handled (which is fine!) because we wait for 500ms to elapse between requests due to the nature of throttle. Batch 3 is also not handled, (which is less alright because it happened more than 500ms from batch 2) due to its proximity to Batch 4.

What I'm looking for is something more like this:

Batch 1 (no delay)

Handling 1 at ~0ms

Batch 2 (1s delay)

Handling 2 at ~1000s

Batch 3 (1.3s delay)

Batch 4 (1.6s delay)

Handling 4 at ~1600s

Note that batch 3 wouldn't be handled in this scenario (which is fine!) because it occurs within 500ms of Batch 2.

EDIT:

Here is the implementation for the "StartNewDelayed" extension method that I use:

/// <summary>Creates a Task that will complete after the specified delay.</summary>
/// <param name="factory">The TaskFactory.</param>
/// <param name="millisecondsDelay">The delay after which the Task should transition to RanToCompletion.</param>
/// <returns>A Task that will be completed after the specified duration.</returns>
public static Task StartNewDelayed(
    this TaskFactory factory, int millisecondsDelay)
{
    return StartNewDelayed(factory, millisecondsDelay, CancellationToken.None);
}

/// <summary>Creates a Task that will complete after the specified delay.</summary>
/// <param name="factory">The TaskFactory.</param>
/// <param name="millisecondsDelay">The delay after which the Task should transition to RanToCompletion.</param>
/// <param name="cancellationToken">The cancellation token that can be used to cancel the timed task.</param>
/// <returns>A Task that will be completed after the specified duration and that's cancelable with the specified token.</returns>
public static Task StartNewDelayed(this TaskFactory factory, int millisecondsDelay, CancellationToken cancellationToken)
{
    // Validate arguments
    if (factory == null) throw new ArgumentNullException("factory");
    if (millisecondsDelay < 0) throw new ArgumentOutOfRangeException("millisecondsDelay");

    // Create the timed task
    var tcs = new TaskCompletionSource<object>(factory.CreationOptions);
    var ctr = default(CancellationTokenRegistration);

    // Create the timer but don't start it yet.  If we start it now,
    // it might fire before ctr has been set to the right registration.
    var timer = new Timer(self =>
    {
        // Clean up both the cancellation token and the timer, and try to transition to completed
        ctr.Dispose();
        ((Timer)self).Dispose();
        tcs.TrySetResult(null);
    });

    // Register with the cancellation token.
    if (cancellationToken.CanBeCanceled)
    {
        // When cancellation occurs, cancel the timer and try to transition to cancelled.
        // There could be a race, but it's benign.
        ctr = cancellationToken.Register(() =>
        {
            timer.Dispose();
            tcs.TrySetCanceled();
        });
    }

    if (millisecondsDelay > 0)
    {
        // Start the timer and hand back the task...
        timer.Change(millisecondsDelay, Timeout.Infinite);
    }
    else
    {
        // Just complete the task, and keep execution on the current thread.
        ctr.Dispose();
        tcs.TrySetResult(null);
        timer.Dispose();
    }

    return tcs.Task;
}

解决方案

The initial answer I posted has a flaw: namely that the Window method, when used with an Observable.Interval to denote the end of the window, sets up an infinite series of 500ms windows. What I really need is a window that starts when the first result is pumped into the subject, and ends after the 500ms.

My sample data masked this problem because the data broke down nicely into the windows that were already going to be created. (i.e. 0-500ms, 501-1000ms, 1001-1500ms, etc.)

Consider instead this timing:

factory.StartNewDelayed(300,() =>
{
    Console.WriteLine("Batch 1 (300ms delay)");
    subject.OnNext(1);
});

factory.StartNewDelayed(700, () =>
{
    Console.WriteLine("Batch 2 (700ms delay)");
    subject.OnNext(2);
});

factory.StartNewDelayed(1300, () =>
{
    Console.WriteLine("Batch 3 (1.3s delay)");
    subject.OnNext(3);
});

factory.StartNewDelayed(1600, () =>
{
    Console.WriteLine("Batch 4 (1.6s delay)");
    subject.OnNext(4);
});

What I get is:

Batch 1 (300ms delay)

Handling 1 at 356ms

Batch 2 (700ms delay)

Handling 2 at 750ms

Batch 3 (1.3s delay)

Handling 3 at 1346ms

Batch 4 (1.6s delay)

Handling 4 at 1644ms

This is because the windows begin at 0ms, 500ms, 1000ms, and 1500ms and so each Subject.OnNext fits nicely into its own window.

What I want is:

Batch 1 (300ms delay)

Handling 1 at ~300ms

Batch 2 (700ms delay)

Batch 3 (1.3s delay)

Handling 3 at ~1300ms

Batch 4 (1.6s delay)

After a lot of struggling and an hour banging on it with a co-worker, we arrived at a better solution using pure Rx and a single local variable:

bool isCoolingDown = false;

subject
    .Where(_ => !isCoolingDown)
    .Subscribe(
    i =>
    {
        DoStuff(i);

        isCoolingDown = true;

        Observable
            .Interval(cooldownInterval)
            .Take(1)
            .Subscribe(_ => isCoolingDown = false);
    });

Our assumption is that calls to the subscription method are synchronized. If they are not, then a simple lock could be introduced.

这篇关于接收:我怎样才能立刻响应,油门后续请求的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆