RxJs:条件为真时缓冲事件,条件为假时传递事件 [英] RxJs: buffer events when condition is true, pass events through when condition is false

查看:37
本文介绍了RxJs:条件为真时缓冲事件,条件为假时传递事件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在下面创建了 Observable 构造函数,其工作原理如所述.有谁知道使用 RxJs 附带的运算符是否有更简洁的方法来实现相同的行为?我在看 bufferToggle 接近所需的行为,但我需要在缓冲区关闭时传递发出的值.

I created the Observable constructor below that works as described. Does anyone know if there is a more concise way of achieving the same behaviour using the operators that come with RxJs? I was looking at bufferToggle which is close to the required behaviour, but I need the emitted values to be passed through when the buffer is closed.

函数描述:如果condition发出true,则缓冲发出的source值,并通过发出的sourcesource 值如果 condition 发出 false.如果条件在 true 后发出 false,缓冲区会按照接收到的顺序释放每个值.缓冲区被初始化为传递发出的 source 值,直到 condition 发出 true.

Function Description: Buffers the emitted source values if the condition emits true, and passes through the emitted source values if the condition emits false. If the condition emits false after being true, the buffer releases each value in the order that they were received. The buffer is initialised to pass through the emitted source values until the condition emits true.

function bufferIf<T>(condition: Observable<boolean>, source: Observable<T>): Observable<T> {
  return new Observable<T>(subscriber => {
    const subscriptions: Subscription[] = [];
    const buffer = [];
    let isBufferOpen = false;

    subscriptions.push(
      // handle source events
      source.subscribe(value => {
        // if buffer is open, or closed but buffer is still being 
        // emptied from previously being closed.
        if (isBufferOpen || (!isBufferOpen && buffer.length > 0)) {
          buffer.push(value);

        } else {
          subscriber.next(value);
        }
      }),

      // handle condition events
      condition.do(value => isBufferOpen = value)
        .filter(value => !value)
        .subscribe(value => {
          while (buffer.length > 0 && !isBufferOpen) {
            subscriber.next(buffer.shift());
          }
        })
    );

    // on unsubscribe
    return () => {
      subscriptions.forEach(sub => sub.unsubscribe());
    };
  });
}

编辑

为了回应评论,以下与上面的功能相同,但采用 RxJs 运算符的形式,并更新为使用 RxJx 6+ pipeabale 运算符:

Edit

In response to comment, the following is the same function as the one above but in the form of an RxJs Operator and updated to use RxJx 6+ pipeabale Operators:

 function bufferIf<T>(condition: Observable<boolean>): MonoTypeOperatorFunction<T> {
   return (source: Observable<T>) => {
     return new Observable<T>(subscriber => {
       const subscriptions: Subscription[] = [];
       const buffer: T[] = [];
       let isBufferOpen = false;
   
       subscriptions.push(
         // handle source events
         source.subscribe(value => {
           // if buffer is open, or closed but buffer is still being 
           // emptied from previously being closed.
           if (isBufferOpen || (!isBufferOpen && buffer.length > 0)) {
             buffer.push(value);
           } else {
             subscriber.next(value);
           }
         }),
   
         // handle condition events
         condition.pipe(
          tap(con => isBufferOpen = con),
          filter(() => !isBufferOpen)
         ).subscribe(() => {
          while (buffer.length > 0 && !isBufferOpen) {
            subscriber.next(buffer.shift());
          }
        })
       );
   
       // on unsubscribe
       return () => subscriptions.forEach(sub => sub.unsubscribe());
     });
   }
}

推荐答案

我找到了一种基于运算符而不是订阅的解决方案,但不愿称其为更简洁.

I've found a solution based on operators rather than subscriptions, but hesitate to call it more concise.

请注意,如果可以保证缓冲区开/关流始终以关闭(即奇数次发射)结束,则可以删除 endToken.

Note, the endToken can be removed if it's possible to guarantee the buffer on/off stream always ends with an off (i.e an odd number of emits).

console.clear() 
const Observable = Rx.Observable

// Source and buffering observables
const source$ = Observable.timer(0, 200).take(15)
const bufferIt$ = Observable.timer(0, 500).map(x => x % 2 !== 0).take(6)  

// Function to switch buffering
const endToken = 'end'            
const bufferScanner = { buffering: false, value: null, buffer: [] }
const bufferSwitch = (scanner, [src, buffering]) => { 
  const onBufferClose = (scanner.buffering && !buffering) || (src === endToken)
  const buffer = (buffering || onBufferClose) ? scanner.buffer.concat(src) : []
  const value = onBufferClose ? buffer : buffering ? null : [src]
  return { buffering, value, buffer }
}
      
// Operator chain
const output = 
  source$
    .concat(Observable.of(endToken))     // signal last buffer to emit
    .withLatestFrom(bufferIt$)           // add buffering flag to stream
    .scan(bufferSwitch, bufferScanner)   // turn buffering on and off
    .map(x => x.value)                   // deconsruct bufferScanner
    .filter(x => x)                      // ignore null values
    .mergeAll()                          // deconstruct buffer array
    .filter(x => x !== endToken)         // ignore endToken

// Proof
const start = new Date()
const outputDisplay = output.timestamp()
  .map(x => 'value: ' + x.value + ', elapsed: ' + (x.timestamp - start) )
const bufferDisplay = bufferIt$.timestamp()
  .map(x => (x.value ? 'buffer on' : 'buffer off') + ', elapsed: ' + (x.timestamp - start) )
bufferDisplay.merge(outputDisplay)
  .subscribe(console.log)
  

<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.2/Rx.js"></script>

脚注

我也找到了一个基于 buffer() 的解决方案,但我不相信它在高频源下是稳定的.某些缓冲区配置似乎有些问题(即声明看起来很合理,但测试显示偶尔会出现干扰缓冲区操作的延迟).

I also found a solution based on buffer(), but am not convinced it's stable with a high-frequency source. There seems to be something hokey with certain buffer configurations (i.e the declaration looks sound but tests show occasional delays which interfere with buffer operation).

无论如何,供参考,

/* 
  Alternate with buffered and unbuffered streams
*/

const buffered = 
   source$.withLatestFrom(bufferIt$)      
    .filter(([x, bufferIsOn]) => bufferIsOn)  
    .map(x => x[0])
    .buffer(bufferIt$.filter(x => !x))
    .filter(x => x.length)       // filter out empty buffers
    .mergeAll()                  // unwind the buffer

const unbuffered =
  source$.withLatestFrom(bufferIt$)      
    .filter(([x, bufferIsOn]) => !bufferIsOn)    
    .map(x => x[0])

const output = buffered.merge(unbuffered)

这篇关于RxJs:条件为真时缓冲事件,条件为假时传递事件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆