如何共享一个可观察的与发布和连接? [英] How do I share an observable with publish and connect?

查看:108
本文介绍了如何共享一个可观察的与发布和连接?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有我申请操作可观察到的数据流,分裂成两个单独的流,施加更多(不同)的操作,以每两个流,并再次合并在一起。我想分享两个用户之间可观察到的使用发布连接,但每个用户的似乎用单独的流。也就是说,在下面的例子中,我看到做一个昂贵的操作的数据流的中的每个项目一次印刷同时为用户即可。 (想象一下,昂贵的操作是东西应该都是用户之间只发生一次,因此我想重用流。)我已经使用发布和<$ C $ 。C>连接试着和两个订阅分享合并后的观察到的,但它似乎有错误的生效



示例这个问题:

  VAR foregroundScheduler =新NewThreadScheduler(TS =>新建线程(TS){=的IsBackground假}); 
变种计时器= Observable.Timer(TimeSpan.Zero,TimeSpan.FromSeconds(10),foregroundScheduler);
VAR昂贵= timer.Select(I =>
{
//转换为字符串是一个昂贵的操作
Console.WriteLine(做一个昂贵的操作);
返回的String.Format(#{0},我);
});

VAR一个= expensive.Where(S = GT; int.Parse(s.Substring(1))%2 == 0)。选择(S = gt;新建{源=A ,值= S});
变种B = expensive.Where(S = GT;!int.Parse(s.Substring(1))%2 = 0)。选择(S = gt;新建{源=B,值= S });

变种连接= Observable.Merge(A,B).Publish();
connectable.Where(X => x.Source.Equals(A))订阅(S = GT; Console.WriteLine(A先生有:{0},S))。
connectable.Where(X => x.Source.Equals(B))订阅(S = GT; Console.WriteLine(用户B获得:{0},S))。
connectable.Connect();



我看到下面的输出:



<预类=郎无prettyprint-覆盖> 做昂贵的操作
做昂贵的操作
用户A得到:{源= A,值=#0}
做昂贵操作
做昂贵的操作
用户b的了:{来源= b,值=#1}

(输出持续,截断为简洁起见。)



我如何可以与用户分享观察到的?

解决方案

您已经发表了错误的观测。



通过你合并,然后发布这样<当前的代码code> Observable.Merge(A,b).Publish(); 。现在,因为 A &安培; B 是针对定义昂贵你仍然得到两个订阅昂贵



订阅创建这些管道:



从您的代码;

您可以看到这一点,如果你拿出 .Publish()。输出变为:



<预类=郎无prettyprint-覆盖> 做一个昂贵的操作
做一个昂贵的操作
做一个昂贵的操作
做一个昂贵的操作
用户A得到:{源= A,值=#0}
做一个昂贵的操作
做一个昂贵的操作
做一个昂贵的操作
做一个昂贵的操作
用户b的了:{来源= b,值=#1}

这创建这些管道:





所以,通过移动 .Publish()回升至昂贵你消除这个问题。 。这就是你真正需要它,因为它是昂贵的操作,毕竟



这是你所需要的代码:

  VAR foregroundScheduler =新NewThreadScheduler(TS =>新建线程(TS){=的IsBackground假}); 
变种计时器= Observable.Timer(TimeSpan.Zero,TimeSpan.FromSeconds(10),foregroundScheduler);
VAR昂贵= timer.Select(I =>
{
//转换为字符串是一个昂贵的操作
Console.WriteLine(做一个昂贵的操作);
返回的String.Format(#{0},我);
});

VAR连接= expensive.Publish();

VAR一个= connectable.Where(S = GT; int.Parse(s.Substring(1))%2 == 0)。选择(S = gt;新建{源=A ,值= S});
变种B = connectable.Where(S = GT;!int.Parse(s.Substring(1))%2 = 0)。选择(S = gt;新建{源=B,值= S });

变种合​​并= Observable.Merge(A,B);

merged.Where(X => x.Source.Equals(A))订阅(S = GT; Console.WriteLine(A先生有:{0},S) );
merged.Where(X => x.Source.Equals(B))订阅(S = GT; Console.WriteLine(用户B获得:{0},S))。

connectable.Connect();

这很好地产生了以下内容:



<预类=郎无prettyprint-覆盖> 做一个昂贵的操作
用户A得到:{源= A,值=#0}
做一个昂贵的操作
用户b的了:{来源= b,值=#1}
做一个昂贵的操作
用户A得到:{源= A,值=#2}
这样一个昂贵的操作
用户b的了:{来源= b,值=#3}

和这给你这些管道:





您可以从这个图像有见仍然重复。这是因为这些部位并不昂贵的罚款。



复制实际上是非常重要的。管线的共用部位使他们的端点容易出错,从而提前终止。少分享的代码的鲁棒性越好。当你有一个昂贵的操作,你应该担心出版这是唯一。否则,你应该让管道是他们自己。



下面是显示它的一个例子。如果您没有公布源呢,如果一个源产生错误,那么它不会拉下所有的管道等。





但是,一旦你介绍一个共享的可观察到的话单错误会带来了所有的管道等。




I have an observable data stream that I am applying operations to, splitting into two separate streams, applying more (distinct) operations to each of the two streams, and merging together again. I am trying to share the observable between two subscribers using Publish and Connect but each of the subscribers seems to be using a separate stream. That is, in the example below, I see "Doing an expensive operation" printed once for each item in the stream for both of the subscribers. (Imagine the expensive operation as being something that should happen only once between all subscribers, as such I am trying to reuse the stream.) I have used Publish and Connect to try and share the merged observable with both subscribers, but it seems to have the wrong effect.

Example with the issue:

var foregroundScheduler = new NewThreadScheduler(ts => new Thread(ts) { IsBackground = false });
var timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), foregroundScheduler);
var expensive = timer.Select(i =>
{
    // Converting to strings is an expensive operation
    Console.WriteLine("Doing an expensive operation");
    return string.Format("#{0}", i);
});

var a = expensive.Where(s => int.Parse(s.Substring(1)) % 2 == 0).Select(s => new { Source = "A", Value = s });
var b = expensive.Where(s => int.Parse(s.Substring(1)) % 2 != 0).Select(s => new { Source = "B", Value = s });

var connectable = Observable.Merge(a, b).Publish();
connectable.Where(x => x.Source.Equals("A")).Subscribe(s => Console.WriteLine("Subscriber A got: {0}", s));
connectable.Where(x => x.Source.Equals("B")).Subscribe(s => Console.WriteLine("Subscriber B got: {0}", s));
connectable.Connect();

I see the following output:

Doing expensive operation
Doing expensive operation
Subscriber A got: { Source = A, Value = #0 }
Doing expensive operation
Doing expensive operation
Subscriber B got: { Source = B, Value = #1 }

(Output continues, truncated for brevity.)

How can I share the observable with both subscribers?

解决方案

You have published the wrong observable.

With the current code you are merging and then publishing like this Observable.Merge(a, b).Publish();. Now since a & b are defined against expensive you still get two subscriptions to expensive.

The subscriptions create these pipelines:

You can see this if you take out the .Publish(); from your code. The output becomes:

Doing an expensive operation
Doing an expensive operation
Doing an expensive operation
Doing an expensive operation
Subscriber A got: { Source = A, Value = #0 }
Doing an expensive operation
Doing an expensive operation
Doing an expensive operation
Doing an expensive operation
Subscriber B got: { Source = B, Value = #1 }

This creates these pipelines:

So, by shifting the .Publish() back up to expensive you eliminate the problem. That's where you really needed it because it is the expensive operation after all.

This is the code you needed:

var foregroundScheduler = new NewThreadScheduler(ts => new Thread(ts) { IsBackground = false });
var timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), foregroundScheduler);
var expensive = timer.Select(i =>
{
    // Converting to strings is an expensive operation
    Console.WriteLine("Doing an expensive operation");
    return string.Format("#{0}", i);
});

var connectable = expensive.Publish();

var a = connectable.Where(s => int.Parse(s.Substring(1)) % 2 == 0).Select(s => new { Source = "A", Value = s });
var b = connectable.Where(s => int.Parse(s.Substring(1)) % 2 != 0).Select(s => new { Source = "B", Value = s });

var merged = Observable.Merge(a, b);

merged.Where(x => x.Source.Equals("A")).Subscribe(s => Console.WriteLine("Subscriber A got: {0}", s));
merged.Where(x => x.Source.Equals("B")).Subscribe(s => Console.WriteLine("Subscriber B got: {0}", s));

connectable.Connect();

That nicely produces the following:

Doing an expensive operation
Subscriber A got: { Source = A, Value = #0 }
Doing an expensive operation
Subscriber B got: { Source = B, Value = #1 }
Doing an expensive operation
Subscriber A got: { Source = A, Value = #2 }
Doing an expensive operation
Subscriber B got: { Source = B, Value = #3 }

And this gives you these pipelines:

You can see from this image that there is still duplication. That's fine because these parts aren't expensive.

The duplication is actually important. Shared parts of the pipelines make their endpoints vulnerable to errors and thus to early termination. The less sharing the better for the robustness of the code. It's only when you have an expensive operation that you should worry about publishing. Otherwise you should just let the pipelines be themselves.

Here's an example to show it. If you don't have a published source then, if one source produces an error then it doesn't pull down all of the pipelines.

But once you introduce a shared observable then a single error will bring down all of the pipelines.

这篇关于如何共享一个可观察的与发布和连接?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆