如何在rx.net中实现我自己的运算符 [英] How to implement my own operator in rx.net

查看:95
本文介绍了如何在rx.net中实现我自己的运算符的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要RX中具有迟滞滤波器的功能.仅当先前发出的值和当前输入值相差一定量时,才应从源流中发出一个值.作为通用扩展方法,它可以具有以下签名:

I need the functionality of a hysteresis filter in RX. It should emit a value from the source stream only when the previously emitted value and the current input value differ by a certain amount. As a generic extension method, it could have the following signature:

public static IObservable<T> HysteresisFilter<T>(this IObservable<t> source, Func<T/*previously emitted*/, T/*current*/, bool> filter)

我无法弄清楚如何使用现有的运营商来实现这一目标.我正在从RxJava寻找类似 lift 的方法,这是创建我自己的运算符的任何其他方法.我已经看到了这个

I was not able to figure out how to implement this with existing operators. I was looking for something like lift from RxJava, any other method to create my own operator. I have seen this checklist, but I haven't found any example on the web.

以下方法(实际上是相同的)对我来说似乎是解决方法,但是还有更多的 Rx方法可以做到这一点,例如无需包装subject或实际实现运算符?

The following approaches (both are actually the same) which seem workaround to me work, but is there a more Rx way to do this, like without wrapping a subject or actually implementing an operator?

async Task Main()
{
    var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));

    var rnd = new Random();
    var s = Observable.Interval(TimeSpan.FromMilliseconds(10))
            .Scan(0d, (a,_) => a + rnd.NextDouble() - 0.5)
            .Publish()
            .AutoConnect()
            ;

    s.Subscribe(Console.WriteLine, cts.Token);

    s.HysteresisFilter((p, c) => Math.Abs(p - c) > 1d).Subscribe(x => Console.WriteLine($"1> {x}"), cts.Token);
    s.HysteresisFilter2((p, c) => Math.Abs(p - c) > 1d).Subscribe(x => Console.WriteLine($"2> {x}"), cts.Token);

    await Task.Delay(Timeout.InfiniteTimeSpan, cts.Token).ContinueWith(_=>_, TaskContinuationOptions.OnlyOnCanceled);
}

public static class ReactiveOperators
{
    public static IObservable<T> HysteresisFilter<T>(this IObservable<T> source, Func<T, T, bool> filter)
    {
        return new InternalHysteresisFilter<T>(source, filter).AsObservable; 
    }

    public static IObservable<T> HysteresisFilter2<T>(this IObservable<T> source, Func<T, T, bool> filter)
    {
        var subject = new Subject<T>();
        T lastEmitted = default;
        bool emitted = false;

        source.Subscribe(
            value =>
            {
                if (!emitted || filter(lastEmitted, value))
                {
                    subject.OnNext(value);
                    lastEmitted = value;
                    emitted = true;
                }
            } 
            , ex => subject.OnError(ex)
            , () => subject.OnCompleted()
        );

        return subject;
    }

    private class InternalHysteresisFilter<T>: IObserver<T>
    {
        Func<T, T, bool> filter;
        T lastEmitted;
        bool emitted;

        private readonly Subject<T> subject = new Subject<T>();

        public IObservable<T> AsObservable => subject;

        public InternalHysteresisFilter(IObservable<T> source, Func<T, T, bool> filter)
        {
            this.filter = filter;
            source.Subscribe(this);
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            return subject.Subscribe(observer);
        }

        public void OnNext(T value)
        {
            if (!emitted || filter(lastEmitted, value))
            {
                subject.OnNext(value);
                lastEmitted = value;
                emitted = true;
            }
        }

        public void OnError(Exception error)
        {
            subject.OnError(error);
        }

        public void OnCompleted()
        {
            subject.OnCompleted();
        }
    }
}

旁注:将有数千个这样的过滤器应用于尽可能多的流.我需要吞吐量和延迟之间的关系,因此,即使其他设备看起来比较理想,我也正在寻找一种解决方案,以在CPU和内存中使用最少的开销.

Sidenote: There will be several thousand of such filters applied to as many streams. I need throughput over latency, thus I am looking for the solution with the minimum of overhead both in CPU and in memory even if others look fancier.

推荐答案

我在书中看到的大多数示例 Rx简介正在使用方法Observable.Create创建新的运算符.

Most examples I've seen in the book Introduction to Rx are using the method Observable.Create for creating new operators.

Create 工厂方法是实现自定义可观察序列的首选方法.主题的使用应很大程度上保留在样本和测试领域. (引文)

The Create factory method is the preferred way to implement custom observable sequences. The usage of subjects should largely remain in the realms of samples and testing. (citation)

public static IObservable<T> HysteresisFilter<T>(this IObservable<T> source,
    Func<T, T, bool> predicate)
{
    return Observable.Create<T>(observer =>
    {
        T lastEmitted = default;
        bool emitted = false;
        return source.Subscribe(value =>
        {
            if (!emitted || predicate(lastEmitted, value))
            {
                observer.OnNext(value);
                lastEmitted = value;
                emitted = true;
            }
        }, observer.OnError, observer.OnCompleted);
    });
}

这篇关于如何在rx.net中实现我自己的运算符的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆