是否有可能调用在接收不同的线程用户的OnNexts? [英] Is it possible to invoke subscribers's OnNexts on different threads in Rx?

查看:112
本文介绍了是否有可能调用在接收不同的线程用户的OnNexts?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是新来接收。我想知道,如果有可能派遣一个消息给不同的用户,这样它们在不同的线程上运行?一个IObserable如何控制呢?平原主体实施,据我了解,它调用用户一个接一个地在单个线程之后。






 公共类Subsciber:IObserver< INT> 
{
公共无效OnNext(int类型的)
{
//做些什么
}
公共无效的OnError(例外五)
{
//动作
}
公共无效OnCompeleted()
{
}

}

公静态类节目
{
公共无效静态Main()
{
无功可观察到的新=< .... .... SomeClass的GT&();
变种SUB1 =新用户();
变种SUB 2 =新用户();
observable.Subscribe(SUB1);
observable.Subscribe(SUB2);
//一些等待功能
}
}

如果我利用主题为SomeClass的',然后SUB2的OnNext()将不会被调用,直到SUB1的OnNext()完成。如果SUB1是需要花费大量的时间,我不希望它耽误SUB2招待会。谁能告诉我的Rx是如何允许这种实施SomeClass的的。


解决方案

你写的代码几乎没有运行观察到在平行。如果你写你的观察者这样的:

 公共类用户:IObserver< INT> 
{
公共无效OnNext(int类型的)
{
Console.WriteLine({0}在{1}在{2},
A,
Thread.CurrentThread.ManagedThreadId,
DateTime.Now.ToString());
}
公共无效的OnError(例外五)
{}
公共无效OnCompleted()
{}
}

然后运行该代码:

  VAR观察的= 
观测
.Interval(TimeSpan.FromSeconds(1.0))
。选择(X =>(INT)X)
。取(5)
.ObserveOn(Scheduler.ThreadPool);
变种SUB1 =新用户();
变种SUB 2 =新用户();
observable.Subscribe(SUB1);
observable.Subscribe(SUB2);
Thread.sleep代码(10000);



将产生如下:



<$ P $ 28 p> 0在2011/10/20 0点13分49秒
0 16日2011/10/20 0点13分49秒
1月29日的2011 / 10/20〇时13分五十秒
1月22日在2011/10/20〇时13分五十秒
2月27日在2011/10/20 0时13分51秒
2 29日2011/10/20○点13分51秒
3月27日在2011/10/20 0点13分52秒
3月19日在2011/10/20 0点13分52秒
4月27日在2011/10/20○点13分53秒
4月27日在2011/10/20○点13分53秒

它已经在不同的线程并行运行的预订。



这是我使用的是<$ C最重要的事情$ C> .ObserveOn 扩展方法 - 那就是做这项工作。



您应该记住,观察者一般不共享相同的。观测实例。订阅可观察到的有效直径达观察到运营商的独特的产业链,从可观察的观察者源。这是大致相同调用的GetEnumerator 上一个可枚举两次,你不会共享相同的枚举的实例,你会得到两个独特的实例。



现在,我想描述一下我的意思链。我打算从给Reflector.NET提取的代码 Observable.Generate &安培; Observable.Where 来说明这一点。



就拿这段代码:

 变种XS = Observable.Generate(0,X = X的催化剂小于10中,x => X + 1,X = X的催化剂); 
VAR YS = xs.Where(X => X%2 == 0);
ys.Subscribe(Y => {/ *产生0,2,4,6,8 * /});



引擎盖下都生成&安培; 其中,每个创建内部类接收的新实例 AnonymousObservable< T> 。对于构造 AnonymousObservable< T> 需要 Func键< IObserver< T> ;, IDisposable的> 它使用每当它代表接收到呼叫订阅



对于稍微清理代码 Observable.Generate< T>(...)从Reflector.NET是:

 公共静态的IObservable< TResult>生成< TSTATE,TResult>(
TSTATE初始化状态,
Func键< TSTATE,布尔>的条件,
Func键< TSTATE,TSTATE>迭代,
Func键< TSTATE,TResult> resultSelector,
IScheduler调度)
{
返回新AnonymousObservable< TResult>((IObserver< TResult>观察者)=>
{
TSTATE状态=初始化状态;
BOOL第一= TRUE;
返回scheduler.Schedule((动作个体经营)=>
{
布尔标志=假;
TResult本地=默认(TResult);

{
如果(第一)
{
第一= FALSE;
}
,否则
{
状态=迭代(州);
}
标志=条件(状态);
如果(标志)
{
本地= resultSelector(州);
}
}
赶上(例外的例外)
{
observer.OnError(例外);
的回报;
}
如果(标志)
{
observer.OnNext(本地);
自();
}
,否则
{
observer.OnCompleted();
}
});
});
}



行动自参数是一个递归调用迭代输出值。在这段代码的观察获取存储或价值得到粘贴到多个观察员你会发现,没有出路的。这段代码为每个新的观察员运行一次



Observable.Where<的稍微清理代码; T>(...)从Reflector.NET是:

 公共静态的IObservable< TSource>其中,< TSource>(
本的IObservable< TSource>源,
Func键< TSource,布尔>谓语)
{
返回新AnonymousObservable< TSource>(观察员=>
source.Subscribe(X =>
{
布尔标志;

{
标志=谓词(X);
}
赶上(例外的例外)
{
observer.OnError(例外);
的回报;
}
如果(标志)
{
observer.OnNext(X);
}
},EX => observer.OnError(前),()=> observer.OnCompleted));
}

这同样的代码不会跟踪多个观察员。它调用订阅有效地传递自己的代码作为观察员底层观察到。



您应该看到的是,在我的示例代码上面,订阅其中,创建一个订阅生成因此这是观测链。事实上,它的链接一系列 AnonymousObservable 对象认购的电话。



如果你有两个订阅你有两个链。如果你有1000个订阅你有1000个连锁店



现在,就像一个侧面说明 - 即使有的IObservable< T> IObserver< T> 接口 - 你应该非常很少实际上是在你自己的类实现这些。内置类和运营商处理的所有案件99.99%。这是一个有点像的IEnumerable< T> - 你需要多久实现这个接口自己



让我知道这是否可以帮助,如果您需要任何进一步的解释。


I am new to Rx. I want to know if it is possible to dispatch a message to different subscribers such that they run on different thread? How can an IObserable control it? The plain Subject implementation, as I understand it calls the subscribers one after the other on a single thread.


public class Subsciber : IObserver<int>
{
    public void OnNext(int a)
    {
        // Do something
    }
    public void OnError(Exception e)
    {
        // Do something
    }
    public void OnCompeleted()
    {
    }

} 

public static class Program
{
   public void static Main()
   {
       var observable = new <....SomeClass....>();
       var sub1 = new Subscriber();
       var sub2 = new Subscriber();
       observable.Subscribe(sub1);
       observable.Subscribe(sub2);
       // some waiting function 
   }
}

If I use Subject as 'SomeClass', then sub2's OnNext() will not be called until sub1's OnNext() is completed. If sub1 is takes a lot of time, I don't want it to delay sub2's reception. Can someone tell me how Rx allows this kind of implementation for SomeClass.

解决方案

The code you have written is almost there to run the observable in parallel. If you write your observer as this:

public class Subscriber : IObserver<int>
{
    public void OnNext(int a)
    {
        Console.WriteLine("{0} on {1} at {2}",
            a,
            Thread.CurrentThread.ManagedThreadId,
            DateTime.Now.ToString());
    }
    public void OnError(Exception e)
    { }
    public void OnCompleted()
    { }
} 

Then running this code:

var observable =
    Observable
        .Interval(TimeSpan.FromSeconds(1.0))
        .Select(x => (int)x)
        .Take(5)
        .ObserveOn(Scheduler.ThreadPool);
var sub1 = new Subscriber();
var sub2 = new Subscriber();
observable.Subscribe(sub1);
observable.Subscribe(sub2);
Thread.Sleep(10000);

Will produce the following:

0 on 28 at 2011/10/20 00:13:49
0 on 16 at 2011/10/20 00:13:49
1 on 29 at 2011/10/20 00:13:50
1 on 22 at 2011/10/20 00:13:50
2 on 27 at 2011/10/20 00:13:51
2 on 29 at 2011/10/20 00:13:51
3 on 27 at 2011/10/20 00:13:52
3 on 19 at 2011/10/20 00:13:52
4 on 27 at 2011/10/20 00:13:53
4 on 27 at 2011/10/20 00:13:53

It's already running the subscriptions in parallel on different threads.

The important thing that I used was the .ObserveOn extension method - that's what made this work.

You should keep in mind that observers don't generally share the same instance of observables. Subscribing to an observable effectively wires up a unique "chain" of observable operators from the source of the observable to the observer. This is much the same as calling GetEnumerator twice on an enumerable, you will not share the same enumerator instance, you will get two unique instances.

Now, I want to describe what I mean by a chain. I'm going to give the Reflector.NET extracted code from Observable.Generate & Observable.Where to illustrate the point.

Take this code for example:

var xs = Observable.Generate(0, x => x < 10, x => x + 1, x => x);
var ys = xs.Where(x => x % 2 == 0);
ys.Subscribe(y => { /* produces 0, 2, 4, 6, 8 */ });

Under the hood both Generate & Where each create a new instance of the internal Rx class AnonymousObservable<T>. The constructor for AnonymousObservable<T> takes a Func<IObserver<T>, IDisposable> delegate which it uses whenever it receives a call to Subscribe.

The slightly cleaned up code for Observable.Generate<T>(...) from Reflector.NET is:

public static IObservable<TResult> Generate<TState, TResult>(
    TState initialState,
    Func<TState, bool> condition,
    Func<TState, TState> iterate,
    Func<TState, TResult> resultSelector,
    IScheduler scheduler)
{
    return new AnonymousObservable<TResult>((IObserver<TResult> observer) =>
    {
        TState state = initialState;
        bool first = true;
        return scheduler.Schedule((Action self) =>
        {
            bool flag = false;
            TResult local = default(TResult);
            try
            {
                if (first)
                {
                    first = false;
                }
                else
                {
                    state = iterate(state);
                }
                flag = condition(state);
                if (flag)
                {
                    local = resultSelector(state);
                }
            }
            catch (Exception exception)
            {
                observer.OnError(exception);
                return;
            }
            if (flag)
            {
                observer.OnNext(local);
                self();
            }
            else
            {
                observer.OnCompleted();
            }
        });
    });
}

The Action self parameter is a recursive call that iterates output values. You'll notice that nowhere in this code does the observer get stored or that the values get pasted to more than one observer. This code runs once for each new observer.

The slightly cleaned up code for Observable.Where<T>(...) from Reflector.NET is:

public static IObservable<TSource> Where<TSource>(
    this IObservable<TSource> source,
    Func<TSource, bool> predicate)
{
    return new AnonymousObservable<TSource>(observer =>
        source.Subscribe(x =>
        {
            bool flag;
            try
            {
                flag = predicate(x);
            }
            catch (Exception exception)
            {
                observer.OnError(exception);
                return;
            }
            if (flag)
            {
                observer.OnNext(x);
            }
        }, ex => observer.OnError(ex), () => observer.OnCompleted));
}

Again this code doesn't track multiple observers. It calls Subscribe effectively passing its own code as the observer to the underlying source observable.

You should see that, in my example code above, subscribing to Where creates a subscription to Generate and hence this is a chain of observables. In fact it's chaining subscribe calls on a series of AnonymousObservable objects.

If you have two subscriptions you have two chains. If you have 1,000 subscriptions you have 1,000 chains.

Now, just as a side note - even though there are IObservable<T> and IObserver<T> interfaces - you should very very rarely actually implement these in your own classes. The built-in classes and operators handle 99.99% of all cases. It's a bit like IEnumerable<T> - how often do you need to implement this interface yourself?

Let me know if this helps and if you need any further explanation.

这篇关于是否有可能调用在接收不同的线程用户的OnNexts?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆