应该如何在 Rx 中实现 DistinctLatest(和缓存)运算符? [英] How should one go about implementing a DistinctLatest (and caching) operator in Rx?

查看:40
本文介绍了应该如何在 Rx 中实现 DistinctLatest(和缓存)运算符?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个问题 在订阅时提供更新和新值作为DistinctLatest"和完整缓存内容的缓存,社区处理得很好.提出了一个问题,即缓存和替换上述问题中定义的值的实际目标可以用 .DistinctLatest 运算符定义.

I had a question A cache serving updates and new values as "DistinctLatest" and full cache contents upon subscription, which was well handled by the community. A question was raised that the actual goal of caching and replacing values like defined in the aforementioned question could be defined with a .DistinctLatest operator.

好的!似乎没有太多谈论这样的运营商.在搜索和思考时,我发现 ReactiveX:Group 和 Buffer 只是每组中的最后一项,这有点接近.为了模仿原始问题,我尝试将缓存运算符编写为

OK! There doesn't seem to be much talk about such an operator. While searching, and thinking about it, I found ReactiveX: Group and Buffer only last item in each group, which is kind of close. To mimick the original issue, I tried to write the caching operator as

/// <summary>
/// A cache that keeps distinct elements where the elements are replaced by the latest.
/// </summary>
/// <typeparam name="T">The type of the result</typeparam>
/// <typeparam name="TKey">The type of the selector key for distinct results.</typeparam>
/// <param name="newElements">The sequence of new elements.</param>
/// <param name="seedElements">The seed elements when the cache is started.</param>
/// <param name="replacementSelector">The replacement selector to choose distinct elements in the cache.</param>
/// <returns>The cache contents upon first call and changes thereafter.</returns>
public static IObservable<T> Cache<T, TKey>(this IObservable<T> newElements, IEnumerable<T> seedElements, Func<T, TKey> replacementSelector)
{
    var s = newElements.StartWith(seedElements).GroupBy(replacementSelector).Select(groupObservable =>
    {
        var replaySubject = new ReplaySubject<T>(1);
        groupObservable.Subscribe(value => replaySubject.OnNext(value));

        return replaySubject;
    });

    return s.SelectMany(i => i);            
 }

但是进行测试似乎也不起作用.看起来如果一开始就订阅了初始值和更新(和新值).而如果最后订阅了,则只记录替换的种子值.

But doing testing that doesn't seem to do the trick either. It looks like if one subscribe in the beginning the initial values and the updates (and new values) are observed. While if one subscribed at the end, only the replaced seed values are recorded.

现在,我想知道一个通​​用的 DistinctLast 运算符,我认为这是,但它不起作用,然后这个缓存"添加的是种子值和组的扁平化,但是这不是测试所说的.我也尝试了一些分组和 .TakeLast() 的东西,但没有骰子.

Now, I wonder about a general DistinctLast operator, which I think this, but it doesn't work, and then what this "cache" adds is seed values and flattening of the groups, but that's not what the test tells. I also tried some things with grouping and .TakeLast() too, but no dice.

如果有人对此提出建议或思考,我会很高兴,希望这会成为普遍有益的事情.

I'd be delighted if anyone had pointers or pondering about this and hopefully this turns out to be something commonly beneficial.

推荐答案

@LeeCampbell 为此做了大部分工作.请参阅其他参考问题.无论如何,这是代码:

@LeeCampbell did most of the work for this. See other referenced question. Anyway, here's the code:

public static class RxExtensions
{
    public static IObservable<T> DistinctLatest<T, TKey>(this IObservable<T> newElements, IEnumerable<T> seedElements, Func<T, TKey> replacementSelector)
    {
        return seedElements.ToObservable()
            .Concat(newElements)
            .GroupBy(i => replacementSelector)
            .SelectMany(grp => grp.Replay(1).Publish().RefCoun‌​t());
    }
}

这篇关于应该如何在 Rx 中实现 DistinctLatest(和缓存)运算符?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆