Schedulers.boundedElastic似乎使用相同的线程进行处理 [英] Schedulers.boundedElastic appears to use same Thread for processing

查看:17
本文介绍了Schedulers.boundedElastic似乎使用相同的线程进行处理的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

根据我对API的理解,使用Schedulers.boundedElastic()或像Schedulers.newBoundedElastic(3,10,&Quot;MyThreadGroup&Quot;);或Schedulers.from Executor(Executor)这样的变体允许在多个线程中处理一个IO操作。

但是,使用以下示例代码的模拟似乎表明,有一个线程/相同的线程在平面映射中执行该工作

Flux.range(0, 100)
                .flatMap(i -> {
                    try {
                        // IO operation
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("Mapping for " + i + " is done by thread " + Thread.currentThread().getName());
                    return Flux.just(i);
                })
                .subscribeOn(Schedulers.boundedElastic())
                .subscribe();

Thread.sleep(10000); // main thread

//This yields the following

Mapping for 0 is done by thread boundedElastic-1
Mapping for 1 is done by thread boundedElastic-1
Mapping for 2 is done by thread boundedElastic-1
Mapping for 3 is done by thread boundedElastic-1 ...

上面的输出向我暗示,相同的线程正在平板映射中运行。在Subcribe上为多个IO调用flatMap时,有没有办法让多个线程处理项目?我原以为会看到BoundedElastic-1,Bound Elastic-2... 。

推荐答案

让Flat Map在多个线程上运行的一种方法是创建一个ParallFlux。下面的示例代码做到了这一点。

Flux.range(0, 1000)
                .parallel()             
                .runOn(Schedulers.boundedElastic())
                .flatMap(i -> {
                    try {
                        // IO operation
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("second Mapping for " + i + " is done by thread " + Thread.currentThread().getName());
                    return Flux.just(i);
                })
                .subscribe();
        
        Thread.sleep(10000);

这篇关于Schedulers.boundedElastic似乎使用相同的线程进行处理的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆