如何序列化观测量到云和背部 [英] How to serialize Observables to the cloud and back

查看:186
本文介绍了如何序列化观测量到云和背部的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要拆分处理顺序(如在这个问题如何组织数据处理器序列与.NET RX )到Azure的环境的一些计算单位。结果
的想法是序列观测序列的Azure队列(或服务总线)和反序列化回来。结果
。如果生产者或消费者失败其他一方应该可以继续生产/消费。

I need to split processing sequence (like in this question How to organize sequence of data processors with .net RX) into several computation units in Azure environment.
The idea is to serialize Observable sequence to Azure Queues(or Service Bus) and to deserialize it back.
If producer or consumer is failed other party should be able to continue producing/consuming.

任何人都可以提出一个优雅的方式这样做,有什么用(Azure的队列或者服务总线)?结果
有没有人用TCP观测提供商 - http://rxx.codeplex.com / Wiki页面?标题= TCP%20Qbservable%20Provider 对于这样的问题是安全的一方的失败?

Could anyone suggest an elegant way to do so and what to use (Azure Queues or Service Bus)?
Has anyone used TCP Observable provider - http://rxx.codeplex.com/wikipage?title=TCP%20Qbservable%20Provider for such problems is it safe to the failures of one of the parties?

推荐答案

假设你有一个messaage队列以下API

Imagine you have a messaage queue with the following API

class MQ {

    public MQ();

    // send a single message from your message queue
    public void send(string keyPath, string msg);

    // Receive a single message from your message queue
    public async Task<string> receive(keyPath);

}

为使这RX兼容

class MQRX: IObserver<string> {
    MQ _mq;
    string _keyPath

    MQRX(string keyPath){
        _mq = mq;
        _keyPath = keyPath;
    }

    IObservable<string> Observe(){
        return Observable.Defer(()=> mq.receive(keyPath).ToObservable() ).Repeat();
    }

    void OnNext(string msg){
        _mq.send(msg);
    }

    void OnError(Exception e){
        // The message queue might not
        // support serializing exceptions
        // or it might or you might build
        // a protocol for it.
    }
}

要在容错方式来使用它。注意:如果有一个例外
抛出通过的OnError上游交付重试将重新订阅

To use it in a fault tolerant way. Note Retry will resubscribe if there is an exception thrown upstream delivered by OnError

new MQRX("users/1/article/2").
    Retry().
    Subscribe((msg)=>Console.Writeln(msg));

在写入一侧例如,你可以发送邮件每两秒钟,然后重试
的订阅的生成器,如果有一个误差。注意:有不可能
在Observable.Interval一个错误,只是生成邮件中每一个时间间隔,但
想象从一个文件或一些其他的消息队列中读取。

On the writing side for example you could send a message every two seconds and retry the subscription to the generator if there is an error. Note there is unlikely to be an error in Observable.Interval that just generates a message every time interval but imagine reading from a file or some other message queue.

var mq = new MQRX("users/1/article/2");

Observable.Interval(TimeSpan.FromSeconds(2)).
    Select((x)=>x.ToString()).

请注意你应该使用的IObservable抓住扩展方法,而不是
盲目地重试你可能一遍又一遍的得到同样的错误。
重试()。
订阅(MQ);

Note you probably should use the IObservable Catch extension method rather than blindly retrying as you might get the same error over and over again. Retry(). Subscribe(mq);

这篇关于如何序列化观测量到云和背部的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆