无扩展:用户并发内 [英] Reactive Extensions: Concurrency within the subscriber

查看:130
本文介绍了无扩展:用户并发内的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我试图总结我的周围无扩展'的并发支持的头部和我有一个很难得到的结果我之后。所以,我可能没有的得到它只是还没有

I'm trying to wrap my head around Reactive Extensions' support for concurrency and am having a hard time getting the results I'm after. So I may not get it just yet.

我有数据发射到流比用户可以更快地消耗它的来源。我更愿意配置使得另一个线程用于调用该用户从该流的每个新项,使得用户具有穿过它同时运行多个线程的流。我是能够保证用户的线程safeness

I have a source that emits data into the stream faster than the subscriber can consume it. I'd prefer to configure the stream such that another thread is used to invoke the subscriber for each new item from the stream, so that the subscriber has multiple threads running through it concurrently. I am able to ensure the thread-safeness of the subscriber.

下面的示例演示了此问题:

The following sample demonstrates the problem:

Observable.Interval( TimeSpan.FromSeconds(1))
    .Do( x => Console.WriteLine("{0} Thread: {1} Source value: {2}",
                                DateTime.Now, 
                                Thread.CurrentThread.ManagedThreadId, x))
    .ObserveOn(NewThreadScheduler.Default)
    .Subscribe(x =>
               {
                   Console.WriteLine("{0} Thread: {1} Observed value: {2}",
                                     DateTime.Now,
                                     Thread.CurrentThread.ManagedThreadId, x);
                   Thread.Sleep(5000); // Simulate long work time
               });



控制台输出看起来像这样(日期删除):

The console output looks like this (dates removed):

4:25:20 PM Thread: 6 Source value: 0
4:25:20 PM Thread: 11 Observed value: 0
4:25:21 PM Thread: 12 Source value: 1
4:25:22 PM Thread: 12 Source value: 2
4:25:23 PM Thread: 6 Source value: 3
4:25:24 PM Thread: 6 Source value: 4
4:25:25 PM Thread: 11 Observed value: 1
4:25:25 PM Thread: 12 Source value: 5
4:25:26 PM Thread: 6 Source value: 6

请注意实测值时增量。该用户没有被并行调用,即使源继续发出超过用户可以对其进行处理更快的数据。虽然我可以想像了一堆场景中当前的行为将是有益的,我需要能够尽快变得可用处理的消息。

Please notice the "Observed value" time deltas. The subscriber isn't invoked in parallel even though the source continues to emit data faster than the subscriber can process it. While I can imagine a bunch of scenarios where the current behavior would be useful, I need to be able to process the messages as soon as they become available.

我已经试图调度的几个变化与ObserveOn方法,但他们都不做我想做的。

I've tried several variations of Schedulers with the ObserveOn method, but none of them seem to do what I want.

除了分拆线程内的订阅操作以执行长运行工作,有什么我丢失,将允许订户数据并发送到?

Other than spinning off a thread within the Subscribe action to perform the long running work, is there anything I'm missing that will allow for concurrent delivery of data to the subscriber?

在提前为所有的答案和建议,谢谢!

Thanks in advance for all answers and suggestions!

推荐答案

这里的根本问题是,你希望观察到的接收调度事件,真正打破了如何观测工作规则的一种方式。我认为这将是有益的看看这里的接收设计准则: http://go.microsoft.com/fwlink/ ?链路ID = 205219 - 最明显的是,4.2假设观察者实例调用序列化的时尚。即你并不意味着能够运行在OnNext并行调用。事实上接收的顺序行为是美丽的中央到它的设计理念。

The fundamental problem here is that you want the Rx observable to dispatch events in a way that really breaks the rules of how observables work. I think it would be instructive to look at the Rx design guidelines here: http://go.microsoft.com/fwlink/?LinkID=205219 - most notably, "4.2 Assume observer instances are called in a serialized fashion". i.e. You're not meant to be able to run OnNext calls in parallel. In fact the ordering behaviour of Rx is pretty central to it's design philosophy.

如果你看看源代码,你会看到,接收inforces这种行为的 ScheduledObserver< T> 类从 ObserveOnObserver< T> 导出... OnNexts是从内部队列调度和每个人都必须完成下一个被派遣之前 - 给定的执行范围内。 RX不会让个人用户的OnNext调用来同时执行。

If you look at the source, you'll see that Rx inforces this behaviour in the ScheduledObserver<T> class from which ObserveOnObserver<T> is derived... OnNexts are dispatched from an internal queue and each must complete before the next one is dispatched - within the given execution context. Rx won't allow an individual subscriber's OnNext calls to execute concurrently.

这并不是说你不能有以不同的速率虽然执行多个subscibers。事实上,这是很容易看到,如果你改变你的代码如下:

That's not to say you can't have multiple subscibers executing at different rates though. In fact this is easy to see if you change your code as follows:

var source = Observable.Interval(TimeSpan.FromSeconds(1))
    .Do(x => Console.WriteLine("{0} Thread: {1} Source value: {2}",
                                DateTime.Now,
                                Thread.CurrentThread.ManagedThreadId, x))
    .ObserveOn(NewThreadScheduler.Default);

var subscription1 = source.Subscribe(x =>
    {
        Console.WriteLine("Subscriber 1: {0} Thread: {1} Observed value: {2}",
                            DateTime.Now,
                            Thread.CurrentThread.ManagedThreadId, x);
        Thread.Sleep(1000); // Simulate long work time
    });

var subscription2 = source.Subscribe(x =>
{
    Console.WriteLine("Subscriber 2: {0} Thread: {1} Observed value: {2}",
                        DateTime.Now,
                        Thread.CurrentThread.ManagedThreadId, x);
    Thread.Sleep(5000); // Simulate long work time
});

现在你会看到用户1走在前面认购2。

Now you'll see Subscriber 1 getting ahead of Subscriber 2.

你不能轻易做的是问一个可观察到像做一个OnNext呼叫派遣就绪用户 - 这是一种你在一种迂回的方式询问什么。我也相信你不会真的想创建一个缓慢的消费情况为每一位OnNext一个新线程!

What you can't easily do is ask an observable to do something like dispatch of an OnNext call to a "ready" subscriber - which is kind of what you are asking for in a roundabout way. I also presume you wouldn't really want to create a new thread for every OnNext in a slow consumer situation!

在这种情况下,它听起来就像你可能会更好过单个用户不执行任何比推工作其他到队列尽可能快,后者又由若干费时的工作线程,那么你可以控制必要保持同步的服务。

In this scenario it sounds like you might be better off with a single subscriber that does nothing other than push work onto a queue as fast as possible, which is in turn serviced by a number of consuming worker threads you could then control as necessary to keep pace.

这篇关于无扩展:用户并发内的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆