TakeWhile(...)等扩展方法在Rx 1.0中安全吗? [英] Is TakeWhile(...) and etc. extension methods thread safe in Rx 1.0?

查看:55
本文介绍了TakeWhile(...)等扩展方法在Rx 1.0中安全吗?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个事件源,它基于底层设计经常被网络I/O触发,当然,每次事件总是在不同的线程上,现在我通过Rx将事件包装为:Observable.FromEventPattern(...),现在我正在使用TakeWhile(predict)来过滤一些特殊事件数据. 现在,我对它的线程安全性有所担心,TakeWhile(predict)用作击键和静音,但是在并发情况下,是否仍然可以保证?因为我猜潜在的实现可能是(因为太复杂了,我无法阅读源代码...):

I have an event source which fired by a Network I/O very frequently, based on underlying design, of course the event was always on different thread each time, now I wrapped this event via Rx with: Observable.FromEventPattern(...), now I'm using the TakeWhile(predict) to filter some special event data. At now, I have some concerns on its thread safety, the TakeWhile(predict) works as a hit and mute, but in concurrent situation, can it still be guaranteed? because I guess the underlying implementation could be(I can't read the source code since it's too complicated...):

    public static IObservable<TSource> TakeWhile<TSource>(this IObservable<TSource> source, Func<TSource, bool> predict)
    {
        ISubject<TSource> takeUntilObservable = new TempObservable<TSource>();
        IDisposable dps = null;
        // 0 for takeUntilObservable still active, 1 for predict failed, diposed and OnCompleted already send. 
        int state = 0;
        dps = source.Subscribe(
             (s) =>
             {
                 /* NOTE here the 'hit and mute' still not thread safe, one thread may enter 'else' and under CompareExchange, but meantime another thread may passed the predict(...) and calling OnNext(...)
                  * so the CompareExchange here mainly for avoid multiple time call OnCompleted() and Dispose();
                  */

                 if (predict(s) && state == 0)
                 {
                     takeUntilObservable.OnNext(s);
                 }
                 else
                 {
                     // !=0 means already disposed and OnCompleted send, avoid multiple times called via parallel threads.
                     if (0 == Interlocked.CompareExchange(ref state, 1, 0))
                     {
                         try
                         {
                             takeUntilObservable.OnCompleted();
                         }
                         finally
                         {
                             dps.Dispose();
                         }
                     }
                 }
             },
             () =>
             {
                 try
                 {
                     takeUntilObservable.OnCompleted();
                 }
                 finally { dps.Dispose(); }
             },
             (ex) => { takeUntilObservable.OnError(ex); });
        return takeUntilObservable;
    }

TempObservable只是ISubject的简单实现.
如果我的猜测合理,那么似乎无法保证线程安全,这意味着某些意外事件数据可能仍会传入OnNext(...),因为该静音"仍在继续. 然后,我编写了一个简单的测试来进行验证,但是出乎意料的是,结果都是肯定的:

That TempObservable is just a simple implementation of ISubject.
If my guess reasonable, then seems the thread safety can't be guaranteed, means some unexpected event data may still incoming to OnNext(...) because that 'mute' is still on going. Then I write a simple testing to verify, but out of expectation, the results are all positive:

    public class MultipleTheadEventSource
    {
        public event EventHandler OnSthNew;
        int cocurrentCount = 1000;
        public void Start()
        {
            for (int i = 0; i < this.cocurrentCount; i++)
            {
                int j = i;
                ThreadPool.QueueUserWorkItem((state) =>
                {
                    var safe = this.OnSthNew;
                    if (safe != null)
                        safe(j, null);
                });
            }
        }
    }

    [TestMethod()]
    public void MultipleTheadEventSourceTest()
    {
        int loopTimes = 10;
        int onCompletedCalledTimes = 0;
        for (int i = 0; i < loopTimes; i++)
        {
            MultipleTheadEventSource eventSim = new MultipleTheadEventSource();
            var host = Observable.FromEventPattern(eventSim, "OnSthNew");
            host.TakeWhile(p => { return int.Parse(p.Sender.ToString()) < 110; }).Subscribe((nxt) =>
            {
                //try print the unexpected values, BUT I Never saw it happened!!!
                if (int.Parse(nxt.Sender.ToString()) >= 110)
                {
                    this.testContextInstance.WriteLine(nxt.Sender.ToString());
                }
            }, () => { Interlocked.Increment(ref onCompletedCalledTimes); });
            eventSim.Start();
        }

        // simply wait everything done.
        Thread.Sleep(60000);
        this.testContextInstance.WriteLine("onCompletedCalledTimes: " + onCompletedCalledTimes);
    }

在进行测试之前,这里的一些朋友建议我尝试使用Synchronize<TSource>ObserveOn使其具有线程安全性,因此对我的前进思路有何想法,以及为什么未重现该问题?

before I do the testing, some friends here suggest me try to use Synchronize<TSource> or ObserveOn to make it thread safe, so any idea on my proceeding thoughts and why the issue not reproduced?

推荐答案

根据您的其他问题,

As per your other question, the answer still remains the same: In Rx you should assume that Observers are called in a serialized fashion.

提供更好的答案;最初,Rx团队确保Observable序列是线程安全的,但是对于性能良好/设计良好的应用程序,性能的损失是不必要的.因此,决定删除线程安全性以消除性能成本.为了让您选择恢复线程安全,可以应用Synchronize()方法,该方法将序列化所有方法调用OnNext/OnError/OnCompleted.这并不意味着它们将在同一线程上被调用,但是在处理另一个线程时,您将不会调用OnNext方法.

To provider a better answer; Originally the Rx team ensured that the Observable sequences were thread safe, however the performance penalty for well behaved/designed applications was unnecessary. So a decision was taken to remove the thread safety to remove the performance cost. To allow you to opt back into to thread safety you could apply the Synchronize() method which would serialize all method calls OnNext/OnError/OnCompleted. This doesn't mean they will get called on the same thread, but you wont get your OnNext method called while another one is being processed.

这是一个坏消息,从内存来看,这发生在Rx 2.0中,而您特别询问的是Rx 1.0. (我不确定Synchonize()甚至存在于1.xx中吗?)

The bad news, from memory this happened in Rx 2.0, and you are specifically asking about Rx 1.0. (I am not sure Synchonize() even exists in 1.xx?)

因此,如果您使用的是Rx v1,则对什么是线程安全的以及什么不是线程安全的具有模糊的确定性.我很确定主题是安全的,但是我不确定FromEventPattern这样的工厂方法.

So if you are in Rx v1, then you have this blurry certainty of what is thread safe and what isn't. I am pretty sure the Subjects are safe, but I can't be sure about the factory methods like FromEventPattern.

我的建议是:如果需要确保线程安全,请序列化数据管道.最简单的方法是使用单线程IScheduler实现,即DispatcherScheduler或EventLoopScheduler实例.

My recommendation is: if you need to ensure thread safety, Serialize your data pipeline. The easiest way to do this is to use a single threaded IScheduler implementation i.e. DispatcherScheduler or a EventLoopScheduler instance.

一些好消息是,当我在Rx上编写这本书时,它确实针对v1,因此本节与您非常相关

Some good news is that when I wrote the book on Rx it did target v1, so this section is very relevant for you http://introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html

因此,如果您现在的查询如下所示:

So if your query right now looked like this:

Observable.FromEventPatter(....)
          .TakeWhile(x=>x>5)
          .Subscribe(....);

要确保对管道进行序列化,可以创建一个EventLoopScheduler(以为此专用线程为代价):

To ensure that the pipeline is serialized you can create an EventLoopScheduler (at the cost of dedicating a thread to this):

var scheduler = new EventLoopScheduler();
Observable.FromEventPatter(....)
          .ObserveOn(scheduler)
          .TakeWhile(x=>x>5)
          .Subscribe(....);

这篇关于TakeWhile(...)等扩展方法在Rx 1.0中安全吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆