包含确认的可观察模式 [英] Pattern for Observables that includes acknowledgement

查看:50
本文介绍了包含确认的可观察模式的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在处理正在记录来自队列的数据.将队列处理成一个Observable很容易,这样我的代码中就可以有多个端点来接收队列中的信息.

I'm working on something that is recording data coming from a queue. It was easy enough to process the queue into an Observable so that I can have multiple endpoints in my code receiving the information in the queue.

此外,我可以确定信息是按顺序到达的.由于Observable可以确保这一点,因此效果也很好. 但是,棘手的一点是,我不希望观察者在完成对前一件事的处理之前就将下一件事通知给观察者.但是观察者完成的处理是异步的.

Furthermore, I can be sure that the information arrives in order. That bit works nicely as well since the Observables ensure that. But, one tricky bit is that I don't want the Observer to be notified of the next thing until it has completed processing the previous thing. But the processing done by the Observer is asynchronous.

作为一个更具体的示例,可能很容易理解.想象一下,我的队列中包含URL.我在我的代码中将它们作为Observable公开.我订阅了一个Observer,该Observer的任务是获取URL并将内容写入磁盘(这是一个人为的示例,因此不要担心这些细节).重要的是,获取和保存是异步的.我的问题是,在他们完成之前的处理之前,我不希望观察者获得来自Observable的下一个" URL.

As a more concrete example that is probably simple enough to follow. Imagine my queue contains URLs. I'm exposing those as an Observable in my code. The I subscribe an Observer whose job is to fetch the URLs and write the content to disk (this is a contrived example, so don't take issue with these specifics). The important point is that fetching and saving are async. My problem is that I don't want the observer to be given the "next" URL from the Observable until they have completed the previous processing.

但是在Observer接口上对next的调用将返回void.因此,观察者无法与已经完成异步任务的我交流.

But the call to next on the Observer interface returns void. So there is no way for the Observer to communicate back to me that has actually completed the async task.

有什么建议吗?我怀疑可能存在某种可以被编码的运算符,该运算符基本上会保留将来的值(将它们存储在内存中?),直到它以某种方式知道观察者已为之准备好为止.但是我希望按照某种既定模式已经存在类似的东西.

Any suggestions? I suspect there is probably some kind of operator that could be coded up that would basically withhold future values (queue them up in memory?) until it somehow knew the Observer was ready for it. But I was hoping something like that already existed following some established pattern.

推荐答案

您所描述的听起来像是背压".您可以在RxJS 4文档中阅读有关此内容的信息 https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/backpressure.md .但是,这里提到的是RxJS 5中不存在的运算符.例如,看看"Controlled Observables"应该参考您所需要的内容.

What you describe sounds like "backpressure". You can read about it in RxJS 4 documentation https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/backpressure.md. However this is mentioning operators that don't exist in RxJS 5. For example have a look at "Controlled Observables" that should refer to what you need.

我认为您可以使用concatMap和Subject的实例来实现相同的目的

I think you could achieve the same with concatMap and an instance of Subject:

const asyncOperationEnd = new Subject();

source.concatMap(val => asyncOperationEnd
    .mapTo(void 0)
    .startWith(val)
    .take(2) // that's `val` and the `void 0` that ends this inner Observable
  )
  .filter(Boolean) // Always ignore `void 0`
  .subscribe(val => {
    // do some async operation...
    // call `asyncOperationEnd.next()` and let `concatMap` process another value
  });

从您的描述开始,实际上好像您所提到的观察者"像主题一样工作,因此,使自定义Subject类可以在任何Observable链中使用可能更有意义.

Fro your description it actually seems like the "observer" you're mentioning works like Subject so it would make maybe more sense to make a custom Subject class that you could use in any Observable chain.

这篇关于包含确认的可观察模式的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆