根据特定条件在 RxJs 中加入两个 observables 流 [英] joining two streams of observables in RxJs according to specific conditions

查看:45
本文介绍了根据特定条件在 RxJs 中加入两个 observables 流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有两个对象流,帐户和余额.

我需要根据idaccount_id合并(加入)两个流

var accounts = Rx.Observable.from([{ id: 1, name: 'account 1' },{ id: 2, name: 'account 2' },{ id: 3, name: 'account 3' },]);var balances = Rx.Observable.from([{ account_id: 1, 余额: 100 },{ account_id: 2, 余额: 200 },{ account_id: 3, 余额: 300 },]);

预期:

var 结果 = [{ id: 1, name: 'account 1', balance: 100},{ id: 2, name: 'account 2', balance: 200},{ id: 3, name: 'account 3', balance: 300},];

这对 RxJs 可行吗?

明确地说,我知道如何使用普通的 js/lodash 或类似的东西来做到这一点.就我而言,我从 Angular Http 模块获取这些流,所以我问在这种情况下我是否可以从 RxJs 中受益

解决方案

根据您的评论之一,您的示例是模拟来自 Angular Http 调用的流.

所以而不是:

var accounts = Rx.Observable.from([{ id: 1, name: 'account 1' },{ id: 2, name: 'account 2' },{ id: 3, name: 'account 3' },]);var balances = Rx.Observable.from([{ account_id: 1, 余额: 100 },{ account_id: 2, 余额: 200 },{ account_id: 3, 余额: 300 },]);

我宁愿说它是:

var accounts = Rx.Observable.of([{ id: 1, name: 'account 1' },{ id: 2, name: 'account 2' },{ id: 3, name: 'account 3' },]);var balances = Rx.Observable.of([{ account_id: 1, 余额: 100 },{ account_id: 2, 余额: 200 },{ account_id: 3, 余额: 300 },]);

为什么: from 会一一发出每一项,of 会发出整个数组,我猜你的 http 响应是整个数组.

也就是说,您可能想要实现的是:

const { Observable } = Rx;//模拟 HTTP 请求const account$ = Rx.Observable.of([{ id: 1, name: 'account 1' },{ id: 2, name: 'account 2' },{ id: 3, name: '账户 3' }]);const 余额$ = Rx.Observable.of([{ account_id: 1, 余额: 100 },{ account_id: 2, 余额: 200 },{ account_id: 3, 余额: 300 }]);//实用程序const joinArrays = (accounts, balances) =>帐户.map(account => Object.assign({}, account, { balance: findBalanceByAccountId(balances, account.id).balance }));const findBalanceByAccountId = (余额, id) =>balances.find(balance => balance.account_id === id) ||{余额:0};const print = (obj) =>JSON.stringify(obj, null, 2)//使用 forkJoin 同时启动两个 observables,而不是在每个请求之间等待可观察的.forkJoin(accounts$, balances$).map(([accounts, balances]) => joinArrays(accounts, balances)).do(rslt => console.log(print(rslt))).订阅();

输出:

<预><代码>[{身份证":1,"name": "账户 1",余额":100},{身份证":2,"name": "账户 2",余额":200},{身份证":3,"name": "账户 3",余额":300}]

这是一个有效的 Plunkr:https://plnkr.co/edit/bc0YHrISu3FT45ftIFwz?p=预览

编辑 1:处理一个数组来组合你的结果可能不是性能的最佳主意,而不是返回一个数组,也许尝试返回一个以帐户 ID 作为键的对象.通过这种方式,您可以简单地删除 findBalanceByAccountId 函数并拥有更快的应用程序(此处仅修改代码) :

const balances$ = Rx.Observable.of({1: { account_id: 1, 余额: 100 },2: { account_id: 2, 余额: 200 },3: { account_id: 3, balance: 300 }});//实用程序const joinArrays = (accounts, balances) =>帐户.map(account => Object.assign({},帐户,{ balance: balances[account.id].balance }));

I have two streams of objects, the accounts and balances.

I need to merge (join) the two streams according to the id and account_id

var accounts = Rx.Observable.from([
    { id: 1, name: 'account 1' },
    { id: 2, name: 'account 2' },
    { id: 3, name: 'account 3' },
]);

var balances = Rx.Observable.from([
    { account_id: 1, balance: 100 },
    { account_id: 2, balance: 200 },
    { account_id: 3, balance: 300 },
]);

What is expected:

var results = [
    { id: 1, name: 'account 1', balance: 100},
    { id: 2, name: 'account 2', balance: 200},
    { id: 3, name: 'account 3', balance: 300},
];

Is this feasible with RxJs ?

To be clear I know how to do this with plain js/lodash or something similar. In my case I am getting these streams from Angular Http Module, so I am asking If I could get benefit of RxJs in this case

解决方案

Accoring to one of your comment, your example is to simulate a stream from an Angular Http call.

So instead of :

var accounts = Rx.Observable.from([
    { id: 1, name: 'account 1' },
    { id: 2, name: 'account 2' },
    { id: 3, name: 'account 3' },
]);

var balances = Rx.Observable.from([
    { account_id: 1, balance: 100 },
    { account_id: 2, balance: 200 },
    { account_id: 3, balance: 300 },
]);

I'd rather say that it is :

var accounts = Rx.Observable.of([
    { id: 1, name: 'account 1' },
    { id: 2, name: 'account 2' },
    { id: 3, name: 'account 3' },
]);

var balances = Rx.Observable.of([
    { account_id: 1, balance: 100 },
    { account_id: 2, balance: 200 },
    { account_id: 3, balance: 300 },
]);

Why : from will emit every item one by one, of will emit the entire array and I guess your http response is the whole array.

That said, what you probably want to achieve is :

const { Observable } = Rx;

// simulate HTTP requests
const accounts$ = Rx.Observable.of([
  { id: 1, name: 'account 1' },
  { id: 2, name: 'account 2' },
  { id: 3, name: 'account 3' }
]);

const balances$ = Rx.Observable.of([
  { account_id: 1, balance: 100 },
  { account_id: 2, balance: 200 },
  { account_id: 3, balance: 300 }
]);

// utils
const joinArrays = (accounts, balances) =>
  accounts
    .map(account => Object.assign({}, account, { balance: findBalanceByAccountId(balances, account.id).balance }));

const findBalanceByAccountId = (balances, id) =>
  balances.find(balance => balance.account_id === id) || { balance: 0 };

const print = (obj) => JSON.stringify(obj, null, 2)

// use forkJoin to start both observables at the same time and not wait between every request
Observable
  .forkJoin(accounts$, balances$)
  .map(([accounts, balances]) => joinArrays(accounts, balances))
  .do(rslt => console.log(print(rslt)))
  .subscribe();

Output :

[
  {
    "id": 1,
    "name": "account 1",
    "balance": 100
  },
  {
    "id": 2,
    "name": "account 2",
    "balance": 200
  },
  {
    "id": 3,
    "name": "account 3",
    "balance": 300
  }
]

Here's a working Plunkr : https://plnkr.co/edit/bc0YHrISu3FT45ftIFwz?p=preview

EDIT 1 : Working on an array to compose your result is probably not the best idea for performance and instead of returning an array, maybe try to return an object which have as key, the ID of the account. This way you might simply remove the findBalanceByAccountId function and have a faster app (only modified code here) :

const balances$ = Rx.Observable.of({
  1: { account_id: 1, balance: 100 },
  2: { account_id: 2, balance: 200 },
  3: { account_id: 3, balance: 300 }
});

// utils
const joinArrays = (accounts, balances) =>
  accounts
    .map(account => Object.assign(
      {}, 
      account, 
      { balance: balances[account.id].balance }
    ));

这篇关于根据特定条件在 RxJs 中加入两个 observables 流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆