使用IObservable而不是事件 [英] Use of IObservable instead of events

查看:168
本文介绍了使用IObservable而不是事件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我最近读过有关IObservable的内容。到目前为止,我已经看了各种各样的问题,并观看了一个关于他们能做什么的视频。我正在想的整个推机制是辉煌的,但我仍然在努力弄清楚一切都是什么。从我的读数来看,我猜想一个 IObservable 是可以被观看的东西,而 IObservers 是'watchers'。



所以现在我在我的应用程序中尝试实现这个。在开始之前,我有一些事情想要解决。我看到IObservable与IEnumerable相反,但是,我看不到我特定实例中的任何地方可以并入我的应用程序。



目前我大量使用事件,很多,我可以看到管道开始变得难以控制。我想,IObservable可以帮助我在这里。



考虑以下设计,这是我的应用程序内的I / O的包装(FYI,我通常有处理字符串):



我有一个基本界面,名为 IDataIO

  public interface IDataIO 
{
事件OnDataReceived;
事件OnTimeout:
事件OnTransmit;
}

现在,我目前有三个实现这个界面的类,每个这些类以某种方式利用Async方法调用,引入某种类型的多线程处理:

  public class SerialIO:IDataIO; 
public class UdpIO:IDataIO;
public class TcpIO:IDataIO;

这些类中的每个类都包含在我的最终类中,称为IO(其中还实现IDataIO - 遵守我的策略模式):

  public class IO:IDataIO 
{
public SerialIO Serial;
public UdpIO Udp;
public TcpIO Tcp;
}

我已经利用策略模式来封装这三个类,以便在更改在运行时不同的 IDataIO 实例之间,使最终用户不可见。你可以想象,这在后台导致了相当多的事件管道。



那么,我怎么可以在这里利用推通知?而不是订阅事件(DataReceived等),我想简单地将数据推送给任何有兴趣的人。我有点不确定在哪里开始。我仍然在尝试玩主题的想法/泛型类,以及这种(ReplaySubject / AsynSubject / BehaviourSubject)的各种形式。有人可以告诉我这个(可能参考我的设计)吗?或者这根本不适合 IObservable



PS。请自行纠正任何我的误会:)

解决方案

可观察对于表示数据流很有用,因此您的 DataReceived 事件可以很好地模拟可观察的模式,像 IObservable< byte> IObservable< byte []> 。您还可以获得 OnError OnComplete 的附加优点,这些方便。



在实现它方面,很难说您的确切情况,但是我们经常使用 Subject< T> 作为底层来源,并调用 OnNext 来推送数据。也许像

  //使用主题可能是将数据推送到可观察
//最简单的方法//它包装IObservable和IObserver,所以你几乎永远不会直接使用IObserver
private readonly主题< byte> subject = new Subject< byte>();

private void OnPort_DataReceived(object sender,EventArgs e)
{
//将数据推送到IObserver,这可能只是一个包装器
// around您的订阅代理是使用Rx扩展名
this.subject.OnNext(port.Data); //伪代码
}

然后,您可以通过属性显示主题: p>

  public IObservable< byte> DataObservable 
{
get {return this.subject; } //或this.subject.AsObservable();
}

您可以替换您的 DataReceived 事件在 IDataIO IObservable< T> ,并让每个策略类以他们需要的方式处理他们的数据然后推送到 Subject< T>



另一方面,谁订阅了Observable能够像事件一样处理它(只需使用 Action< byte []> ),或者您可以使用<$ c $执行一些非常有用的工作c>选择,其中缓冲区等。

 私人IDataIO dataIo = new ... 

private void SubscribeToData()
{
dataIo.DataObservable .Buffer(16).Subscribe(On16Bytes);
}

private void On16Bytes(IList< byte> bytes)
{
// do stuff
}

ReplaySubject / ConnectableObservable 很好,当你知道你的订阅者将迟到聚会,但仍然需要赶上所有的事件。源缓存其所有的东西,并为每个用户重播所有内容。只有你可以说这是否是你实际需要的行为(但要小心,因为它会缓存所有增加内存使用量的内容)。



当我学习Rx我找到了关于Rx的 http://leecampbell.blogspot.co.uk/ 博客系列非常有信息地了解理论(现在的帖子有点过时了,API已经改变了,所以请注意)


I've recently been reading about IObservable. So far, i've looked at various SO questions, and watched a video on what they can do. The whole "push" mechanism I'm thinking is brilliant, but I'm still trying to figure out what exactly everything does. From my readings, I guess in a way an IObservable is something that can be 'watched', and IObservers are the 'watchers'.

So now I'm off to try and implement this in my application. There are a few things I would like to nut out before I get started. I've seen that IObservable is the opposite of IEnumerable, however, I can't really see any places in my particular instance that I can incorporate into my app.

Currently, I make heavy use of events, so much that I can see the 'plumbing' is starting to get unmanageable. I would think, that IObservable can help me out here.

Consider the following design, which is my wrapper around my I/O within my application (FYI, I typically have to deal with strings):

I have a base interface called IDataIO:

public interface IDataIO
{
  event OnDataReceived;
  event OnTimeout:
  event OnTransmit;
}

Now, I currently have three classes that implement this interface, each of these classes in some way utilize Async method calls, introducing some type of multithreaded processing:

public class SerialIO : IDataIO;
public class UdpIO : IDataIO;
public class TcpIO : IDataIO;

There is a single instance of each of these classes wrapped up into my final class, called IO (which also implements IDataIO - adhering to my strategy pattern):

public class IO : IDataIO
{
  public SerialIO Serial;
  public UdpIO Udp;
  public TcpIO Tcp;
}

I have utilized the strategy pattern to encapsulate these three classes, so that when changing between the different IDataIO instances at runtime makes it 'invisible' to the end user. As you could imagine, this has led to quite a bit of 'event plumbing' in the background.

So, how can I utilize 'push' notification here in my case? Instead of subscribing to events (DataReceived etc) I would like to simply push the data to anyone that's interested. I'm a bit unsure of where to get started. I'm still trying to toy with the ideas/generic classes of Subject, and the various incarnations of this (ReplaySubject/AsynSubject/BehaviourSubject). Could someone please enlighten me on this one (maybe with reference to my design)? Or is this simply not an ideal fit for IObservable?

PS. Feel free to correct any of my 'misunderstandings' :)

解决方案

Observables are great for representing streams of data, so your DataReceived event would model nicely to the observable pattern, something like IObservable<byte> or IObservable<byte[]>. You also get the added benefit of OnError and OnComplete which are handy.

In terms of implementing it, it's hard to say for your exact scenario but we often use Subject<T> as the underlying source and call OnNext to push data. Maybe something like

// Using a subject is probably the easiest way to push data to an Observable
// It wraps up both IObservable and IObserver so you almost never use IObserver directly
private readonly Subject<byte> subject = new Subject<byte>();

private void OnPort_DataReceived(object sender, EventArgs e)
{
    // This pushes the data to the IObserver, which is probably just a wrapper
    // around your subscribe delegate is you're using the Rx extensions
    this.subject.OnNext(port.Data); // pseudo code 
}

You can then expose the subject through a property:

public IObservable<byte> DataObservable
{
    get { return this.subject; } // Or this.subject.AsObservable();
}

You can replace your DataReceived event on IDataIO with an IObservable<T> and have each strategy class handle their data in whichever manner they need and push off to the Subject<T>.

On the other side, whoever subscribes to the Observable is then able to either handle it like an event (just by using an Action<byte[]>) or you can perform some really useful work on the stream with Select, Where, Buffer, etc.

private IDataIO dataIo = new ...

private void SubscribeToData()
{ 
    dataIo.DataObservable.Buffer(16).Subscribe(On16Bytes);
}

private void On16Bytes(IList<byte> bytes)
{
    // do stuff
}

ReplaySubject/ConnectableObservables are great when you know your subscriber is going to be arriving late to the party but still needs to catch up on all of the events. The source caches everything it's pushed and replays everything for each subscriber. Only you can say whether that's the behaviour you actually need (but be careful because it will cache everything which is going increase your memory usage obviously).

When I was learning about Rx I found http://leecampbell.blogspot.co.uk/ blog series on Rx to be very informative to understand the theory (the posts are a little dated now and the APIs have changed so watch out for that)

这篇关于使用IObservable而不是事件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆