A cache serving updates and new values as "DistinctLatest" and full cache contents upon subscription

Question

I'm trying to implement a cache using a ReplaySubject as follows, but I can't get the behaviour I want with Rx. See the code and the accompanying tests. The trouble is that the cache drops the newest entries and preserves the oldest.

public static class RxExtensions
{
    /// <summary>
    /// A cache that keeps distinct elements where the elements are replaced by the latest. Upon subscription the subscriber should receive the full cache contents.
    /// </summary>
    /// <typeparam name="T">The type of the result</typeparam>
    /// <typeparam name="TKey">The type of the selector key for distinct results.</typeparam>
    /// <param name="newElements">The sequence of new elements.</param>
    /// <param name="seedElements">The elements when the cache is started.</param>
    /// <param name="replacementSelector">The replacement to select distinct elements in the cache.</param>
    /// <returns>The cache contents upon first call and changes thereafter.</returns>
    public static IObservable<T> Cache<T, TKey>(this IObservable<T> newElements, IEnumerable<T> seedElements, Func<T, TKey> replacementSelector)
    {
        var replaySubject = new ReplaySubject<T>();
        seedElements.ToObservable().Concat(newElements).Subscribe(replaySubject);

        return replaySubject.Distinct(replacementSelector);
    }
}

It looks like the old values, the seed values, would be dropped if I wrote the function like this:

newElements.Subscribe(replaySubject);
return replaySubject.Concat(seedElements.ToObservable()).Distinct(replacementSelector);

but given how I think .Concat works, it probably only "works" because of how the tests are currently written; see below.

public void CacheTests()
{
    var seedElements = new List<Event>(new[]
    {
        new Event { Id = 0, Batch = 1 },
        new Event { Id = 1, Batch = 1 },
        new Event { Id = 2, Batch = 1 }
    });

    var testScheduler = new TestScheduler();
    var observer = testScheduler.CreateObserver<Event>();
    var batchTicks = TimeSpan.FromSeconds(10);
    var xs = testScheduler.CreateHotObservable
    (
        ReactiveTest.OnNext(batchTicks.Ticks, new Event { Id = 0, Batch = 2 }),
        ReactiveTest.OnNext(batchTicks.Ticks, new Event { Id = 1, Batch = 2 }),
        ReactiveTest.OnNext(batchTicks.Ticks, new Event { Id = 2, Batch = 2 }),
        ReactiveTest.OnNext(batchTicks.Ticks, new Event { Id = 3, Batch = 2 }),
        ReactiveTest.OnNext(batchTicks.Ticks, new Event { Id = 4, Batch = 2 }),
        ReactiveTest.OnNext(batchTicks.Ticks + 10, new Event { Id = 0, Batch = 3 }),
        ReactiveTest.OnNext(batchTicks.Ticks + 10, new Event { Id = 1, Batch = 3 })
    );

    var subs = xs.Cache(seedElements, i => i.Id).Subscribe(observer);
    var seedElementsAndNoMore = observer.Messages.ToArray();
    Assert.IsTrue(observer.Messages.Count == 3);

    testScheduler.Start();
    var seedAndReplacedElements = observer.Messages.ToArray();

    //OK, a bad assert, we should create expected timings and want to check
    //also the actual batch numbers, but to get things going...
    //There should be Events with IDs { 0, 1, 2, 3, 4 } all having a batch number
    //of either 2 or 3. Also, a total of 7 (not 10) events
    //should've been observed.
    Assert.IsTrue(observer.Messages.Count == 7);
    for(int i = 0; i < seedAndReplacedElements.Length; ++i)
    {
        Assert.IsTrue(seedAndReplacedElements[i].Value.Value.Batch > 1);
    }
}

I think what I want is

public static IObservable<T> Cache<T, TKey>(this IObservable<T> newElements, IEnumerable<T> seedElements, Func<T, TKey> replacementSelector)
{
    var replaySubject = new ReplaySubject<T>();
    newElements.StartWith(seedElements).Distinct(replacementSelector).Subscribe(replaySubject);

    return replaySubject;           
}

but the trouble is that the seed values arrive first, so Rx drops the newer values rather than the seed values. Doing it the other way around (perhaps with .Merge) could introduce the seed into the observable after new values have already been received, in which case the seed values would not actually be replaced.
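
The behaviour described above follows from Distinct keeping the first element it sees for each key and discarding later ones. A minimal sketch, not from the original post, assuming the System.Reactive package is referenced; the DistinctDemo class and the tuple-based events are made up for illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper, for illustration only.
public static class DistinctDemo
{
    public static List<(int Id, int Batch)> Run()
    {
        // Two seed events followed by a newer event that reuses Id 0.
        var events = new[]
        {
            (Id: 0, Batch: 1),
            (Id: 1, Batch: 1),
            (Id: 0, Batch: 2)
        };

        var seen = new List<(int Id, int Batch)>();

        // Distinct remembers every key it has already emitted, so the
        // later (Id 0, Batch 2) event is filtered out, not the seed.
        events.ToObservable()
              .Distinct(e => e.Id)
              .Subscribe(seen.Add);

        return seen;
    }
}
```

Calling DistinctDemo.Run() should yield only the two Batch 1 events, which is the "oldest wins" result the question complains about.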

Recommended answer

OK, I think I have what you want. I determined your requirements mostly from this phrase:

"When the subscriber subscribes to this cache, it gets all the values held in the cache as the first thing and then after that updates as they come in"

I believe the cache is meant to have a lifetime beyond any single subscription (i.e. it is started once and subscribers can come and go as they please), so I have made it an IConnectableObservable (this is implicit in your code but not scoped correctly).

I have also refactored your test to show multiple subscribers (per @Shlomo's comment) as follows:

[Fact]
public void ReplayAllElements()
{
    var seedElements = new List<Event>(new[]
    {
        new Event { Id = 0, Batch = 1 },
        new Event { Id = 1, Batch = 1 },
        new Event { Id = 2, Batch = 1 }
    });

    var testScheduler = new TestScheduler();

    var xs = testScheduler.CreateHotObservable
    (
        ReactiveTest.OnNext(1, new Event { Id = 0, Batch = 2 }),
        ReactiveTest.OnNext(2, new Event { Id = 1, Batch = 2 }),
        ReactiveTest.OnNext(3, new Event { Id = 2, Batch = 2 }),
        ReactiveTest.OnNext(4, new Event { Id = 3, Batch = 2 }),
        ReactiveTest.OnNext(5, new Event { Id = 4, Batch = 2 }),    
        ReactiveTest.OnNext(6, new Event { Id = 0, Batch = 3 }),
        ReactiveTest.OnNext(7, new Event { Id = 1, Batch = 3 })
    );

    IConnectableObservable<Event> cached = xs.Cache(seedElements, i => i.Id);

    var observerA = testScheduler.CreateObserver<Event>();
    cached.Subscribe(observerA);
    cached.Connect();

    testScheduler.AdvanceTo(4);

    var observerB = testScheduler.CreateObserver<Event>();
    cached.Subscribe(observerB);

    testScheduler.AdvanceTo(7);

    var expectedA = new[]
    {
        ReactiveTest.OnNext<Event>(0, @event => @event.Id == 0 && @event.Batch == 1 ),
        ReactiveTest.OnNext<Event>(0, @event => @event.Id == 1 && @event.Batch == 1 ),
        ReactiveTest.OnNext<Event>(0, @event => @event.Id == 2 && @event.Batch == 1 ),
        ReactiveTest.OnNext<Event>(1, @event => @event.Id == 0 && @event.Batch == 2 ),
        ReactiveTest.OnNext<Event>(2, @event => @event.Id == 1 && @event.Batch == 2 ),
        ReactiveTest.OnNext<Event>(3, @event => @event.Id == 2 && @event.Batch == 2 ),
        ReactiveTest.OnNext<Event>(4, @event => @event.Id == 3 && @event.Batch == 2 ),
        ReactiveTest.OnNext<Event>(5, @event => @event.Id == 4 && @event.Batch == 2 ),
        ReactiveTest.OnNext<Event>(6, @event => @event.Id == 0 && @event.Batch == 3 ),
        ReactiveTest.OnNext<Event>(7, @event => @event.Id == 1 && @event.Batch == 3 )
    };

    observerA.Messages.AssertEqual(expectedA);

    var expectedB = new[]
    {
        ReactiveTest.OnNext<Event>(5, @event => @event.Id == 0 && @event.Batch == 2 ),
        ReactiveTest.OnNext<Event>(5, @event => @event.Id == 1 && @event.Batch == 2 ),
        ReactiveTest.OnNext<Event>(5, @event => @event.Id == 2 && @event.Batch == 2 ),
        ReactiveTest.OnNext<Event>(5, @event => @event.Id == 3 && @event.Batch == 2 ),
        ReactiveTest.OnNext<Event>(5, @event => @event.Id == 4 && @event.Batch == 2 ),
        ReactiveTest.OnNext<Event>(6, @event => @event.Id == 0 && @event.Batch == 3 ),
        ReactiveTest.OnNext<Event>(7, @event => @event.Id == 1 && @event.Batch == 3 )
    };

    observerB.Messages.AssertEqual(expectedB);
}

As you can see, observerA gets all the seed values and every update, whereas observerB gets only the latest value for each key at the time it subscribes, followed by further updates.

The code to do this is as follows:

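using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class RxExtensions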
{
    /// <summary>
    /// A cache that keeps distinct elements where the elements are replaced by the latest.
    /// </summary>
    /// <typeparam name="T">The type of the result</typeparam>
    /// <typeparam name="TKey">The type of the selector key for distinct results.</typeparam>
    /// <param name="newElements">The sequence of new elements.</param>
    /// <param name="seedElements">The elements when the cache is started.</param>
    /// <param name="keySelector">The replacement to select distinct elements in the cache.</param>
    /// <returns>The cache contents upon first call and changes thereafter.</returns>
    public static IConnectableObservable<T> Cache<T, TKey>(this IObservable<T> newElements, IEnumerable<T> seedElements, Func<T, TKey> keySelector)
    {
        return new Cache<TKey, T>(newElements, seedElements, keySelector);
    }
}

public class Cache<TKey, T> : IConnectableObservable<T>
{
    private class State
    {
        public ImmutableDictionary<TKey, T> Cache { get; set; }
        public T Value { get; set; }
    }

    private readonly IConnectableObservable<State> _source;
    private readonly IObservable<T> _observable;

    public Cache(IObservable<T> newElements, IEnumerable<T> seedElements, Func<T, TKey> keySelector)
    {
        var agg = new State { Cache = seedElements.ToImmutableDictionary(keySelector), Value = default(T) };

        _source = newElements
            // Use the Scan operator to update the dictionary of values by key, and use the State object to pass both the dictionary and the current item to the next operator
            .Scan(agg, (tuple, item) => new State { Cache = tuple.Cache.SetItem(keySelector(item), item), Value = item })
            // Ensure we always have at least one item
            .StartWith(agg)
            // Share this single subscription to the above with all subscribers
            .Publish();

        _observable = _source.Publish(source =>
                // ... concatting ...
                Observable.Concat(
                    // ... getting a single collection of values from the cache and flattening it to a series of values ...
                    source.Select(tuple => tuple.Cache.Values).Take(1).SelectMany(values => values),
                    // ... and then returning the values as they're emitted from the source
                    source.Select(tuple => tuple.Value)
                )
            );
    }

    public IDisposable Connect()
    {
        return _source.Connect();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _observable.Subscribe(observer);
    }
}
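
The per-key replacement done inside Scan is an ordinary fold over an immutable dictionary. The sketch below shows that fold in isolation using LINQ's Aggregate, without Rx; LatestPerKey and the tuple-based events are illustrative names and not part of the code above:

```csharp
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;

// Hypothetical helper, for illustration only.
public static class LatestPerKey
{
    public static ImmutableDictionary<int, (int Id, int Batch)> Fold(
        IEnumerable<(int Id, int Batch)> seed,
        IEnumerable<(int Id, int Batch)> updates)
    {
        // Start from the seed, keyed by Id (as the Cache constructor does).
        var cache = seed.ToImmutableDictionary(e => e.Id);

        // SetItem adds a new key or overwrites an existing one and returns
        // a new dictionary, so the latest value per key always wins.
        return updates.Aggregate(cache, (current, e) => current.SetItem(e.Id, e));
    }
}
```

Scan differs from Aggregate only in that it emits every intermediate dictionary rather than just the final one, which is what lets a late subscriber pick up the cache contents as of its subscription time.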

It was certainly an interesting question. The key to the answer was this Publish overload:

    // Summary:
    //     Returns an observable sequence that is the result of invoking the selector on
    //     a connectable observable sequence that shares a single subscription to the underlying
    //     sequence. This operator is a specialization of Multicast using a regular System.Reactive.Subjects.Subject`1.
    //
    // Parameters:
    //   source:
    //     Source sequence whose elements will be multicasted through a single shared subscription.
    //
    //   selector:
    //     Selector function which can use the multicasted source sequence as many times
    //     as needed, without causing multiple subscriptions to the source sequence. Subscribers
    //     to the given source will receive all notifications of the source from the time
    //     of the subscription on.
    //
    // Type parameters:
    //   TSource:
    //     The type of the elements in the source sequence.
    //
    //   TResult:
    //     The type of the elements in the result sequence.
    //
    // Returns:
    //     An observable sequence that contains the elements of a sequence produced by multicasting
    //     the source sequence within a selector function.
    //
    // Exceptions:
    //   T:System.ArgumentNullException:
    //     source or selector is null.
    public static IObservable<TResult> Publish<TSource, TResult>(this IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector);
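
As a rough illustration of what this overload buys you, the sketch below uses the shared sequence twice inside the selector while the underlying source is subscribed to only once. It assumes the System.Reactive package; PublishSelectorDemo and the subscription counter are made up for the example:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper, for illustration only.
public static class PublishSelectorDemo
{
    public static (int Subscriptions, List<int> Results) Run()
    {
        var subscriptions = 0;

        // Defer runs its factory on every subscription, so the counter
        // records how many times the source is actually subscribed to.
        var source = Observable.Defer(() =>
        {
            subscriptions++;
            return Observable.Range(1, 3);
        });

        var results = new List<int>();

        // "shared" is used twice in the selector, yet both uses are fed
        // from one shared subscription to the source.
        source
            .Publish(shared => shared.Merge(shared.Select(x => x * 10)))
            .Subscribe(results.Add);

        return (subscriptions, results);
    }
}
```

Here the subscription count should be 1 and six values should be collected (each of 1, 2, 3 plus its tenfold copy). The Cache class relies on the same property to read the dictionary snapshot and the live values from one shared stream.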

Anyway, hope it helps.
