使用 RxJava 实现类似旋转门的操作符 [英] Implementing a turnstile-like operator with RxJava

查看:46
本文介绍了使用 RxJava 实现类似旋转门的操作符的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要帮助在 RxJava (RxScala) 中实现类似旋转门的操作符.我花了很长时间思考它,但我似乎卡住了.

I need help implementing a turnstile-like operator in RxJava (RxScala). I spent quite some time thinking about it, but I seem to be stuck.

函数的类型应该如下:

def turnstile[T](queue: Observable[T], turnstile: Observable[Boolean]): Observable[T]

这个想法是操作员的行为应该与真正的旋转门非常相似.有人来了(queue),还有一个turnstile 准备好接受新的单身人(true 元素,你可以把它想象成一个插入到旋转门中的令牌),或者关闭(false 在旋转门中,取消之前的令牌).对于旋转门中的每个 true 元素,只有一个人可以通过.

The idea is that the behavior of the operator should be very similar to a real turnstile. There are people coming (queue), and there is a turnstile that is either ready for accepting new single person (a true element in the turnstile, you can imagine it as a token inserted into the turnstile), or closed (false in the turnstile, canceling previous token). For every true element in the turnstile, only one person may pass.

此外,在没有人通过的情况下连续插入多个令牌(旋转门中的几个 true 项目)与仅插入一个令牌相同,旋转门不计算令牌.

Furthermore, inserting several tokens in a row (several true items in a turnstile) without a person passing is the same as inserting only a single token, the turnstile doesn't count the tokens.

换句话说,旋转门最初是关闭的.当 true 元素出现在其中时,它会为一个人打开.如果一个人出现,它会通过(到输出)并且旋转门再次关闭.如果旋转门中出现 false 元素,旋转门也会关闭.

In other words, the turnstile is initially closed. When a true element appears in it, it opens up for a single person. If a person appears, it passes through (to the output) and the turnstile closes again. If a false element appears in the turnstile, the turnstile also closes.

queue       ----A---B-------------C--D--
turnstile   --T--------T--T-T-T-T------T
            ============================
output      ----A------B----------C----D

一张大理石图显示打开的旋转门等待一个人 A,然后人 B 等待旋转门打开,然后几个令牌表现为一个人 - 人 C 通过,但人 D 必须再次等待新的令牌

----A----B--
--T---T-F-T-
============
----A-----B-

显示旋转门中的 false 元素如何再次关闭旋转门的大理石图.

A marble diagram showing, how a false element in the turnstile closes the turnstile again.

感谢任何帮助.我认为在不编写自定义运算符的情况下实现这一点的唯一方法是以某种方式使用 zip 运算符,因为它可能是唯一使一个序列中的元素等待另一个序列中的元素(或者是还有其他我不知道的吗?).但是我需要压缩一些旋转门元素,这取决于它们是否与人配对...

Any help is appreciated. I think the only way to implement this without writing a custom operator would be using the zip operator somehow, because it is probably the only operator that makes elements from one sequence wait for elements from the other (or are there any others I'm not aware of?). But I need to not zip some of the turnstile elements depending on whether they got paired with a person or not...

我认为这是一个有趣的问题,我很好奇一些好的解决方案.

I think this is an interesting problem, and I'm quite curious about some nice solution to it.

推荐答案

所以我想我有一个更简洁、完全 Rx 的解决方案.这实际上是一个非常有趣的问题.如果它满足您的需求,我认为它最终非常优雅,尽管它需要很长时间才能实现.

So I think I have a cleaner, fully Rx solution. This was actually a pretty fun problem to solve. Provided it works for your needs, I think it ended up being really elegant, although it took quite awhile to arrive at it.

遗憾的是我不会 Scala,所以您将不得不处理我的 Java8 lambdas.:D

Sadly I don't know Scala, so you're going to have to deal with my Java8 lambdas. :D

整个实现:

public static Observable<String> getTurnstile(final Observable<String> queue, final Observable<Boolean> tokens) {
    return queue.publish(sharedQueue ->
            tokens.switchMap(token -> token ? sharedQueue.limit(1) : Observable.empty()));
}

所以,这里发生的事情是我们使用 publish 来创建一个我们可以多次订阅的人员队列的共享观察.在其中,我们在令牌流上使用 switchMap ,这意味着每当从 switchMap 发出新的 Observable 时,它​​会丢弃最后一个并订阅新的 Observable.每当令牌为真时,它都会对人员队列进行新的订阅(连续多个真是可以的,因为它会取消旧的订阅).当它为 false 时,它​​只是转储一个空的 Observable 以免浪费时间.

So, what's happening here is we use publish to make a shared observable of the people queue that we can subscribe to multiple times. Inside that, we use a switchMap on our token stream, which means any time a new Observable is emitted from the switchMap, it drops the last one and subscribes to the new one. Any time a token is true, it makes a new subscription to the people queue (and multiple trues in a row is fine, because it's canceling the old subscriptions). When it's false, it just dumps out an empty Observable to not waste time.

还有一些(通过的)测试用例:

And some (passing) testcases:

@RunWith(JUnit4.class)
public class TurnstileTest {
    private final TestScheduler scheduler = new TestScheduler();
    private final TestSubscriber<String> output = new TestSubscriber<>();

    private final TestSubject<Boolean> tokens = TestSubject.create(scheduler);
    private final TestSubject<String> queue = TestSubject.create(scheduler);

    @Before
    public void setup() {
        Turnstile.getTurnstile(queue, tokens).subscribe(output);
    }

    @Test
    public void allowsOneWithTokenBefore() {
        tokens.onNext(true, 0);
        queue.onNext("Bill", 1);
        queue.onNext("Bob", 2);

        assertPassedThrough("Bill");
    }

    @Test
    public void tokenBeforeIsCancelable() {
        tokens.onNext(true, 0);
        tokens.onNext(false, 1);
        queue.onNext("Bill", 2);

        assertNonePassed();
    }

    @Test
    public void tokensBeforeAreCancelable() {
        tokens.onNext(true, 0);
        tokens.onNext(true, 1);
        tokens.onNext(true, 2);
        tokens.onNext(false, 3);
        queue.onNext("Bill", 4);

        assertNonePassed();
    }

    @Test
    public void eventualPassThroughAfterFalseTokens() {
        tokens.onNext(false, 0);
        queue.onNext("Bill", 1);
        tokens.onNext(false, 2);
        tokens.onNext(false, 3);
        queue.onNext("Jane", 4);
        queue.onNext("Bob", 5);
        tokens.onNext(true, 6);
        tokens.onNext(true, 7);
        tokens.onNext(false, 8);
        tokens.onNext(false, 9);
        queue.onNext("Phil", 10);
        tokens.onNext(false, 11);
        tokens.onNext(false, 12);
        tokens.onNext(true, 13);

        assertPassedThrough("Bill", "Jane", "Bob");
    }

    @Test
    public void allowsOneWithTokenAfter() {
        queue.onNext("Bill", 0);
        tokens.onNext(true, 1);
        queue.onNext("Bob", 2);

        assertPassedThrough("Bill");
    }

    @Test
    public void multipleTokenEntriesBeforeOnlyAllowsOneAtATime() {
        tokens.onNext(true, 0);
        tokens.onNext(true, 1);
        tokens.onNext(true, 2);
        queue.onNext("Bill", 3);
        tokens.onNext(true, 4);
        tokens.onNext(true, 5);
        queue.onNext("Jane", 6);
        queue.onNext("John", 7);

        assertPassedThrough("Bill", "Jane");
    }

    @Test
    public void noneShallPassWithoutToken() {
        queue.onNext("Jane", 0);
        queue.onNext("John", 1);

        assertNonePassed();
    }

    private void closeSubjects() {
        scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
        scheduler.triggerActions();
        tokens.onCompleted();
        queue.onCompleted();
        scheduler.triggerActions();
    }

    private void assertNonePassed() {
        closeSubjects();
        output.assertReceivedOnNext(Lists.newArrayList());
    }

    private void assertPassedThrough(final String... names) {
        closeSubjects();
        output.assertReceivedOnNext(Lists.newArrayList(names));
    }
}

如果您发现任何不适用于此的边缘情况,请告诉我,特别是如果实时出现问题,因为测试显然是在受控环境中进行的.

Let me know if you find any edge cases that don't work with this, particularly if it has trouble in real time since the tests are obviously in a controlled environment.

这篇关于使用 RxJava 实现类似旋转门的操作符的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆