How do I process Flux events in parallel to each other?

Question

I have streams of incoming events that need to be enriched and then processed in parallel as they arrive.

I thought Project Reactor was made to order for the job, but in my tests all of the processing seems to be done serially.

Here is some test code:

ExecutorService executor = Executors.newFixedThreadPool(10);
System.out.println("Main thread: " + Thread.currentThread());
Flux<String> tick = Flux.interval(Duration.of(10, ChronoUnit.MILLIS))
        .map(i-> {
            System.out.println("ReactorTests.test " + Thread.currentThread());
            sleep(1000L); // simulate IO delay
            return String.format("String %d", i);
        })
        .take(3)
//    .subscribeOn(Schedulers.elastic());
//    .subscribeOn(Schedulers.newParallel("test"));
//    .subscribeOn(Schedulers.fromExecutor(executor));
;
tick.subscribe(x ->System.out.println("Subscribe thread: " + Thread.currentThread()), 
               System.out::println, 
               ()-> System.out.println("Done"));
System.out.println("DONE AND DONE");

I have tried uncommenting each of the commented lines; however, in every case the output indicates that the same thread is used to process all of the events:

Main thread: Thread[main,5,main]
[DEBUG] (main) Using Console logging
DONE AND DONE
ReactorTests.test Thread[parallel-1,5,main]
Subscribe thread: Thread[parallel-1,5,main]
ReactorTests.test Thread[parallel-1,5,main]
Subscribe thread: Thread[parallel-1,5,main]
ReactorTests.test Thread[parallel-1,5,main]
Subscribe thread: Thread[parallel-1,5,main]
Done

(The only difference is that without the Schedulers, the events are processed on the subscribe thread, whereas with any of the executors they all run on the same thread, which is not the subscribe thread.)

What am I missing?

FYI, there is a "sleep" method:

public static void sleep(long time) {
    try {
        Thread.sleep(time);
    } catch (InterruptedException e) {
        System.out.println("Exiting");
    }
}

Recommended answer

One way to handle items in parallel is to use .parallel / .runOn:

flux
    .parallel(10)
    .runOn(scheduler)
    //
    // Work to be performed in parallel goes here.  (e.g. .map, .flatMap, etc)
    //
    // Then, if/when you're ready to go back to sequential, call .sequential()
    .sequential()

Blocking operations (such as blocking IO or Thread.sleep) block the thread on which they are executed. Reactive streams cannot magically turn a blocking method into a non-blocking one. Therefore, you need to ensure that blocking methods run on a Scheduler suitable for blocking operations (e.g. Schedulers.boundedElastic()).

In the example above, since you know you are calling a blocking operation, you could use .runOn(Schedulers.boundedElastic()).
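As a rough illustration of the .parallel / .runOn approach with a blocking step, here is a minimal sketch. It assumes reactor-core is on the classpath. The class name ParallelRunOnExample, the process method, and the use of Flux.range as a stand-in source are hypothetical choices for this demo, not part of the original question.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

class ParallelRunOnExample {

    // Stand-in for blocking IO, modeled on the question's sleep helper.
    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
    }

    // Splits the source into `rails` rails, runs the blocking map on
    // boundedElastic threads, and records which threads did the work.
    static List<String> process(int count, int rails, Set<String> threadNames) {
        return Flux.range(0, count)
                .parallel(rails)
                .runOn(Schedulers.boundedElastic())
                .map(i -> {
                    threadNames.add(Thread.currentThread().getName());
                    sleep(200L); // simulate IO delay
                    return String.format("String %d", i);
                })
                .sequential()
                .collectList()
                .block(); // blocking here only to keep the demo self-contained
    }

    public static void main(String[] args) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        List<String> results = process(3, 3, threadNames);
        System.out.println("Results: " + results);
        System.out.println("Worker threads: " + threadNames);
    }
}
```

The order of the collected results is not guaranteed after .sequential(), so the demo only prints them together with the set of worker thread names.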

Depending on the use case, you can also use async operators like .flatMap in combination with .subscribeOn or .publishOn to delegate specific blocking operations to another Scheduler, as described in the Project Reactor documentation. For example:

flux
    .flatMap(i -> Mono.fromCallable(() -> {
            System.out.println("ReactorTests.test " + Thread.currentThread());
            sleep(1000L); // simulate IO delay
            return String.format("String %d", i);
        })
        .subscribeOn(Schedulers.boundedElastic()))
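
The fragment above can be fleshed out into something runnable along the following lines. This is only a sketch, assuming reactor-core is on the classpath. The class name FlatMapSubscribeOnExample and the process method are hypothetical, and the sleep is shortened to keep the demo quick.

```java
import java.time.Duration;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class FlatMapSubscribeOnExample {

    // Stand-in for blocking IO, modeled on the question's sleep helper.
    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
    }

    // Wraps the blocking work for each tick in its own Mono and subscribes
    // that Mono on boundedElastic, so the ticks no longer wait on each other.
    static List<String> process(int count, Set<String> threadNames) {
        return Flux.interval(Duration.ofMillis(10))
                .take(count)
                .flatMap(i -> Mono.fromCallable(() -> {
                            threadNames.add(Thread.currentThread().getName());
                            sleep(200L); // simulate IO delay
                            return String.format("String %d", i);
                        })
                        .subscribeOn(Schedulers.boundedElastic()))
                .collectList()
                .block(); // blocking here only to keep the demo self-contained
    }

    public static void main(String[] args) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        List<String> results = process(3, threadNames);
        System.out.println("Results: " + results);
        System.out.println("Worker threads: " + threadNames);
    }
}
```

Because .flatMap merges inner results as they complete, the output order may differ from the tick order.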

In fact, .flatMap also has an overloaded variant that takes a concurrency parameter, which limits the maximum number of in-flight inner sequences. This can be used instead of .parallel in some use cases. It will not generally work for Flux.interval, though, since Flux.interval doesn't support downstream requests that replenish slower than the ticks.
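
To make the concurrency parameter concrete, here is a small sketch that counts how many inner Monos run at the same time. It assumes reactor-core is on the classpath and deliberately uses Flux.range instead of Flux.interval, for the reason given above. The class name FlatMapConcurrencyExample, the maxInFlight method, and the counters are hypothetical and exist only for this demo.

```java
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class FlatMapConcurrencyExample {

    // Stand-in for blocking IO, modeled on the question's sleep helper.
    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
    }

    // Runs `count` blocking tasks with at most `concurrency` inner Monos
    // subscribed at once, and returns the highest overlap that was observed.
    static int maxInFlight(int count, int concurrency) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        Flux.range(0, count)
                .flatMap(i -> Mono.fromCallable(() -> {
                            int now = inFlight.incrementAndGet();
                            maxSeen.accumulateAndGet(now, Math::max);
                            sleep(100L); // simulate IO delay
                            inFlight.decrementAndGet();
                            return String.format("String %d", i);
                        })
                        .subscribeOn(Schedulers.boundedElastic()),
                        concurrency)
                .blockLast(); // blocking here only to keep the demo self-contained
        return maxSeen.get();
    }

    public static void main(String[] args) {
        System.out.println("Max in-flight tasks: " + maxInFlight(6, 2));
    }
}
```

With a cold, demand-driven source such as Flux.range, .flatMap only requests another element once an inner sequence completes, which is what keeps the overlap bounded.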
