rx.js如何链接可观察对象 [英] rx.js how to chain observables

查看:104
本文介绍了rx.js如何链接可观察对象的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个可观察到的东西,它是将事件从服务器中拉出,为应用程序类型过滤事件,然后订阅事件并将其分派给一个或多个处理程序进行处理.

I have an observable that is pulling events off of a server, filtering events for the application type, then subscribing and dispatching the event to one or more handlers to handle.

然后,处理程序关闭,并对数据库进行一些异步更新,我发现可观察对象将快速启动事件,以至于更新彼此之间步调一致.我应该期望的.

The handlers then go off and do some async update to the db, and I find that the observable will crank out events so fast that the updates are stepping on each other. Which I should have expected.

所以我认为我需要我的处理程序每​​个人都使用它自己的可观察性充当队列,该队列将处理一个事件并等待确认.

So I think I need my handlers to each employ it's own observable to act as a queue which will handle one event and wait for an ack.

所以我的问题是,我如何创建一个可观察的对象,该对象可连续接收消息,并在释放下一条消息之前,一次等待一条消息,以分派一条消息.

So my question is, how can I create an observable that receives messages continuously and dispatches one message at a time waiting for an acknowledgment before releasing the next message.

此外,可观察物必须是冷的.我认为,因为我不能散发消息.

Also the observables need to be Cold. I think, as I can not loose messages.

谢谢

Raif

推荐答案

我认为运算符concatMap所做的事情与您要查找的内容很接近.您可以在SO上查看以前的答案,以说明concatMap的类似用例: RxJS对依赖任务进行排队

I think the operator concatMap does something close to what you are looking for. You can review a former answer here on SO to illustrate a similar use case for concatMap : RxJS queueing dependent tasks

这很接近,但不完全是您想要的,因为没有等待ACK信号释放下一个值.相反,concatMap使用当前已执行"的可观察到的完成信号订阅下一个.如果您的可观察对象在某处包含对数据库执行更新,则这些更新将按顺序执行.例如:

It is close but not exactly what you want as there is no waiting for an ACK signal to release the next value. Instead, concatMap use the completion signal of the currently 'executed' observable to subscribe to the next one. If your observable contains somewhere the execution of an update on a db then those updates will be executed in order. For instance:

function handler (source$) {
  // source$ is your source of events from which you generate the update calls
  return source$.concatMap(function (event){
    return updateDB(event);
  })
}

function updateDB(event) {
  return Rx.Observable.create(function(observer){
    // do the update in the db
    // you probably have a success and error handler 
    // you plug the observer notification into those handlers
    if (success) {
      // if you need to pass down some value from the update
      observer.onNext(someValue);
      // In any case, signal completion to allow concatMap to move to next update
      observer.onCompleted();
    }
    if (error) {observer.onError(error);}
  })
}

这是专用于您正在使用的库的通用代码.您可能可以直接使用运算符fromNodeCallbackfromCallback,具体取决于数据库更新功能的API.

This is a generic code to specialize to the library you are using. You might be able to use directly the operator fromNodeCallback, or fromCallback depending on the API of your database update function.

同样,请注意,在执行当前的可观察对象时,可能涉及一些缓冲来保持下一个可观察对象,并且该缓冲只能是有限的,因此如果您确实在生产者之间的速度上存在显着差异和使用者或内存限制,您可能希望以不同的方式处理事情.

All the same, be mindful that there might be some buffering involved to hold on to the next observable while the current one is being executed, and that buffer can only be finite, so if you do have significant differences in speed between producer and consumer, or memory limitation, you might want to handle things differently.

此外,如果您使用的是RxJS v5,则onError变为erroronComplete变为completeonNext变为next(请参见

Also, if you are using RxJS v5, onError becomes error, onComplete becomes complete, onNext becomes next (cf. new observer interface).

最后,流的有损/无损性质与流的冷与热性质不同.您可以查看插图订阅和两种类型的流的数据流.

Last comment, the lossy/lossless nature of your stream is a concept different to the hot vs. cold nature of the stream. You can have a look at illustrated subscription and data flows for both type of streams.

这篇关于rx.js如何链接可观察对象的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆