如何知道下一个值来自哪个 observable? [英] How to know which observable the next value comes from?

查看:30
本文介绍了如何知道下一个值来自哪个 observable?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

假设我有这样的事情:

var observable = observable1
                 .Merge(observable2)
                 .Merge(observable3);

var subscription = observable.Subscribe(ValueHandler);

...

public void ValueHandler(string nextValue)
{
  Console.WriteLine($"Next value {nextValue} produced by /* sourceObservable */");
}

除了在每个 observable 实现中添加引用以及值之外,有没有办法从 observable1observable2 中获取源 observableobservable3 产生下一个值?

Short of adding that reference along with the value inside of each observable implementation, is there a way to get the source observable out of observable1, observable2 and observable3 that produced the next value?

推荐答案

不,这与 Merge 的设计目的完全相反.Merge 旨在获取多个流并将它们视为一个流.如果您想以某种方式分别对待它们,请使用不同的运算符.

No, that's exactly the opposite of what Merge is designed to do. Merge is designed to take multiple streams and treat them like one. If you wanted some way to treat them separately, use a different operator.

对于传递源的运算符,简短的回答是否定的.Rx 是关于对消息做出反应,来源无关紧要.我什至不确定您是否可以从概念上定义 Rx 中的源":

As for an operator which passes on the source, the short answer is no. Rx is about reacting to messages, the source is irrelevant. I'm not even sure if you could define conceptually what a 'source' is in Rx:

var observable1 = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5);
var observable2 = observable1.Select(a => a.ToString());
var subscription = observable2.Subscribe(s => s.Dump());

订阅源是observable1observable2,还是某种指向系统时钟的指针?

Is the subscription source observable1, observable2, or some sort of pointer to the system clock?

如果你想分离进入合并的消息,那么你可以使用 Select 如下:

If you wanted to separate the messages entering into a merge then you could use Select as follows:

var observable = observable1.Select(o => Tuple.Create("observable1", o))
  .Merge(observable2.Select(o => Tuple.Create("observable2", o)))
  .Merge(observable3.Select(o => Tuple.Create("observable3", o)));

如果这太乱了,那么您可以轻松制作一个扩展方法来清理它.

If that's too messy, then you could easy make an extension method to clean it up.

我还要补充一点,您在答案中发布的代码不是很像 Rx.一般准则是避免直接实现 IObservable.School 可以更简洁地重写如下:

I'll also add that the code you posted in your answer isn't very Rx-like. General guidelines are to avoid directly implementing IObservable. A School can more concisely be re-written as follows:

    public class School
    {
        //private Subject<Student> _subject = null;
        private readonly ISubject<Student> _applicationStream = null;

        public static readonly int MaximumNumberOfSeats = 100;

        public string Name { get; set; }

        public School(string name)
            : this(name, new Subject<Student>())
        {
        }

        public School(string name, ISubject<Student> applicationStream )
        {
            Name = name;
            _applicationStream = applicationStream;
        }

        public void AdmitStudent(Student s)
        {
            _applicationStream.OnNext(s);
        }

        public IObservable<Student> ApplicationStream()
        {
            return _applicationStream;
        }

        public IObservable<Student> AcceptedStream()
        {
            return _applicationStream
                .SelectMany(s => s != null ? Observable.Return(s) : Observable.Throw<Student>(new ArgumentNullException("student")))
                .Distinct()
                .Take(MaximumNumberOfSeats);
        }
    }

通过这种方式,您可以订阅所有申请、接受以及拒绝等.您的状态也更少(没有 List),理想情况下您甚至会删除 SubjectapplicationStream 并将其转换为 Observable 并在某处传递.

This way you can subscribe to all applications, the acceptances, and if you wanted to, the rejects, etc.. You also have less state (no List<Student>), and ideally you would even remove the Subject<Student> applicationStream and turn that into an Observable that gets passed in somewhere.

这篇关于如何知道下一个值来自哪个 observable?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆