Do you have a test to show differences between the reactor map() and flatMap()?

Question

I am still trying to understand the difference between the Reactor map() and flatMap() methods. First I took a look at the API, but it isn't really helpful; it confused me even more. Then I googled a lot, but it seems like nobody has an example that makes the differences understandable, if there are any differences.

Therefore I tried to write two tests to see the different behaviour of each method. But unfortunately it isn't working as I hoped it would...

The first test method tests the reactive flatMap() method:

@Test
void fluxFlatMapTest() {
    Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .window(2)
            .flatMap(fluxOfInts -> fluxOfInts.map(this::processNumber).subscribeOn(Schedulers.parallel()))
            .doOnNext(System.out::println)
            .subscribe();
}

The output is as expected and explainable, and looks like this:

9 - parallel-2
1 - parallel-1
4 - parallel-1
25 - parallel-3
36 - parallel-3
49 - parallel-4
64 - parallel-4
81 - parallel-5
100 - parallel-5
16 - parallel-2

The second method should test the output of the map() method, to compare it with the above results of the flatMap() method.

@Test
void fluxMapTest() {
    final int start = 1;
    final int stop = 100;
    Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .window(2)
            .map(fluxOfInts -> fluxOfInts.map(this::processNumber).subscribeOn(Schedulers.parallel()))
            .doOnNext(System.out::println)
            .subscribe();
}

This test method produces output I didn't expect at all, which looks like this:

FluxSubscribeOn
FluxSubscribeOn
FluxSubscribeOn
FluxSubscribeOn
FluxSubscribeOn

There is a little helper method, which looks like this:

private String processNumber(Integer x) {
    String squaredValueAsString = String.valueOf(x * x);
    return squaredValueAsString.concat(" - ").concat(Thread.currentThread().getName());
}

Nothing special here.

I am using Spring Boot 2.3.4 with Java 11 and the reactor implementation for Spring.

Do you have a good explanatory example, or do you know how to change the above tests so that they make sense? Then please help me out with that. Thanks a lot in advance!

Solution

In WebFlux, Reactor (the underlying reactive library) is driven by something called the event loop, which in turn I believe is based on an architecture called the LMAX Architecture.

This means that each event loop is a single-threaded event processor. Everything up to the event loop can be multithreaded, but the events themselves are processed by a single thread: the event loop.

Regular Spring Boot applications usually run on the Tomcat or Undertow servers. WebFlux runs by default on the event-driven server Netty, which uses this event loop to process events for us.

So now that we understand what is underneath everything, we can start talking about map and flatMap.

Map

If we look at the API documentation for map, we see a marble diagram, and the text says:

Transform the items emitted by this Flux by applying a synchronous function to each item.

Which is pretty self-explanatory. We have a Flux of items, and map transforms each item with a plain function. It has to finish transforming one item before the next one can be handled. Hence synchronous.

The marble diagram shows this: the green circle has to be converted to a green square before the yellow circle can be converted to a yellow square, and so on.

Here is a code example:

import reactor.core.publisher.Flux;

Flux.just("a", "b", "c")
    .map(value -> value.toUpperCase())
    .subscribe(s -> System.out.println(s + " - " + Thread.currentThread().getName()));

// Output
// A - main
// B - main
// C - main

Each item is processed on the main thread, one after the other, synchronously.

flatMap

The API documentation for flatMap also shows a marble diagram, and the text says:

Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, which allow them to interleave.

The documentation describes its behaviour in three parts:

  • Generation of inners and subscription: this operator is eagerly subscribing to its inners.
  • Ordering of the flattened values: this operator does not necessarily preserve original ordering, as inner element are flattened as they arrive.
  • Interleaving: this operator lets values from different inners interleave (similar to merging the inner sequences).

So what does this mean? Well, it basically means that:

  1. It takes each item in the Flux and uses your function to transform it into an inner Publisher (for example a Mono holding one item). Then it subscribes to that Publisher.

  2. It emits the inner results in the order they arrive. So flatMap does NOT preserve the original order, because different inner Publishers can take different amounts of time to complete.

  3. It merges all the inner results back into a single Flux for further processing downstream.
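The steps above also explain the question's map() test. map() only applies the function and never subscribes to what the function returns. A function that returns a Publisher therefore turns the Flux into a Flux of Publishers. Printing those prints each inner publisher object (hence the FluxSubscribeOn lines) rather than its values. flatMap() subscribes to each inner Publisher and merges the emitted values.

Below is a minimal sketch, assuming reactor-core is on the classpath. toUpper is a hypothetical helper, and block() is used only so the results can be inspected from a plain main method.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MapVsFlatMap {

    // Hypothetical helper standing in for any method that returns a Publisher.
    static Mono<String> toUpper(String value) {
        return Mono.just(value.toUpperCase());
    }

    // map() applies toUpper but never subscribes to the returned Monos:
    // the stream is a Flux<Mono<String>>, so we collect Mono objects.
    static List<Mono<String>> withMap() {
        return Flux.just("a", "b", "c")
                .map(MapVsFlatMap::toUpper)
                .collectList()
                .block();
    }

    // flatMap() subscribes to each inner Mono and merges the values it emits:
    // the stream is a Flux<String>.
    static List<String> withFlatMap() {
        return Flux.just("a", "b", "c")
                .flatMap(MapVsFlatMap::toUpper)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        // Prints each Mono object's toString(), not the uppercase values,
        // much like the FluxSubscribeOn lines in the question's map() test.
        withMap().forEach(System.out::println);
        // Prints the values A, B and C.
        withFlatMap().forEach(System.out::println);
    }
}
```

Chaining .flatMap(inner -> inner) after the map() call would subscribe to the inner publishers as well. flatMap() does the same thing in a single step.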

Here is a code example:

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Flux.just("a", "b", "c")
    .flatMap(value -> Mono.just(value.toUpperCase()))
    .subscribe(s -> System.out.println(s + " - " + Thread.currentThread().getName()));

// Output
// A - main
// B - main
// C - main

Wait, flatMap prints the same thing as map!

Well, it all comes back to the threading model we talked about earlier: here, all events are handled by a single thread.

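Reactor is concurrency-agnostic, meaning it does not enforce a threading model. Unless told otherwise, the source runs on the thread that calls subscribe(), and most operators keep running on the thread where the previous operator executed. Schedulers and their workers are how work gets moved elsewhere.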

So what is a worker? A worker is something a Scheduler can spawn. One important thing is that a worker doesn't have to be a thread; it can be, but it doesn't have to be.

In the code examples above, the main thread subscribes to our Flux, which means that the main thread processes it for us.

In a server environment this is not necessarily the case. The important thing to understand here is that execution can move to another worker (possibly another thread) whenever an operator or a source schedules work on a different Scheduler.
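One operator that explicitly moves work to another worker is publishOn, which is not covered above. In this minimal sketch, assuming reactor-core is on the classpath, the first map runs on the thread that subscribes. Everything after publishOn(Schedulers.single()) runs on a worker of the single Scheduler.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class PublishOnSwitch {

    // Each element becomes "threadBefore -> threadAfter".
    static List<String> threadHops() {
        return Flux.just("a", "b")
                // runs on the thread that subscribes (the caller of block())
                .map(value -> Thread.currentThread().getName())
                // hands every following operator over to a worker of Schedulers.single()
                .publishOn(Schedulers.single())
                .map(before -> before + " -> " + Thread.currentThread().getName())
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        // e.g. "main -> single-1" for each element
        threadHops().forEach(System.out::println);
    }
}
```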

In my code examples above there is only the main thread, so there is no need to run things on multiple threads or in parallel.

If I wish to force it, I can use one of the different Schedulers, which all have their uses. In Netty, the server starts up with roughly as many event loop threads as there are cores on your machine. Under heavy load it can spread work freely across workers and cores to make use of all event loops.

flatMap being async does NOT mean parallel. It means that flatMap subscribes to all the inner publishers eagerly and merges their results as they arrive. By itself, it does not add any threads: if the inner publishers don't switch threads, a single thread still executes all the tasks.
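Here is a minimal sketch of that distinction, assuming reactor-core is on the classpath. currentThreadName() is a hypothetical helper that reports which thread evaluates each inner Mono. With plain inner Monos, flatMap() stays on the subscribing thread.

Once every inner publisher gets its own subscribeOn(Schedulers.parallel()), as in the question's fluxFlatMapTest, the merged values come from parallel worker threads. The parallelism comes from the inner publishers, not from flatMap() itself.

```java
import java.util.List;
import java.util.Set;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class FlatMapThreads {

    // Hypothetical helper: a Mono that emits the name of the thread it runs on.
    static Mono<String> currentThreadName() {
        return Mono.fromCallable(() -> Thread.currentThread().getName());
    }

    // No Scheduler anywhere: every inner Mono runs on the subscribing thread.
    static Set<String> withoutScheduler() {
        List<String> names = Flux.range(1, 10)
                .flatMap(i -> currentThreadName())
                .collectList()
                .block();
        return Set.copyOf(names);
    }

    // Each inner Mono subscribes on the parallel Scheduler, so flatMap merges
    // values produced on parallel workers (several of them on a multi-core machine).
    static Set<String> withInnerSubscribeOn() {
        List<String> names = Flux.range(1, 10)
                .flatMap(i -> currentThreadName().subscribeOn(Schedulers.parallel()))
                .collectList()
                .block();
        return Set.copyOf(names);
    }

    public static void main(String[] args) {
        System.out.println("without Scheduler: " + withoutScheduler());
        System.out.println("inner subscribeOn: " + withInnerSubscribeOn());
    }
}
```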

Parallel execution

If I really want to execute something in parallel, I can for instance place it on the parallel Scheduler (Schedulers.parallel()), a pool of workers sized to the number of CPU cores. But remember there is a setup cost for this when your program runs. It is usually only beneficial if you have heavy computational work that needs a lot of CPU power.

Code example:

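import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

Flux.just("a", "b", "c")
    .flatMap(value -> Mono.just(value.toUpperCase()))
    .subscribeOn(Schedulers.parallel())
    .doOnNext(s -> System.out.println(s + " - " + Thread.currentThread().getName()))
    // block until done: parallel threads are daemon threads, so main() could exit before printing
    .blockLast();

// Output
// A - parallel-1
// B - parallel-1
// C - parallel-1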
Here we are still running on just one thread. subscribeOn means that when the Flux is subscribed to, the Scheduler picks one worker (here, one thread) from its pool, and the sequence sticks with it throughout execution.
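The one-thread behaviour of subscribeOn can be checked directly. In this sketch, assuming reactor-core is on the classpath, each element is mapped to the name of the thread that handles it, and the names are collected into a set.

```java
import java.util.List;
import java.util.Set;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class SubscribeOnOneThread {

    // subscribeOn moves the subscription, and therefore the source's emissions,
    // onto one worker of the parallel Scheduler. No later operator switches
    // threads, so every element is handled on that same thread.
    static Set<String> threadNames() {
        List<String> names = Flux.just("a", "b", "c")
                .map(value -> Thread.currentThread().getName())
                .subscribeOn(Schedulers.parallel())
                .collectList()
                .block();
        return Set.copyOf(names);
    }

    public static void main(String[] args) {
        // a single name, such as [parallel-1]
        System.out.println(threadNames());
    }
}
```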

If we absolutely feel the need to force execution onto multiple threads, we can for instance use a ParallelFlux.

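import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

Flux.range(1, 10)
    .parallel(2)                   // split the sequence into 2 rails
    .runOn(Schedulers.parallel())  // run each rail on its own worker
    .doOnNext(i -> System.out.println(Thread.currentThread().getName() + " -> " + i))
    .sequential()                  // merge the rails back into one Flux
    .blockLast();                  // wait for completion before main() exits

// Output (the interleaving varies between runs)
// parallel-3 -> 2
// parallel-2 -> 1
// parallel-3 -> 4
// parallel-2 -> 3
// parallel-3 -> 6
// parallel-2 -> 5
// parallel-3 -> 8
// parallel-2 -> 7
// parallel-3 -> 10
// parallel-2 -> 9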

But remember that in most cases this is not necessary. There is a setup cost, and this type of execution is usually only beneficial if you have a lot of CPU-heavy tasks. Otherwise, using the default single event loop thread will in most cases "probably" be faster.

Dealing with a lot of I/O tasks is usually more about orchestration than raw CPU power.
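Here is a minimal sketch of that orchestration, assuming reactor-core is on the classpath. fakeCall is a hypothetical stand-in for a non-blocking I/O call; it uses delayElement to simulate latency. flatMap() starts all three calls at once and emits results in completion order, so it finishes in roughly the time of the slowest call.

For contrast, the sketch uses concatMap() (not discussed above). It waits for each inner publisher to complete before subscribing to the next one. It therefore keeps the source order but takes roughly the sum of the latencies.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class IoOrchestration {

    // Hypothetical stand-in for a non-blocking I/O call: emits its latency
    // after waiting that many milliseconds on a timer, without blocking a thread.
    static Mono<Integer> fakeCall(int latencyMs) {
        return Mono.just(latencyMs).delayElement(Duration.ofMillis(latencyMs));
    }

    // flatMap subscribes to all calls eagerly; results arrive in completion order.
    static List<Integer> withFlatMap() {
        return Flux.just(300, 100, 200).flatMap(IoOrchestration::fakeCall).collectList().block();
    }

    // concatMap runs one call at a time; the source order is kept.
    static List<Integer> withConcatMap() {
        return Flux.just(300, 100, 200).concatMap(IoOrchestration::fakeCall).collectList().block();
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        System.out.println(withFlatMap());   // completion order, normally [100, 200, 300]
        System.out.printf("flatMap: ~%d ms%n", (System.nanoTime() - start) / 1_000_000);

        start = System.nanoTime();
        System.out.println(withConcatMap()); // [300, 100, 200]
        System.out.printf("concatMap: ~%d ms%n", (System.nanoTime() - start) / 1_000_000);
    }
}
```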


Most of the information here comes from the Flux and Mono API documentation.

The Reactor reference documentation is also an amazing and interesting source of information.

Simon Baslé's blog series Flight of the Flux is also a wonderful and interesting read. It also exists in YouTube format.

There may also be some faults here and there, and I have made some assumptions, especially when it comes to the inner workings of Reactor. But hopefully this will at least clear up some thoughts.

If someone finds something that is outright wrong, feel free to edit.
