RxJava是否适合分支工作流程? [英] Is RxJava a good fit for branching workflows?
问题描述
我正在使用RxJava处理我们从队列中提取的一些通知。
I am using RxJava to process some notifications that we pull from a queue.
RxJava似乎可以通过简单的工作流程正常工作,现在有了新的要求,流量越来越复杂,分支越多(请参见下图作为参考)
我试图通过一个小单元测试来举例说明流程:
RxJava seemed to work fine with a simple workflow, now with new requirements coming in, the flow is growing in complexity with more branches (please see below picture as a reference) I tried to exemplify the flow with a small unit test:
@Test
public void test() {
Observable.range(1, 100)
.groupBy(n -> n % 3)
.toMap(GroupedObservable::getKey)
.flatMap(m1 -> {
Observable<Integer> ones1 = m1.get(0);
Observable<Integer> twos1 = m1.get(1).map(n -> n - 10);
Observable<Integer> threes = m1.get(2).map(n -> n + 100);
Observable<Integer> onesAndTwos = Observable.merge(ones1, twos1)
.map(n -> n * 3)
.groupBy(n -> n % 2)
.toMap(GroupedObservable::getKey)
.flatMap(m2 -> {
Observable<Integer> ones2 = m2.get(0).map(n -> n * 10);
Observable<Integer> twos2 = m2.get(1).map(n -> n * 100);
return Observable.merge(ones2, twos2);
});
return Observable.merge(onesAndTwos, threes).map(n -> n +1);
})
.subscribe(System.out::println);
}
虽然使用RxJava在技术上仍然可以实现,但我现在想知道它是否是不错的选择,为了正式化分支,我必须在主 flatMap
中进行2级嵌套,这看起来并不是很整洁。
Whilst it's still technically achievable to use RxJava, I am now wondering if it's a good choice, as to formalise the branching I had to do 2 level of nesting inside the main flatMap
, which doesn't seem really neat.
这是描述上述工作流程的正确方法吗?或者RxJava不适合分支工作流程?
Would this be the right way of describing a workflow like above? Or RxJava is not a good fit for branching workflows?
感谢您的帮助!
推荐答案
只是想出另一种可能对你有用的方法:你可以组织源并单独处理分支,而不是分组/ toMap。
Just an idea for another approach which may work for you: Instead of grouping/toMap, you could multicast the source and handle the branches individually.
示例:
@Test
public void multicastingShare() {
final Observable<Integer> sharedSource = Observable.range(1, 10)
.doOnSubscribe(dummy -> System.out.println("subscribed"))
.share();
// split by some criteria
final Observable<String> oddItems = sharedSource
.filter(n -> n % 2 == 1)
.map(odd -> "odd: " + odd)
.doOnNext(System.out::println);
final Observable<String> evenItems = sharedSource
.filter(n -> n % 2 == 0)
.map(even -> "even: " + even)
.doOnNext(System.out::println);
// recombine the individual streams at some point
Observable.concat(oddItems, evenItems)
.subscribe(result -> System.out.println("result: " + result));
}
这视频可能会有所帮助(至少前15分钟)
This video may be be helpful (at least the first 15 min)
这篇关于RxJava是否适合分支工作流程?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!