RXJS 控制 observable 调用 [英] RXJS control observable invocation

查看:20
本文介绍了RXJS 控制 observable 调用的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在我的 Angular 2 项目中使用 RxJs 版本 5.我想创建一些 observables 但我不希望立即调用 observables.

I use RxJs version 5 within my Angular 2 project. I want to create some observables but I don't want the observables being invoked immediately.

第4版中,您可以使用(例如)Controlled 命令或 可暂停缓冲区.但该功能不是(还没有) 在版本 5 中可用.

In version 4 you could control the invocation with (for example) the Controlled command or Pausable Buffers. But that functionality is not (yet) available in version 5.

我怎样才能在 RxJs 5 中获得这种功能?

How can I get the this kind of functionality in RxJs 5?

我的最终目标是将创建的 observable 排入队列并一一调用它们.下一个只有在前一个处理成功时才会调用.当一个失败时,队列被清空.

My ultimate goal is to queue the created observables and invoke them one by one. The next one is only invoked when the previous one is processed successfully. When one fails, the queue is emptied.

编辑

根据@Niklas Fasching 的评论,我可以使用 发布 操作.

With the the comment of @Niklas Fasching I could create a working solution with the Publish operation.

JS Bin

// Queue to queue operations
const queue = [];

// Just a function to create Observers
function createObserver(id): Observer {
    return {
        next: function (x) {
            console.log('Next: ' + id + x);
        },
        error: function (err) {
            console.log('Error: ' + err);
        },
        complete: function () {
            console.log('Completed');
        }
    };
};

// Creates an async operation and add it to the queue
function createOperation(name: string): Observable {

  console.log('add ' + name);
  // Create an async operation
  var observable = Rx.Observable.create(observer => {
    // Some async operation
    setTimeout(() => 
               observer.next(' Done'), 
               500);
  });
  // Hold the operation
  var published = observable.publish();
  // Add Global subscribe
  published.subscribe(createObserver('Global'));
  // Add it to the queue
  queue.push(published);
  // Return the published so the caller could add a subscribe
  return published;
};

// Create 4 operations on hold
createOperation('SourceA').subscribe(createObserver('SourceA'));
createOperation('SourceB').subscribe(createObserver('SourceB'));
createOperation('SourceC').subscribe(createObserver('SourceC'));
createOperation('SourceD').subscribe(createObserver('SourceD'));

// Dequeue and run the first
queue.shift().connect();

推荐答案

您可以通过 发布 observable.发布的 observable 只会在调用 后启动连接.

You can seperate the start of the observable from subscription to it by publishing the observable. The published observable will only be started after calling connect on it.

请注意,所有订阅者都将共享一个对可观察序列的订阅.

Note that all subscribers will share a single subscription to the observable sequence.

var published = Observable.of(42).publish();
// subscription does not start the observable sequence
published.subscribe(value => console.log('received: ', value));
// connect starts the sequence; subscribers will now receive values
published.connect();

这篇关于RXJS 控制 observable 调用的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆