RxJs避免外部状态,但仍然访问以前的值 [英] RxJs avoid external state but still access previous values

查看:68
本文介绍了RxJs避免外部状态,但仍然访问以前的值的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用RxJs来监听amqp queu(不太相关)。

I'm using RxJs to listen to a amqp queu (not really relevant).

我有一个函数 createConnection 返回 Observable ,它会发出新的连接对象。一旦建立连接,我希望每隔 1000毫秒发送一次消息,然后在10条消息之后我想关闭连接。

I have a function createConnection that returns an Observable that emits the new connection object. Once I have a connection, I want to send messages through it every 1000ms and after 10 messages I want to close the connection.

我'我试图避免外部状态,但如果我不将连接存储在外部变量中,我该如何关闭它?请参阅我从连接开始,然后 flatMap 并推送消息,因此在几个链之后我不再拥有连接对象。

I'm trying to avoid external state, but if I don't store the connection in an external variable, how can I close it? See I begin with the connection, then flatMap and push messages, so after a few chains I no longer have the connection object.

这不是我的流程,但想象一下这样的事情:

This is no my flow but imagine something like this:

createConnection()
  .flatMap(connection => connection.createChannel())
  .flatMap(channel => channel.send(message))
  .do(console.log)
  .subscribe(connection => connection.close()) <--- obviously connection isn't here

现在我明白了愚蠢的做到这一点,但现在我如何访问连接?我当然可以从 var connection = createConnection()

Now I understand that it's stupid to do that, but now how do I access the connection? I could of course begin with var connection = createConnection()

开始,然后以某种方式加入。但是我该怎么做?我甚至不知道如何正确地提出这个问题。 Bottomline,我所拥有的是一个可观察的,发出连接,在连接打开后我想要一个每1000ms发出一次消息的observable(带有 take(10)),然后关闭连接

and later on somehow join that. But how do I do this? I don't even know how to ask this question properly. Bottomline, what I have is an observable, that emits a connection, after the connection is opened I want an observable that emits messages every 1000ms (with a take(10)), then close the connection

推荐答案

你问题的直接答案是你可以通过每一步。例如,您可以替换此行

The direct answer to your question is "you can carry it through each step". For example, you can replace this line

.flatMap(connection => connection.createChannel())

这一个:

.flatMap(connection => ({ connection: connection, channel: connection.createChannel() }))

并保持对连接的完全访问权限。

and retain access to the connection all the way down.

但还有另一种方法可以做你想做的事情。假设你的createConnection和createChannel函数看起来像这样:

But there's another way to do what you want to do. Let's assume your createConnection and createChannel functions look something like this:

function createConnection() {
  return Rx.Observable.create(observer => {
    console.log('creating connection');
    const connection = {
      createChannel: () => createChannel(),
      close: () => console.log('disposing connection')
    };

    observer.onNext(connection);

    return Rx.Disposable.create(() => connection.close());
  });
}

function createChannel() {
  return Rx.Observable.create(observer => {
    const channel = {
      send: x => console.log('sending message: ' + x)
    };

    observer.onNext(channel);

    // assuming no cleanup here, don't need to return disposable
  });
}

createConnection (和 createChannel ,但我们会专注于前者)返回一个冷可观察者;每个订阅者将获得包含单个连接的自己的连接流,并且当该订阅到期时,将自动调用dispose逻辑。

createConnection (and createChannel, but we'll focus on the former) returns a cold observable; each subscriber will get their own connection stream containing a single connection, and when that subscription expires, the dispose logic will be called automatically.

这允许您执行类似这样的操作:

This allows you to do something like this:

const subscription = createConnection()
  .flatMap(connection => connection.createChannel())
  .flatMap(channel => Rx.Observable.interval(1000).map(i => ({ channel: channel, data: i })))
  .take(10)
  .subscribe(x => x.channel.send(x.data))
;

您实际上不必处理订阅以进行清理;在 take(10)满意后,整个链将完成并将触发清理。你明确需要在订阅上调用dispose的唯一原因是,如果你想在10 1000ms间隔之前拆除它们。

You don't actually have to dispose the subscription for cleanup to occur; after take(10) is satisfied, the whole chain will finish and cleanup will be triggered. The only reason you'd need to call dispose on the subscription explicitly is if you wanted to tear things down before the 10 1000ms intervals were up.

注意这个解决方案也是包含一个直接回答你问题的实例:我们将频道推到线下,这样我们就可以在传递给订阅调用的onNext lambda中使用它(通常会出现这样的代码)。

Note that this solution also contains an instance of the direct answer to your question: we cart the channel down the line so we can use it in the onNext lambda passed to the subscribe call (which is customarily where such code would appear).

以下是完整的工作: https:// jsbin .com / korihe / 3 / edit?js,console,output

这篇关于RxJs避免外部状态,但仍然访问以前的值的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆