使用IObservable而不是事件 [英] Use of IObservable instead of events
问题描述
IObservable
是可以被观看的东西,而 IObservers
是'watchers'。 所以现在我在我的应用程序中尝试实现这个。在开始之前,我有一些事情想要解决。我看到IObservable与IEnumerable相反,但是,我看不到我特定实例中的任何地方可以并入我的应用程序。
目前我大量使用事件,很多,我可以看到管道开始变得难以控制。我想,IObservable可以帮助我在这里。
考虑以下设计,这是我的应用程序内的I / O的包装(FYI,我通常有处理字符串):
我有一个基本界面,名为 IDataIO
:
public interface IDataIO
{
事件OnDataReceived;
事件OnTimeout:
事件OnTransmit;
}
现在,我目前有三个实现这个界面的类,每个这些类以某种方式利用Async方法调用,引入某种类型的多线程处理:
public class SerialIO:IDataIO;
public class UdpIO:IDataIO;
public class TcpIO:IDataIO;
这些类中的每个类都包含在我的最终类中,称为IO(其中还实现IDataIO - 遵守我的策略模式):
public class IO:IDataIO
{
public SerialIO Serial;
public UdpIO Udp;
public TcpIO Tcp;
}
我已经利用策略模式来封装这三个类,以便在更改在运行时不同的 IDataIO
实例之间,使最终用户不可见。你可以想象,这在后台导致了相当多的事件管道。
那么,我怎么可以在这里利用推通知?而不是订阅事件(DataReceived等),我想简单地将数据推送给任何有兴趣的人。我有点不确定在哪里开始。我仍然在尝试玩主题的想法/泛型类,以及这种(ReplaySubject / AsynSubject / BehaviourSubject)的各种形式。有人可以告诉我这个(可能参考我的设计)吗?或者这根本不适合 IObservable
?
PS。请自行纠正任何我的误会:)
可观察对于表示数据流很有用,因此您的 DataReceived
事件可以很好地模拟可观察的模式,像 IObservable< byte>
或 IObservable< byte []>
。您还可以获得 OnError
和 OnComplete
的附加优点,这些方便。
在实现它方面,很难说您的确切情况,但是我们经常使用 Subject< T>
作为底层来源,并调用 OnNext
来推送数据。也许像
//使用主题可能是将数据推送到可观察
//最简单的方法//它包装IObservable和IObserver,所以你几乎永远不会直接使用IObserver
private readonly主题< byte> subject = new Subject< byte>();
private void OnPort_DataReceived(object sender,EventArgs e)
{
//将数据推送到IObserver,这可能只是一个包装器
// around您的订阅代理是使用Rx扩展名
this.subject.OnNext(port.Data); //伪代码
}
然后,您可以通过属性显示主题: p>
public IObservable< byte> DataObservable
{
get {return this.subject; } //或this.subject.AsObservable();
}
您可以替换您的 DataReceived
事件在 IDataIO
与 IObservable< T>
,并让每个策略类以他们需要的方式处理他们的数据然后推送到 Subject< T>
。
另一方面,谁订阅了Observable能够像事件一样处理它(只需使用 Action< byte []>
),或者您可以使用<$ c $执行一些非常有用的工作c>选择,其中
,缓冲区
等。
私人IDataIO dataIo = new ...
private void SubscribeToData()
{
dataIo.DataObservable .Buffer(16).Subscribe(On16Bytes);
}
private void On16Bytes(IList< byte> bytes)
{
// do stuff
}
ReplaySubject
/ ConnectableObservable
很好,当你知道你的订阅者将迟到聚会,但仍然需要赶上所有的事件。源缓存其所有的东西,并为每个用户重播所有内容。只有你可以说这是否是你实际需要的行为(但要小心,因为它会缓存所有增加内存使用量的内容)。
当我学习Rx我找到了关于Rx的 http://leecampbell.blogspot.co.uk/ 博客系列非常有信息地了解理论(现在的帖子有点过时了,API已经改变了,所以请注意)
I've recently been reading about IObservable. So far, i've looked at various SO questions, and watched a video on what they can do. The whole "push" mechanism I'm thinking is brilliant, but I'm still trying to figure out what exactly everything does. From my readings, I guess in a way an IObservable
is something that can be 'watched', and IObservers
are the 'watchers'.
So now I'm off to try and implement this in my application. There are a few things I would like to nut out before I get started. I've seen that IObservable is the opposite of IEnumerable, however, I can't really see any places in my particular instance that I can incorporate into my app.
Currently, I make heavy use of events, so much that I can see the 'plumbing' is starting to get unmanageable. I would think, that IObservable can help me out here.
Consider the following design, which is my wrapper around my I/O within my application (FYI, I typically have to deal with strings):
I have a base interface called IDataIO
:
public interface IDataIO
{
event OnDataReceived;
event OnTimeout:
event OnTransmit;
}
Now, I currently have three classes that implement this interface, each of these classes in some way utilize Async method calls, introducing some type of multithreaded processing:
public class SerialIO : IDataIO;
public class UdpIO : IDataIO;
public class TcpIO : IDataIO;
There is a single instance of each of these classes wrapped up into my final class, called IO (which also implements IDataIO - adhering to my strategy pattern):
public class IO : IDataIO
{
public SerialIO Serial;
public UdpIO Udp;
public TcpIO Tcp;
}
I have utilized the strategy pattern to encapsulate these three classes, so that when changing between the different IDataIO
instances at runtime makes it 'invisible' to the end user. As you could imagine, this has led to quite a bit of 'event plumbing' in the background.
So, how can I utilize 'push' notification here in my case? Instead of subscribing to events (DataReceived etc) I would like to simply push the data to anyone that's interested. I'm a bit unsure of where to get started. I'm still trying to toy with the ideas/generic classes of Subject
, and the various incarnations of this (ReplaySubject/AsynSubject/BehaviourSubject). Could someone please enlighten me on this one (maybe with reference to my design)? Or is this simply not an ideal fit for IObservable
?
PS. Feel free to correct any of my 'misunderstandings' :)
Observables are great for representing streams of data, so your DataReceived
event would model nicely to the observable pattern, something like IObservable<byte>
or IObservable<byte[]>
. You also get the added benefit of OnError
and OnComplete
which are handy.
In terms of implementing it, it's hard to say for your exact scenario but we often use Subject<T>
as the underlying source and call OnNext
to push data. Maybe something like
// Using a subject is probably the easiest way to push data to an Observable
// It wraps up both IObservable and IObserver so you almost never use IObserver directly
private readonly Subject<byte> subject = new Subject<byte>();
private void OnPort_DataReceived(object sender, EventArgs e)
{
// This pushes the data to the IObserver, which is probably just a wrapper
// around your subscribe delegate is you're using the Rx extensions
this.subject.OnNext(port.Data); // pseudo code
}
You can then expose the subject through a property:
public IObservable<byte> DataObservable
{
get { return this.subject; } // Or this.subject.AsObservable();
}
You can replace your DataReceived
event on IDataIO
with an IObservable<T>
and have each strategy class handle their data in whichever manner they need and push off to the Subject<T>
.
On the other side, whoever subscribes to the Observable is then able to either handle it like an event (just by using an Action<byte[]>
) or you can perform some really useful work on the stream with Select
, Where
, Buffer
, etc.
private IDataIO dataIo = new ...
private void SubscribeToData()
{
dataIo.DataObservable.Buffer(16).Subscribe(On16Bytes);
}
private void On16Bytes(IList<byte> bytes)
{
// do stuff
}
ReplaySubject
/ConnectableObservable
s are great when you know your subscriber is going to be arriving late to the party but still needs to catch up on all of the events. The source caches everything it's pushed and replays everything for each subscriber. Only you can say whether that's the behaviour you actually need (but be careful because it will cache everything which is going increase your memory usage obviously).
When I was learning about Rx I found http://leecampbell.blogspot.co.uk/ blog series on Rx to be very informative to understand the theory (the posts are a little dated now and the APIs have changed so watch out for that)
这篇关于使用IObservable而不是事件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!