RxJava是否适合分支工作流程? [英] Is RxJava a good fit for branching workflows?

查看:208
本文介绍了RxJava是否适合分支工作流程?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用RxJava处理我们从队列中提取的一些通知。

I am using RxJava to process some notifications that we pull from a queue.

RxJava似乎可以通过简单的工作流程正常工作,现在有了新的要求,流量越来越复杂,分支越多(请参见下图作为参考)

我试图通过一个小单元测试来举例说明流程:

RxJava seemed to work fine with a simple workflow, now with new requirements coming in, the flow is growing in complexity with more branches (please see below picture as a reference) I tried to exemplify the flow with a small unit test:

@Test
public void test() {
    Observable.range(1, 100)
        .groupBy(n -> n % 3)
        .toMap(GroupedObservable::getKey)
        .flatMap(m1 -> {
            Observable<Integer> ones1 = m1.get(0);
            Observable<Integer> twos1 = m1.get(1).map(n -> n - 10);
            Observable<Integer> threes = m1.get(2).map(n -> n + 100);
            Observable<Integer> onesAndTwos = Observable.merge(ones1, twos1)
                .map(n -> n * 3)
                .groupBy(n -> n % 2)
                .toMap(GroupedObservable::getKey)
                .flatMap(m2 -> {
                    Observable<Integer> ones2 = m2.get(0).map(n -> n * 10);
                    Observable<Integer> twos2 = m2.get(1).map(n -> n * 100);
                    return Observable.merge(ones2, twos2);
                });
                return Observable.merge(onesAndTwos, threes).map(n -> n +1);
        })
        .subscribe(System.out::println);
}

虽然使用RxJava在技术上仍然可以实现,但我现在想知道它是否是不错的选择,为了正式化分支,我必须在主 flatMap 中进行2级嵌套,这看起来并不是很整洁。

Whilst it's still technically achievable to use RxJava, I am now wondering if it's a good choice, as to formalise the branching I had to do 2 level of nesting inside the main flatMap, which doesn't seem really neat.

这是描述上述工作流程的正确方法吗?或者RxJava不适合分支工作流程?

Would this be the right way of describing a workflow like above? Or RxJava is not a good fit for branching workflows?

感谢您的帮助!

推荐答案

只是想出另一种可能对你有用的方法:你可以组织源并单独处理分支,而不是分组/ toMap。

Just an idea for another approach which may work for you: Instead of grouping/toMap, you could multicast the source and handle the branches individually.

示例:

@Test
public void multicastingShare() {
    final Observable<Integer> sharedSource = Observable.range(1, 10)
            .doOnSubscribe(dummy -> System.out.println("subscribed"))
            .share();
    // split by some criteria
    final Observable<String> oddItems = sharedSource
            .filter(n -> n % 2 == 1)
            .map(odd -> "odd: " + odd)
            .doOnNext(System.out::println);
    final Observable<String> evenItems = sharedSource
            .filter(n -> n % 2 == 0)
            .map(even -> "even: " + even)
            .doOnNext(System.out::println);

    // recombine the individual streams at some point
    Observable.concat(oddItems, evenItems)
            .subscribe(result -> System.out.println("result: " + result));
}

视频可能会有所帮助(至少前15分钟)

This video may be be helpful (at least the first 15 min)

这篇关于RxJava是否适合分支工作流程?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆