Flux 的方法 publishOn 没有按预期工作 [英] The method publishOn of Flux doesn't not work as expected

查看:50
本文介绍了Flux 的方法 publishOn 没有按预期工作的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试在 Reactor Aluminium-SR1 中将阻塞消费者集成为 Flux 订阅者.我想使用并行调度程序,并发执行阻塞操作.

I'm trying to integrate a blocking consumer as a Flux subscriber in Reactor Aluminium-SR1. I would like to use a parallel Scheduler, to execute the blocking operations concurrently.

我已经实现了一个主类来描述我的意图:

I've implement a main class to describe my intention:

package etienne.peiniau;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;

public class Main {

    public static void main(String[] args) throws InterruptedException {
        Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
                .elapsed()
                .publishOn(Schedulers.parallel())
                .subscribe(new Subscriber<Tuple2<Long, Integer>>() {
                    @Override
                    public void onSubscribe(Subscription subscription) {
                        System.out.println("[" + Thread.currentThread().getName() + "] Subscription");
                        subscription.request(Long.MAX_VALUE);
                    }

                    @Override
                    public void onNext(Tuple2<Long, Integer> t2) {
                        System.out.println("[" + Thread.currentThread().getName() + "] " + t2);
                        try {
                            Thread.sleep(1000); // long operation
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        System.err.println("[" + Thread.currentThread().getName() + "] Error: " + throwable.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("[" + Thread.currentThread().getName() + "] Complete");
                    }
                });
        // Waiting for the program to complete
        System.out.println("[" + Thread.currentThread().getName() + "] Main");
        Thread.sleep(100000);
    }

}

这段代码的输出如下:

[main] Subscription
[main] Main
[parallel-1] [3,1]
[parallel-1] [1000,2]
[parallel-1] [1001,3]
[parallel-1] [1000,4]
[parallel-1] [1000,5]
[parallel-1] [1000,6]
[parallel-1] [1001,7]
[parallel-1] [1000,8]
[parallel-1] [1000,9]
[parallel-1] [1000,10]
[parallel-1] [1000,11]
[parallel-1] [1001,12]
[parallel-1] [1000,13]
[parallel-1] [1000,14]
[parallel-1] [1000,15]
[parallel-1] [1000,16]
[parallel-1] [1001,17]
[parallel-1] [1000,18]
[parallel-1] [1000,19]
[parallel-1] [1000,20]
[parallel-1] Complete

我的问题是长操作总是在线程parallel-1上每1秒执行一次.

My problem is that the long operation is always executed on the thread parallel-1 and every 1 second.

我尝试手动增加并行度或使用弹性调度程序,但结果是一样的.

I've tried to increase parallelism manually or to use an elastic Scheduler, but the result is the same.

我在想 publishOn 方法是专门为这个用例设计的.如果我误解了什么,你能告诉我吗?

I was thinking that publishOn method was specially designed for this use case. Can you tell me if I misunderstood something ?

推荐答案

实际上它按预期工作,您可以看到并行处理的所有值 - 经过的时间几乎相同,但您总是在同一个线程中接收元素并且每次等待 1 秒时就这样.

Actually it works as expected you can see that all values where processed in parallel - elapsed time is nearly the same, but you always receive elements within the same thread and with that way each time you wait 1 second.

我猜在简单的 Flux 并行并不意味着更多的线程,它意味着并行工作.例如,如果您运行如下代码:

I guess that in simple Flux parallel doesn't mean more thread, it means to do work in parallel. If you for example run code like:

Flux.fromIterable(IntStream.range(0, 20).boxed().collect(Collectors.toList()))
            .map(i -> {
                System.out.println("map [" + Thread.currentThread().getName() + "] " + i);
                return i;
            })
            .elapsed()
            .publishOn(Schedulers.single())
            .subscribeOn(Schedulers.single())
            .subscribe(t2 -> {
                System.out.println("subscribe [" + Thread.currentThread().getName() + "] " + t2);
            });

你会看到结果:

map [single-1] 0
map [single-1] 1
...
subscribe [single-1] [4,0]
subscribe [single-1] [0,1]
...

你可以看到它首先对所有元素进行map,然后是consume.如果您将 publishOn 更改为 .publishOn(Schedulers.parallel()),您将看到:

And you can see that first it does map for all elements and then consume. If you change publishOn to .publishOn(Schedulers.parallel()) you will see:

map [single-1] 3
subscribe [parallel-1] [5,0]
map [single-1] 4
subscribe [parallel-1] [0,1]
map [single-1] 5
...

现在它同时在并行线程中执行这两个操作.我不确定我是否理解正确.

Now it does both operations in parallel threads at once. I'm not sure that I understand everything correctly.

有特定的 ParallelFlux 用于并行执行.在下面的示例中,一切都将在不同的线程上完成:

There is specific ParallelFlux for parallel execution. In example below everything will be done on different threads:

Flux.fromIterable(IntStream.range(0, 20).boxed().collect(Collectors.toList()))
        .elapsed()
        .parallel()
        .runOn(Schedulers.parallel())
        .subscribe(t2 -> {
            System.out.println("[" + Thread.currentThread().getName() + "] " + t2);
            try {
                Thread.sleep(1000); // long operation
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, throwable -> {
            System.err.println("[" + Thread.currentThread().getName() + "] Error: " + throwable.getMessage());
        }, () -> {
            System.out.println("[" + Thread.currentThread().getName() + "] Complete");
        }, subscription -> {
            System.out.println("[" + Thread.currentThread().getName() + "] Subscription");
            subscription.request(Long.MAX_VALUE);
        });

结果如下:

[parallel-1] [8,0]
[parallel-2] [0,1]
[parallel-3] [0,2]
[parallel-4] [0,3]
[parallel-1] [0,4]
...

所以它使用很少的线程来处理结果.在我看来,这确实是平行的.

So it uses few threads to process results. And it's truly parallel in my point of view.

另请注意,如果您使用方法 .subscribe(Subscriber<? super T> s) 所有结果都将按顺序使用,并且要并行使用它们,您应该使用:

Also note that if you use method .subscribe(Subscriber<? super T> s) all results will be consumed in sequential way and to consume them in parallel you should use:

public void subscribe(Consumer<? super T> onNext, Consumer<? super Throwable>
            onError, Runnable onComplete, Consumer<? super Subscription> onSubscribe)

或任何其他带有 ConsumeronNext,... 参数

or any other overloaded method with Consumer<? super T> onNext,... arguments

这篇关于Flux 的方法 publishOn 没有按预期工作的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆