Rx:什么是订阅,订阅如何工作? [英] Rx: What are subscriptions and how do subscriptions work?

查看:54
本文介绍了Rx:什么是订阅,订阅如何工作?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在学习.NET中的反应式扩展(rx),并且在预订"的真正含义和使用时间方面有些挣扎.

I'm learning reactive extensions (rx) in .NET and I'm struggling a little bit with what a "subscription" really is and when it is used.

让我们获取一些示例数据,这些数据取自

Lets take some sample data, taken from this thread:

using System;
using System.Reactive.Linq;
using System.Threading;

namespace ConsoleApp1
{
    class Program
    {
        class Result
        {
            public bool Flag { get; set; }
            public string Text { get; set; }
        }

        static void Main(string[] args)
        {               
            var source =
               Observable.Create<Result>(f =>
               {
                   Console.WriteLine("Start creating data!");

                   f.OnNext(new Result() { Text = "one", Flag = false });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "two", Flag = true });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "three", Flag = false });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "four", Flag = false });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "five", Flag = true });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "six", Flag = true });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "seven", Flag = true });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "eight", Flag = false });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "nine", Flag = true });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "ten", Flag = false });

                   return () => Console.WriteLine("Observer has unsubscribed");
               });
        }
    }
}

当心这一行:

 Console.WriteLine("Start creating data!");

现在,首先,我认为只需使用 .Subscribe 运算符即可使用订阅.因此,观察者(例如 .Subscribe 函数的回调)订阅了这样的可观察对象(一连串的运算符的最后一个返回值)(仅作为示例,查询没有实际使用):

Now, first I thought a subscription simply is used by using the .Subscribe operator. So an observer (e.g. the callback of the .Subscribe function) subscribes to an observable (the last return value of a chain of operators) like this (just as an example, the query doesn't have a real use):

  source.Zip(source, (s1, s0) =>
     s0.Flag
     ? Observable.Return(s1)
     : Observable.Empty<Result>()).Merge().Subscribe(f => { Console.WriteLine(f.Text); });

现在我期望得到开始创建数据!"仅输出一次,因为我只使用一个订阅.但实际上,我两次获得了:

Now I was expecting to get the "Start creating data!" output only once, since I was only using one subscription. But in fact, I got it twice:

Start creating data!
Start creating data!
two
five
six
seven
nine

有人告诉我,每次我在 source 上使用运算符时,都会进行订阅.但是在此示例中,我仅使用一次 source.,然后再次使用它作为 .Zip 运算符的参数.还是因为源通过再次订阅的值传递到 .Zip 函数?

I was told that everytime I use an operator on source., a subscription is made. But in this example I'm using source. only once and then a second time just as a parameter for the .Zip operator. Or is it because the source is passed to the .Zip function by value subscribed again?

所以我的问题是:

  1. 就Rx而言,订阅"到底是什么?
  2. 在我的示例中,为什么/这两个订阅发生在何处?

顺便说一句.我知道我可以使用 .Publish 运算符防止发生多个订阅,但这不属于我的问题.

Btw. I know I can prevent multiple subscriptions from happening by using the .Publish operator, but that isn't the scope of my questions.

推荐答案

简单来说,订阅仅表示已订阅的 Observable .可以通过使用 .Subscribe 显式地进行此过程,也可以通过加入两个或多个 Observables 然后订阅所产生的链来隐式地进行此过程.

In simple terms a Subscription just represents an Observable that has been subscribed to. This process can happen either explicitly by using .Subscribe or implicitly by joining two or more Observables and then subscribing to the resulting chain.

在您的情况下,您会看到两种情况都发生了,一次是在调用 Subscribe 时显式发生,一次是在将 source 传递给 Zip 时隐式发生.就是,有两个 Subscriptions source Observable .

In your case you are seeing both happen, once explicitly when you call Subscribe and one implicitly when you pass source to Zip, that is, there are two Subscriptions to the source Observable.

为什么那么重要?因为默认情况下 Observables 是惰性的,这意味着它们只有在您订阅它们后才会开始处理(该过程的产品为 Subscription ),通过扩展,这意味着<任何时间,您订阅 Observable 都会有效地开始新的视频流.可以像使用 Publish 所提到的那样来覆盖此行为,但是默认情况是将每个 Observable 设置为 cold .

Why is that important? Because by default Observables are lazy, meaning that they will not begin processing until you subscribe to them (the product of that process being a Subscription), by extension this means that any time you subscribe to the Observable it will effectively begin a new stream. This behavior can be overridden like you alluded to with Publish, but the default is for each Observable to be cold.

在您的特定情况下,由于您要将相同的 Observable 传递给 Zip ,因此需要订阅两次,因为它将把两个传递的事件压缩在一起流.结果是对同一 Observable 的两个订阅,每个订阅彼此独立运行.

In your specific case, since you are passing the same Observable to Zip it needs to subscribe to it twice, since it will be zipping together events from the two passed streams. The result is two subscriptions to the same Observable which each run independently of each other.

这篇关于Rx:什么是订阅,订阅如何工作?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆