订阅未来的可观察对象 [英] Subscribing to a future observable

查看:46
本文介绍了订阅未来的可观察对象的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我没有基于事件的 API(Geolocator) 我想转换为 Rx.

I have na event-based API (Geolocator) that I want to convert to Rx.

问题是某些操作要求取消订阅所有事件,我不想将负担传递给 Rx API 的用户.

The problem is that some operations require that all events are unsubscribed and I don't want to pass that burdon to the user of the Rx API.

因此,用户将订阅一些 observable,当订阅事件时,它们将发布到这些 observable.

So, the user will subscribe to a few observables and when the events are subscribed they are published to those observables.

最好的方法是什么?

我想创建一个用户订阅的主题,然后通过另一组可观察对象将事件发布给用户.

I thought of creating a subject that the users subscribe to and then have the events published to those through another set of observables.

这是最好的方法吗?如果是这样,如何?

Is this the best way? If so, how?

推荐答案

关键问题是找到一种方法,让 Observer 订阅流,同时拆除并替换底层源.让我们只关注单个事件源 - 您应该能够从中推断出来.

The key problem is to find a way to keep an Observer subscribed to a stream whilst tearing down and replacing an underlying source. Let's just focus on a single event source - you should be able to extrapolate from that.

首先,这是一个我们可以使用的示例类,它具有单个事件 SomeEvent,它遵循使用 EventHandler 委托的标准 .NET 模式.我们将使用它来创建事件源.

First of all, here is an example class we can use that has a single event SomeEvent that follows the standard .NET pattern using an EventHandler<StringEventArgs> delegate. We will use this to create sources of events.

注意,我拦截了事件添加/删除处理程序,以便向您显示 Rx 何时订阅和取消订阅事件,并为该类提供了一个名称属性,以便我们跟踪不同的实例:

Note I have intercepted the event add/remove handlers in order to show you when Rx subscribes and unsubscribes from the events, and given the class a name property to let us track different instances:

public class EventSource
{
    private string _sourceName;

    public EventSource(string sourceName)
    {
        _sourceName = sourceName;
    }

    private event EventHandler<MessageEventArgs> _someEvent;

    public event EventHandler<MessageEventArgs> SomeEvent
    {
        add
        {
            _someEvent = (EventHandler<MessageEventArgs>)
                Delegate.Combine(_someEvent, value);
            Console.WriteLine("Subscribed to SomeEvent: " + _sourceName);
        }
        remove
        {
            _someEvent = (EventHandler<MessageEventArgs>)
                Delegate.Remove(_someEvent, value);
            Console.WriteLine("Unsubscribed to SomeEvent: " + _sourceName);
        }

    }

    public void RaiseSomeEvent(string message)
    {
        var temp = _someEvent;
        if(temp != null)
            temp(this, new MessageEventArgs(message));
    }
}

public class MessageEventArgs : EventArgs
{
    public MessageEventArgs(string message)
    {
        Message = message;
    }

    public string Message { get; set; }   

    public override string ToString()
    {
        return Message;
    }
}

解决方案的关键思想 - StreamSwitcher

现在,这是解决方案的核心.我们将使用 Subject> 来创建一个流.我们可以使用 Observable.Switch() 运算符将最新的流返回给观察者.这是实现,下面是一个使用示例:

Solution Key Idea - StreamSwitcher

Now, here is the heart of the solution. We will use a Subject<IObservable<T>> to create a stream of streams. We can use the Observable.Switch() operator to return only the most recent stream to Observers. Here's the implementation, and an example of usage will follow:

public class StreamSwitcher<T> : IObservable<T>
{
    private Subject<IObservable<T>> _publisher;
    private IObservable<T> _stream;

    public StreamSwitcher()
    {
        _publisher = new Subject<IObservable<T>>();
        _stream = _publisher.Switch();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _stream.Subscribe(observer);
    }

    public void Switch(IObservable<T> newStream)
    {
        _publisher.OnNext(newStream);
    }

    public void Suspend()
    {
        _publisher.OnNext(Observable.Never<T>());
    }

    public void Stop()
    {
        _publisher.OnNext(Observable.Empty<T>());
        _publisher.OnCompleted();
    }
}

用法

有了这个类,您可以在每次想要启动事件流时使用 Switch 方法连接一个新的流 - 它只是将新的事件流发送到 Subject.

Usage

With this class you can hook up a new stream on each occasion you want to start events flowing by using the Switch method - which just sends the new event stream to the Subject.

您可以使用 Suspend 方法取消挂接事件,该方法发送一个 Observable.Never()Subject 有效地暂停事件流.

You can unhook events using the Suspend method, which sends an Observable.Never<T>() to the Subject effectively pausing the flow of events.

最后,您可以通过调用 Stop 来推送 Observable.Empty() 和OnComplete()` 主题.

Finally you can stop altogether by called to Stop to push an Observable.Empty<T>() andOnComplete()` the subject.

最好的部分是,每次SwitchSuspend 时,这种技术都会使 Rx 做正确的事情并正确取消订阅底层事件源停止.另请注意,一旦 Stopped 不再有事件发生,即使您再次Switch.

The best part is that this technique will cause Rx to do the right thing and properly unsubscribe from the underlying event sources each time you Switch, Suspend or Stop. Note also, that once Stopped no more events will flow, even if you Switch again.

这是一个示例程序:

static void Main()
{
    // create the switch to operate on
    // an event type of EventHandler<MessageEventArgs>()
    var switcher = new StreamSwitcher<EventPattern<MessageEventArgs>>();


    // You can expose switcher using Observable.AsObservable() [see MSDN]
    // to hide the implementation but here I just subscribe directly to
    // the OnNext and OnCompleted events.
    // This is how the end user gets their uninterrupted stream:
    switcher.Subscribe(
        Console.WriteLine,
        () => Console.WriteLine("Done!"));

    // Now I'll use the example event source to wire up the underlying
    // event for the first time
    var source = new EventSource("A");
    var sourceObservable = Observable.FromEventPattern<MessageEventArgs>(
        h => source.SomeEvent += h,
        h => source.SomeEvent -= h);


    // And we expose it to our observer with a call to Switch
    Console.WriteLine("Subscribing");
    switcher.Switch(sourceObservable);

    // Raise some events
    source.RaiseSomeEvent("1");
    source.RaiseSomeEvent("2");

    // When we call Suspend, the underlying event is unwired
    switcher.Suspend();
    Console.WriteLine("Unsubscribed");

    // Just to prove it, this is not received by the observer
    source.RaiseSomeEvent("3");

    // Now pretend we want to start events again
    // Just for kicks, we'll use an entirely new source of events
    // ... but we don't have to, you could just call Switch(sourceObservable)
    // with the previous instance.
    source = new EventSource("B");
    sourceObservable = Observable.FromEventPattern<MessageEventArgs>(
        h => source.SomeEvent += h,
        h => source.SomeEvent -= h);

    // Switch to the new event stream
    Console.WriteLine("Subscribing");
    switcher.Switch(sourceObservable);

    // Prove it works
    source.RaiseSomeEvent("3");
    source.RaiseSomeEvent("4");

    // Finally unsubscribe
    switcher.Stop();
}

这给出了这样的输出:

Subscribing
Subscribed to SomeEvent: A
1
2
Unsubscribed to SomeEvent: A
Unsubscribed
Subscribing
Subscribed to SomeEvent: B
3
4
Unsubscribed to SomeEvent: B
Done!

请注意,最终用户何时订阅并不重要 - 我是预先订阅的,但他们可以随时订阅,届时他们将开始获取事件.

Note it doesn't matter when the end user subscribes - I did it up front, but they can Subscribe any time and they'll start getting events at that point.

希望有帮助!当然,您需要将 Geolocator API 的各种事件类型整合到一个方便的包装器中 - 但这应该使您能够到达那里.

Hope that helps! Of course you'll need to pull together the various event types of the Geolocator API into a single convenient wrapper - but this should enable you to get there.

如果您想使用这种技术将多个事件组合成单个流,请查看诸如 Merge 之类的操作符,它要求您将源流投影到一个公共类型中,并使用 可能选择,或者CombineLatest之类的东西——这部分问题不应该太棘手.

If you have several events you want to combine into a single stream using this technique, look at operators like Merge, which requires you to project the source streams into a common type, with Select maybe, or something like CombineLatest - this part of the problem shouldn't be too tricky.

这篇关于订阅未来的可观察对象的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆