Merging multiple custom observables in RX

Problem description

Trying to model a system sending out notifications from a number of publishers using RX.

I have two custom interfaces ITopicObservable and ITopicObserver to model the fact that the implementing classes will have other properties and methods apart from the IObservable and IObserver interfaces.

My thinking is that I should be able to collect a number of observables, merge them, and subscribe an observer that receives updates from all of the merged observables. However, the cast in AggregatedTopicObservable.Subscribe (the code with "the issue" comment) throws an invalid cast exception.

The use case is a number of independent sensors, each monitoring the temperature in a box, for example. Their reports are aggregated into one temperature report, which a temperature health monitor then subscribes to.

What am I missing here? Or is there a better way to implement the scenario using RX?

Code below:

using System;
using System.Reactive.Linq;
using System.Collections.Generic;

namespace test
{
class MainClass
{
    public static void Main (string[] args)
    {
        Console.WriteLine ("Hello World!");
        var to = new TopicObserver ();
        var s = new TopicObservable ("test");

        var agg = new AggregatedTopicObservable ();
        agg.Add (s);

        agg.Subscribe (to);
    }
}

public interface ITopicObservable<TType>:IObservable<TType>
{
    string Name{get;}
}

public class TopicObservable:ITopicObservable<int>
{
    public TopicObservable(string name)
    {
        Name = name;
    }
    #region IObservable implementation
    public IDisposable Subscribe (IObserver<int> observer)
    {
        return null;
    }
    #endregion
    #region ITopicObservable implementation
    public string Name { get;private set;}

    #endregion
}

public class AggregatedTopicObservable:ITopicObservable<int>
{
    List<TopicObservable> _topics;
    private ITopicObservable<int> _observable;
    private IDisposable _disposable;

    public AggregatedTopicObservable()
    {
        _topics = new List<TopicObservable>();
    }

    public void Add(ITopicObservable<int> observable)
    {
        _topics.Add ((TopicObservable)observable);
    }

    #region IObservable implementation
    public IDisposable Subscribe (IObserver<int> observer)
    {
        // the issue: Merge() returns a plain IObservable<int>, so this cast throws InvalidCastException
        _observable = (ITopicObservable<int>)_topics.Merge ();

        _disposable = _observable.Subscribe(observer);

        return _disposable;
    }
    #endregion
    #region ITopicObservable implementation
    public string Name { get;private set;}
    #endregion

}



public interface ITopicObserver<TType>:IObserver<TType>
{
    string Name{get;}
}

public class TopicObserver:ITopicObserver<int>
{
    #region IObserver implementation
    public void OnNext (int value)
    {
        Console.WriteLine ("next {0}", value);
    }
    public void OnError (Exception error)
    {
        Console.WriteLine ("error {0}", error.Message);
    }
    public void OnCompleted ()
    {
        Console.WriteLine ("finished");
    }
    #endregion
    #region ITopicObserver implementation
    public string Name { get;private set;}
    #endregion

}
}

Recommended answer

The signature of the .Merge(...) operator that you're using is:

IObservable<TSource> Merge<TSource>(this IEnumerable<IObservable<TSource>> sources)

The actual type returned by .Merge() is:

System.Reactive.Linq.ObservableImpl.Merge`1[System.Int32]

...so it should be fairly clear that calling (ITopicObservable<int>)_topics.Merge(); would fail.
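
To see the failure in isolation, here is a minimal sketch. It assumes the System.Reactive package is referenced; ITopicObservable is copied from the question, and MergeCastDemo and its methods are hypothetical names made up for illustration. The object returned by Merge is an Rx-internal type that knows nothing about the custom interface, so a type test is false and an explicit cast throws.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Same shape as the interface in the question.
public interface ITopicObservable<TType> : IObservable<TType>
{
    string Name { get; }
}

// Hypothetical demo class, not part of the original code.
public static class MergeCastDemo
{
    // True only if the merged sequence implements the custom interface.
    public static bool MergedImplementsCustomInterface()
    {
        var sources = new List<IObservable<int>> { new Subject<int>(), new Subject<int>() };
        IObservable<int> merged = sources.Merge();
        return merged is ITopicObservable<int>;
    }

    // True if the explicit cast throws InvalidCastException.
    public static bool ExplicitCastThrows()
    {
        IObservable<int> merged = new[] { Observable.Return(1), Observable.Return(2) }.Merge();
        try
        {
            _ = (ITopicObservable<int>)merged; // compiles, but fails at run time
            return false;
        }
        catch (InvalidCastException)
        {
            return true;
        }
    }
}
```

Calling MergeCastDemo.MergedImplementsCustomInterface() returns false and MergeCastDemo.ExplicitCastThrows() returns true. The cast compiles only because C# allows explicit interface-to-interface conversions to be checked at run time.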

Lee's advice not to implement either IObservable<> or IObserver<> yourself is the correct one. Implementing them directly leads to errors like the one above.

If you had to do something like this, I would do it this way:

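using System;
using System.Collections.Generic;
using System.Reactive;          // Observer.Create
using System.Reactive.Linq;     // Merge
using System.Reactive.Subjects; // Subject<T>, ISubject<T>

public interface ITopic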
{
    string Name { get; }
}

public interface ITopicObservable<TType> : ITopic, IObservable<TType>
{ }

public interface ITopicSubject<TType> : ISubject<TType>, ITopicObservable<TType>
{ }

public interface ITopicObserver<TType> : ITopic, IObserver<TType>
{ }

public class Topic
{
    public string Name { get; private set; }

    public Topic(string name)
    {
        this.Name = name;
    }
}

public class TopicSubject : Topic, ITopicSubject<int>
{
    private Subject<int> _subject = new Subject<int>();

    public TopicSubject(string name)
        : base(name)
    { }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        return _subject.Subscribe(observer);
    }

    public void OnNext(int value)
    {
        _subject.OnNext(value);
    }

    public void OnError(Exception error)
    {
        _subject.OnError(error);
    }

    public void OnCompleted()
    {
        _subject.OnCompleted();
    }
}

public class AggregatedTopicObservable : Topic, ITopicObservable<int>
{
    List<ITopicObservable<int>> _topics = new List<ITopicObservable<int>>();

    public AggregatedTopicObservable(string name)
        : base(name)
    { }

    public void Add(ITopicObservable<int> observable)
    {
        _topics.Add(observable);
    }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        return _topics.Merge().Subscribe(observer);
    }
}

public class TopicObserver : Topic, ITopicObserver<int>
{
    private IObserver<int> _observer;

    public TopicObserver(string name)
        : base(name)
    {
        _observer =
            Observer
                .Create<int>(
                    value => Console.WriteLine("next {0}", value),
                    error => Console.WriteLine("error {0}", error.Message),
                    () => Console.WriteLine("finished"));
    }

    public void OnNext(int value)
    {
        _observer.OnNext(value);
    }
    public void OnError(Exception error)
    {
        _observer.OnError(error);
    }
    public void OnCompleted()
    {
        _observer.OnCompleted();
    }
}

And running it:

var to = new TopicObserver("watching");
var ts1 = new TopicSubject("topic 1");
var ts2 = new TopicSubject("topic 2");

var agg = new AggregatedTopicObservable("agg");

agg.Add(ts1);
agg.Add(ts2);

agg.Subscribe(to);

ts1.OnNext(42);
ts1.OnCompleted();

ts2.OnNext(1);
ts2.OnCompleted();

Which gives:


next 42
next 1
finished
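
Note that "finished" appears only once, after the last source has completed: a merged sequence stays open while any of its sources is still live. A small sketch of that behaviour, assuming System.Reactive is referenced (MergeCompletionDemo and the "(a completed)" marker are hypothetical, added only to make the ordering visible):

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class, not part of the original code.
public static class MergeCompletionDemo
{
    // Returns the events seen by a subscriber of two merged subjects.
    public static List<string> Run()
    {
        var a = new Subject<int>();
        var b = new Subject<int>();
        var events = new List<string>();

        new IObservable<int>[] { a, b }.Merge().Subscribe(
            v => events.Add("next " + v),
            () => events.Add("finished"));

        a.OnNext(42);
        a.OnCompleted();               // nothing is forwarded yet: b is still live
        events.Add("(a completed)");   // marker showing where a's completion happened
        b.OnNext(1);
        b.OnCompleted();               // last source done, so the merge completes
        return events;
    }
}
```

MergeCompletionDemo.Run() returns "next 42", "(a completed)", "next 1", "finished", in that order.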

But apart from being able to give everything a name (which I'm not sure how it helps) you could always do this:

var to =
    Observer
        .Create<int>(
            value => Console.WriteLine("next {0}", value),
            error => Console.WriteLine("error {0}", error.Message),
            () => Console.WriteLine("finished"));

var ts1 = new Subject<int>();
var ts2 = new Subject<int>();

var agg = new [] { ts1, ts2 }.Merge();

agg.Subscribe(to);

ts1.OnNext(42);
ts1.OnCompleted();

ts2.OnNext(1);
ts2.OnCompleted();

Same output with no interfaces and classes.
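
If the names do matter for the sensor scenario in the question, one option is to let the name travel with each value rather than with the observable. This is only a sketch under that assumption, with System.Reactive referenced; SensorDemo, Tag, and the box names are hypothetical and not part of the original code.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class for the temperature-sensor use case.
public static class SensorDemo
{
    // Hypothetical helper: attaches the sensor name to every reading.
    public static IObservable<(string Sensor, int Temperature)> Tag(
        string sensor, IObservable<int> readings)
    {
        return readings.Select(t => (Sensor: sensor, Temperature: t));
    }

    // Merges two tagged sensors and returns what a monitor would log.
    public static List<string> Run()
    {
        var box1 = new Subject<int>();
        var box2 = new Subject<int>();
        var log = new List<string>();

        var all = Observable.Merge(Tag("box 1", box1), Tag("box 2", box2));
        using (all.Subscribe(r => log.Add($"{r.Sensor}: {r.Temperature}")))
        {
            box1.OnNext(21);
            box2.OnNext(35);
        }
        return log;
    }
}
```

SensorDemo.Run() returns "box 1: 21" followed by "box 2: 35". The merged stream is still a plain IObservable, so no custom interface or cast is involved, and the monitor can tell which sensor each reading came from.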

There's even a more interesting way. Try this:

var to =
    Observer
        .Create<int>(
            value => Console.WriteLine("next {0}", value),
            error => Console.WriteLine("error {0}", error.Message),
            () => Console.WriteLine("finished"));

var agg = new Subject<IObservable<int>>();

agg.Merge().Subscribe(to);

var ts1 = new Subject<int>();
var ts2 = new Subject<int>();

agg.OnNext(ts1);
agg.OnNext(ts2);

ts1.OnNext(42);
ts1.OnCompleted();

ts2.OnNext(1);
ts2.OnCompleted();

var ts3 = new Subject<int>();

agg.OnNext(ts3);

ts3.OnNext(99);
ts3.OnCompleted();

This produces:


next 42
next 1
next 99

It allows you to add new source observables after the merge!
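
If you want this behind an Add method like the one in the question, it could be wrapped roughly as follows. This is a sketch assuming System.Reactive is referenced; DynamicAggregator, Values, and Complete are hypothetical names. It also shows why "finished" is missing from the output above: the outer subject never completes, so the merge stays open.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical wrapper around the Subject<IObservable<int>> approach.
public sealed class DynamicAggregator
{
    // Outer sequence: each element is itself a source of ints.
    private readonly Subject<IObservable<int>> _sources = new Subject<IObservable<int>>();

    // Merged view over every source added while a subscriber is attached.
    public IObservable<int> Values { get; }

    public DynamicAggregator()
    {
        Values = _sources.Merge();
    }

    // Adds a new source, even after subscribers are attached.
    public void Add(IObservable<int> source) => _sources.OnNext(source);

    // Signals that no more sources will be added; subscribers see
    // completion once every source added so far has also completed.
    public void Complete() => _sources.OnCompleted();
}
```

One caveat with this design: Subject is hot, so a source added before anyone has subscribed to Values is not seen by later subscribers. Subscribe first and add sources afterwards, as the example above does.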
