RxJava:缓冲项目,直到当前项目满足某些条件为止 [英] RxJava: buffer items until some condition is true for current item

查看:45
本文介绍了RxJava:缓冲项目,直到当前项目满足某些条件为止的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

这是我要弄清楚的摘录:

Here's a snippet I'm trying to figure out:

class RaceCondition {

    Subject<Integer, Integer> subject = PublishSubject.create();

    public void entryPoint(Integer data) {
        subject.onNext(data);
    }

    public void client() {
        subject /*some operations*/
                .buffer(getClosingSelector())
                .subscribe(/*handle results*/);
    }

    private Observable<Integer> getClosingSelector() {
        return subject /* some filtering */;
    }
}

有一个 Subject 接受来自外部的事件.有一个订阅该主题的客户端,它可以处理事件并缓冲它们.这里的主要思想是,应根据使用流中的项目计算出的某种条件,每次都发出缓冲的项目.

There's a Subject that accepts events from the outside. There's a client subscribed to this subject that processes the events and also buffers them. The main idea here is that the buffered items should be emitted each time based on some condition calculated using an item from the stream.

为此,缓冲区边界本身会侦听主体.

For this purpose the buffer boundary itself listens to the subject.

一个重要的期望行为:每当边界发出该项时,也应将其包括在以下 buffer 的发出中.当前配置不是这种情况,因为该项目(至少是我认为的)是从关闭选择器发出的,直到到达 buffer ,因此不包括在内在当前的排放中,但是被留下来等待下一个排放.

An important desired behaviour: whenever the boundary emits the item, it should also be included in the following emission of the buffer. It's not the case with the current configuration as the item (at least that's what I think) is emitted from the closing selector before it reaches the buffer, so that it's not included in the current emission but is left behind waiting for the next one.

是否有一种方法可以使关闭选择器首先等待该项目被缓冲?如果没有,是否还有另一种方法可以基于下一个传入项目来缓冲和释放项目?

Is there a way to essentially make closing selector wait for the item to be buffered first? If not, is there another way to buffer and release items based on the next incoming item?

推荐答案

如果我理解正确,则需要缓冲,直到某些谓词允许基于项为止.您可以使用一组复杂的运算符来执行此操作,但编写一个自定义运算符可能会更容易:

If I understand correctly, you want to buffer until some predicate allows it based on the items. You can do this with a complicated set of operators but perhaps its easier to just write a custom operator:

public final class BufferUntil<T> 
implements Operator<List<T>, T>{

    final Func1<T, Boolean> boundaryPredicate;

    public BufferUntil(Func1<T, Boolean> boundaryPredicate) {
        this.boundaryPredicate = boundaryPredicate;
    }

    @Override
    public Subscriber<? super T> call(
            Subscriber<? super List<T>> child) {
        BufferWhileSubscriber parent = 
                new BufferWhileSubscriber(child);
        child.add(parent);
        return parent;
    }

    final class BufferWhileSubscriber extends Subscriber<T> {
        final Subscriber<? super List<T>> actual;

        List<T> buffer = new ArrayList<>();

        /**
         * @param actual
         */
        public BufferWhileSubscriber(
                Subscriber<? super List<T>> actual) {
            this.actual = actual;
        }

        @Override
        public void onNext(T t) {
            buffer.add(t);
            if (boundaryPredicate.call(t)) {
                actual.onNext(buffer);
                buffer = new ArrayList<>();
            }
        }

        @Override
        public void onError(Throwable e) {
            buffer = null;
            actual.onError(e);
        }

        @Override
        public void onCompleted() {
            List<T> b = buffer;
            buffer = null;
            if (!b.isEmpty()) {
                actual.onNext(b);
            }
            actual.onCompleted();
        }
    }
}

这篇关于RxJava:缓冲项目,直到当前项目满足某些条件为止的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆