Rx –与超时时间不同吗? [英] Rx – Distinct with timeout?

查看:71
本文介绍了Rx –与超时时间不同吗?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想知道是否可以通过某种方式在.NET的Reactive Extensions中实现Distinct,以便在给定时间内可以正常工作,并且在此时间之后,它应该重置并允许再次返回的值.我需要将其用作应用程序中的热门资源,该资源将在全年运行,并且现在停止运行,因此我担心性能,并且我需要一段时间后才允许使用这些值.也有DistinctUntilChanged,但在我的情况下,值可以混合使用-例如:A A X A,DistinctUntilChanged将给我A X A,我需要结果A X,在给定时间后应重新设置distance.

I’m wondering is there any way to implement Distinct in Reactive Extensions for .NET in such way that it will be working for given time and after this time it should reset and allow values that are come back again. I need this for hot source in application that will be working for whole year with now stops so I’m worried about performance and I need those values to be allowed after some time. There is also DistinctUntilChanged but in my case values could be mixed – for example: A A X A, DistinctUntilChanged will give me A X A and I need result A X and after given time distinct should be reset.

推荐答案

具有包装类,该类为项目加上时间戳,但不考虑时间戳(创建的字段)用于哈希或相等性:

With a wrapper class that timestamps items, but does not consider the timestamp (created field) for hashing or equality:

public class DistinctForItem<T> : IEquatable<DistinctForItem<T>>
{
    private readonly T item;
    private DateTime created;

    public DistinctForItem(T item)
    {
        this.item = item;
        this.created = DateTime.UtcNow;
    }

    public T Item
    {
        get { return item; }
    }

    public DateTime Created
    {
        get { return created; }
    }

    public bool Equals(DistinctForItem<T> other)
    {
        if (ReferenceEquals(null, other)) return false;
        if (ReferenceEquals(this, other)) return true;
        return EqualityComparer<T>.Default.Equals(Item, other.Item);
    }

    public override bool Equals(object obj)
    {
        if (ReferenceEquals(null, obj)) return false;
        if (ReferenceEquals(this, obj)) return true;
        if (obj.GetType() != this.GetType()) return false;
        return Equals((DistinctForItem<T>)obj);
    }

    public override int GetHashCode()
    {
        return EqualityComparer<T>.Default.GetHashCode(Item);
    }

    public static bool operator ==(DistinctForItem<T> left, DistinctForItem<T> right)
    {
        return Equals(left, right);
    }

    public static bool operator !=(DistinctForItem<T> left, DistinctForItem<T> right)
    {
        return !Equals(left, right);
    }
}

现在可以编写 DistinctFor 扩展方法:

It is now possible to write a DistinctFor extension method:

public static IObservable<T> DistinctFor<T>(this IObservable<T> src, 
                                            TimeSpan validityPeriod)
{
    //if HashSet<DistinctForItem<T>> actually allowed us the get at the 
    //items it contains it would be a better choice. 
    //However it doesn't, so we resort to 
    //Dictionary<DistinctForItem<T>, DistinctForItem<T>> instead.

    var hs = new Dictionary<DistinctForItem<T>, DistinctForItem<T>>();
    return src.Select(item => new DistinctForItem<T>(item)).Where(df =>
    {
        DistinctForItem<T> hsVal;
        if (hs.TryGetValue(df, out hsVal))
        {
            var age = DateTime.UtcNow - hsVal.Created;
            if (age < validityPeriod)
            {
                return false;
            }
        }
        hs[df] = df;
        return true;

    }).Select(df => df.Item);
}

可以使用:

Enumerable.Range(0, 1000)
    .Select(i => i % 3)
    .ToObservable()
    .Pace(TimeSpan.FromMilliseconds(500)) //drip feeds the observable
    .DistinctFor(TimeSpan.FromSeconds(5))
    .Subscribe(x => Console.WriteLine(x));

作为参考,这是我对 Pace< T> 的实现:

For reference, here is my implementation of Pace<T>:

public static IObservable<T> Pace<T>(this IObservable<T> src, TimeSpan delay)
{
    var timer = Observable
        .Timer(
            TimeSpan.FromSeconds(0),
            delay
        );

    return src.Zip(timer, (s, t) => s);
}

这篇关于Rx –与超时时间不同吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆