从单独的Action< t>创建observable [英] Create observable from seperate Action<t>

查看:52
本文介绍了从单独的Action< t>创建observable的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述



我正在尝试从pubnub订阅中创建一个observable。


使用pubnub,您可以订阅实时流,您必须提供三个回调:一个用于新收到的数据,一个用于发生错误
,一个用于连接状态更改。我有以下代码(writelines只是为了编译):

 var pubnub = new Pubnub(" demo"," xxxxxxxxxxxxxxxxxxxxxxxxxxxx" ); 
行动< dynamic> userCallback = o => Console.Out.WriteLine(" Data received:{0}",o);
行动< dynamic> connectCallback = o => Console.Out.WriteLine(" Connection changed:{0}",o);
行动< dynamic> errorCallback = o => Console.Out.WriteLine(" Error:{0}",o);
pubnub.Subscribe(" 057bdc6b-9f9c-44e4-bc1a-363​​e4443ce87",userCallback,connectCallback,errorCallback);

IObservable< dynamic> obs = .................





现在我要创建一个obtableable,它将userCallback与onNext和errorCallback绑定到onError。有人可以给我一个
指示如何做到这一点。我现在已经使用实现IObservable< T>的自定义类完成了它。但我不相信这是最佳做法?!



亲切的问候,


Arno

解决方案

这是一个相当普遍的模式,Rx解决得非常好。


您希望将服务订阅包装在Observable.Create中,然后可以将各种回调转换为Observable通知:

 var stream = Observable.Create< dynamic>(
obs => {

var pubnub = new Pubnub(" demo"," xxxxxxxxxxxxxxxxxxxxxxxxxxxx");

pubnub.Subscribe(" 057bdc6b-9f9c-44e4-bc1a-363​​e4443ce87",
o => {
Console.Out.WriteLine("收到的数据:{0}",o);
obs.O nNext(o);
},
o => Console.Out.WriteLine(" Connection changed:{0}",o),
ex => {
Console.Out.WriteLine(" Error:{0}",o);
obs.OnError(ex);
});

return()=> pubnub.Unsubscribe(...);
});

这也意味着您还可以在流完成时显式取消订阅基础服务(我假设这将是必需的)。 &NBSP;


Hi,

I'm trying to create an observable from a pubnub subscription.

With pubnub you can subscribe to a real time stream and you have to supply three callbacks: One for new data that is received, one for if an error occurred and one for if the connectionstate changes. I have the following code (writelines are just to make it compile):

            var pubnub = new Pubnub("demo", "xxxxxxxxxxxxxxxxxxxxxxxxxxxx");
            Action<dynamic> userCallback = o => Console.Out.WriteLine("Data received: {0}", o);
            Action<dynamic> connectCallback = o => Console.Out.WriteLine("Connection changed: {0}", o);
            Action<dynamic> errorCallback = o => Console.Out.WriteLine("Error: {0}", o);
            pubnub.Subscribe("057bdc6b-9f9c-44e4-bc1a-363e4443ce87", userCallback, connectCallback, errorCallback);   
            
            IObservable<dynamic> obs = .................


Now I want to create an observable that ties the userCallback to onNext and errorCallback to onError. Can someone give me an indication how to do this. I now have done it with a custom class that implements IObservable<T> but I don’t believe that this is best practice?!

Kind regards,

Arno

解决方案

This is a fairly common pattern that Rx solves particularly well.

You want to wrap your service subscription in an Observable.Create, which can then turn the various callbacks into Observable notifications:

var stream = Observable.Create<dynamic>(
	obs => {
		
		var pubnub = new Pubnub("demo", "xxxxxxxxxxxxxxxxxxxxxxxxxxxx");

		pubnub.Subscribe("057bdc6b-9f9c-44e4-bc1a-363e4443ce87", 
			o => {
				Console.Out.WriteLine("Data received: {0}", o);
				obs.OnNext(o);
			}, 
			o => Console.Out.WriteLine("Connection changed: {0}", o), 
			ex => {
				Console.Out.WriteLine("Error: {0}", o);
				obs.OnError(ex);
			});	
			
		return () => pubnub.Unsubscribe(...);
	});

This also means you can also explicitly unsubscribe from the underlying service when your stream completes (I'm assuming this will be required).  


这篇关于从单独的Action&lt; t&gt;创建observable的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆