从单独的Action< t>创建observable [英] Create observable from seperate Action<t>
问题描述
我正在尝试从pubnub订阅中创建一个observable。
使用pubnub,您可以订阅实时流,您必须提供三个回调:一个用于新收到的数据,一个用于发生错误
,一个用于连接状态更改。我有以下代码(writelines只是为了编译):
var pubnub = new Pubnub(" demo"," xxxxxxxxxxxxxxxxxxxxxxxxxxxx" );
行动< dynamic> userCallback = o => Console.Out.WriteLine(" Data received:{0}",o);
行动< dynamic> connectCallback = o => Console.Out.WriteLine(" Connection changed:{0}",o);
行动< dynamic> errorCallback = o => Console.Out.WriteLine(" Error:{0}",o);
pubnub.Subscribe(" 057bdc6b-9f9c-44e4-bc1a-363e4443ce87",userCallback,connectCallback,errorCallback);
IObservable< dynamic> obs = .................
现在我要创建一个obtableable,它将userCallback与onNext和errorCallback绑定到onError。有人可以给我一个
指示如何做到这一点。我现在已经使用实现IObservable< T>的自定义类完成了它。但我不相信这是最佳做法?!
亲切的问候,
Arno
这是一个相当普遍的模式,Rx解决得非常好。
您希望将服务订阅包装在Observable.Create中,然后可以将各种回调转换为Observable通知:
var stream = Observable.Create< dynamic>(
obs => {
var pubnub = new Pubnub(" demo"," xxxxxxxxxxxxxxxxxxxxxxxxxxxx");
pubnub.Subscribe(" 057bdc6b-9f9c-44e4-bc1a-363e4443ce87",
o => {
Console.Out.WriteLine("收到的数据:{0}",o);
obs.O nNext(o);
},
o => Console.Out.WriteLine(" Connection changed:{0}",o),
ex => {
Console.Out.WriteLine(" Error:{0}",o);
obs.OnError(ex);
});
return()=> pubnub.Unsubscribe(...);
});这也意味着您还可以在流完成时显式取消订阅基础服务(我假设这将是必需的)。 &NBSP;
Hi,
I'm trying to create an observable from a pubnub subscription.
With pubnub you can subscribe to a real time stream and you have to supply three callbacks: One for new data that is received, one for if an error occurred and one for if the connectionstate changes. I have the following code (writelines are just to make it compile):
var pubnub = new Pubnub("demo", "xxxxxxxxxxxxxxxxxxxxxxxxxxxx"); Action<dynamic> userCallback = o => Console.Out.WriteLine("Data received: {0}", o); Action<dynamic> connectCallback = o => Console.Out.WriteLine("Connection changed: {0}", o); Action<dynamic> errorCallback = o => Console.Out.WriteLine("Error: {0}", o); pubnub.Subscribe("057bdc6b-9f9c-44e4-bc1a-363e4443ce87", userCallback, connectCallback, errorCallback); IObservable<dynamic> obs = .................
Now I want to create an observable that ties the userCallback to onNext and errorCallback to onError. Can someone give me an
indication how to do this. I now have done it with a custom class that implements IObservable<T> but I don’t believe that this is best practice?!
Kind regards,
Arno
This is a fairly common pattern that Rx solves particularly well.
You want to wrap your service subscription in an Observable.Create, which can then turn the various callbacks into Observable notifications:
var stream = Observable.Create<dynamic>( obs => { var pubnub = new Pubnub("demo", "xxxxxxxxxxxxxxxxxxxxxxxxxxxx"); pubnub.Subscribe("057bdc6b-9f9c-44e4-bc1a-363e4443ce87", o => { Console.Out.WriteLine("Data received: {0}", o); obs.OnNext(o); }, o => Console.Out.WriteLine("Connection changed: {0}", o), ex => { Console.Out.WriteLine("Error: {0}", o); obs.OnError(ex); }); return () => pubnub.Unsubscribe(...); });This also means you can also explicitly unsubscribe from the underlying service when your stream completes (I'm assuming this will be required).
这篇关于从单独的Action< t>创建observable的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!