实施的IObservable< T>从头开始 [英] Implementing IObservable<T> from scratch

查看:83
本文介绍了实施的IObservable< T>从头开始的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

无功扩展配备了大量的辅助方法为将现有的事件和异步操作可观测到,但你会如何实施的IObservable< T>从头开始?

The Reactive Extensions come with a lot of helper methods for turning existing events and asynchronous operations into observables but how would you implement an IObservable<T> from scratch?

IEnumerable的有可爱的产量关键字,使实现它非常简单。

IEnumerable has the lovely yield keyword to make it very simple to implement.

什么是实施的IObservable<的正确方法; T>

What is the proper way of implementing IObservable<T>?

我是否需要担心线程安全?

Do I need to worry about thread safety?

我知道有获取叫回在一个特定的同步上下文的支持,但就是这个东西,我作为一个的IObservable< T>笔者需要担心的还是这个莫名其妙的内置

I know there is support for getting called back on a specific synchronization context but is this something I as an IObservable<T> author need to worry about or this somehow built-in?

更新:

下面是布赖恩的F#的解决方案我的C#版本

Here's my C# version of Brian's F# solution

using System;
using System.Linq;
using Microsoft.FSharp.Collections;

namespace Jesperll
{
    class Observable<T> : IObservable<T>, IDisposable where T : EventArgs
    {
        private FSharpMap<int, IObserver<T>> subscribers = 
                 FSharpMap<int, IObserver<T>>.Empty;
        private readonly object thisLock = new object();
        private int key;
        private bool isDisposed;

        public void Dispose()
        {
            Dispose(true);
        }

        protected virtual void Dispose(bool disposing)
        {
            if (disposing && !isDisposed)
            {
                OnCompleted();
                isDisposed = true;
            }
        }

        protected void OnNext(T value)
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnNext(value);
            }
        }

        protected void OnError(Exception exception)
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            if (exception == null)
            {
                throw new ArgumentNullException("exception");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnError(exception);
            }
        }

        protected void OnCompleted()
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnCompleted();
            }
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            lock (thisLock)
            {
                int k = key++;
                subscribers = subscribers.Add(k, observer);
                return new AnonymousDisposable(() =>
                {
                    lock (thisLock)
                    {
                        subscribers = subscribers.Remove(k);
                    }
                });
            }
        }
    }

    class AnonymousDisposable : IDisposable
    {
        Action dispose;
        public AnonymousDisposable(Action dispose)
        {
            this.dispose = dispose;
        }

        public void Dispose()
        {
            dispose();
        }
    }
}

修改不要乱扔的ObjectDisposedException如果调用Dispose两次

edit: Don't throw ObjectDisposedException if Dispose is called twice

推荐答案

老实说,我不知道如何正确的这一切是的,但如果基于到目前为止我的经验感觉还不错。这是F#代码,但希望你得到的味道感。它可以让你的新达源对象,然后你就可以调用next /完成/错误,并管理订阅和尝试断言当源或客户做坏事。

Honestly, I am not sure how 'right' all this is, but if feels pretty good based on my experience so far. It's F# code, but hopefully you get a sense of the flavor. It lets you 'new up' a source object, which you can then call Next/Completed/Error on, and it manages subscriptions and tries to Assert when the source or clients do bad things.

type ObservableSource<'T>() =     // '
    let protect f =
        let mutable ok = false
        try 
            f()
            ok <- true
        finally
            Debug.Assert(ok, "IObserver methods must not throw!")
            // TODO crash?
    let mutable key = 0
    // Why a Map and not a Dictionary?  Someone's OnNext() may unsubscribe, so we need threadsafe 'snapshots' of subscribers to Seq.iter over
    let mutable subscriptions = Map.empty : Map<int,IObserver<'T>>  // '
    let next(x) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnNext(x)))
    let completed() = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnCompleted()))
    let error(e) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnError(e)))
    let thisLock = new obj()
    let obs = 
        { new IObservable<'T> with       // '
            member this.Subscribe(o) =
                let k =
                    lock thisLock (fun () ->
                        let k = key
                        key <- key + 1
                        subscriptions <- subscriptions.Add(k, o)
                        k)
                { new IDisposable with 
                    member this.Dispose() = 
                        lock thisLock (fun () -> 
                            subscriptions <- subscriptions.Remove(k)) } }
    let mutable finished = false
    // The methods below are not thread-safe; the source ought not call these methods concurrently
    member this.Next(x) =
        Debug.Assert(not finished, "IObserver is already finished")
        next x
    member this.Completed() =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        completed()
    member this.Error(e) =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        error e
    // The object returned here is threadsafe; you can subscribe and unsubscribe (Dispose) concurrently from multiple threads
    member this.Value = obs



我会感兴趣什么是好还是坏在这里的任何想法;我还没有机会看所有新接收的东西从devlabs可是...

I'll be interested in any thoughts about what's good or bad here; I haven't had a chance to look at all the new Rx stuff from devlabs yet...

我自己的经验建议:


  • 这些谁订阅观测应该永远不会从订阅扔。没有什么合理的,当一个用户抛出观察到可以做。 (这是类似的事件。)最有可能的例外,只是泡到顶级捕获所有处理程序或崩溃的应用程序。

  • 来源大概应该是逻辑单线程 。我想这可能是很难写的客户端可以并发OnNext呼叫作出反应;即使每个单独的呼叫来自不同的线程,它是有帮助的,以避免并发调用。

  • 这绝对是非常有用的强制执行一些'合同'基/辅助类。

我很好奇,如果人们能够展现沿着这些路线更具体的建议。

I'm very curious if people can show more concrete advice along these lines.

这篇关于实施的IObservable&LT; T&GT;从头开始的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆