为流量用户注册不同的线程 [英] Reg different threads for flux consumer

查看:0
本文介绍了为流量用户注册不同的线程的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在以下两个示例中,处理流量流的行为似乎不同。

示例1:

public static void main(String[] args) throws InterruptedException {
        log.debug(" Before reading flux stream");
        Flux<Customer> results = readFluxStream();
        log.debug(Thread.currentThread().getName() + " : After reading flux stream");
        results.subscribe(new LearnFlux().new CustomerConsumer());
        Thread.sleep(5000);
        log.debug(" Exit Main Thread ");

    }

    public static Flux<Customer> readFluxStream() {
        List<Customer> customers = buildCustomers();
        Customer[] customerArray = new Customer[customers.size()];
        customerArray = customers.toArray(customerArray);
        Flux<Customer> temp = Flux.fromArray(customerArray).delayElements(Duration.ofSeconds(1)).log();
        return temp;
    }

    
    private class CustomerConsumer implements Consumer<Customer> {

        @Override
        public void accept(Customer customer) {
            log.debug(Thread.currentThread().getName() + " This is a consumer " + customer.getFirstName());

        }
    }

从下面的日志中我们了解到,Flux使用者正在另一个线程中运行。(在*中突出显示)。我在主线程中引入了休眠,以便可以在控制台中捕获使用者日志。

19:07:24.695 [***main***] DEBUG com.learnjava.LearnFlux -  Before reading flux stream
19:07:24.759 [***main***] DEBUG reactor.util.Loggers - Using Slf4j logging framework
19:07:24.779 [***main***] DEBUG com.learnjava.LearnFlux - main : After reading flux stream
19:07:24.788 [***main***] INFO  reactor.Flux.ConcatMap.1 - onSubscribe(FluxConcatMap.ConcatMapImmediate)
19:07:24.790 [***main***] INFO  reactor.Flux.ConcatMap.1 - request(unbounded)
19:07:25.821 [***parallel-1***] INFO  reactor.Flux.ConcatMap.1 - onNext(Customer[id=null, firstName='Tom', lastName='Cruise'])
19:07:25.835 [***parallel-1***] DEBUG com.learnjava.LearnFlux - parallel-1 This is a consumer Tom
19:07:26.841 [***parallel-2***] INFO  reactor.Flux.ConcatMap.1 - onNext(Customer[id=null, firstName='Jim', lastName='Carry'])
19:07:26.842 [***parallel-2***] DEBUG com.learnjava.LearnFlux - parallel-2 This is a consumer Jim
19:07:26.844 [***parallel-2***] INFO  reactor.Flux.ConcatMap.1 - onComplete()
19:07:29.817 [***main***] DEBUG com.learnjava.LearnFlux -  Exit Main Thread 

示例2

public interface CustomerRepository extends ReactiveCrudRepository<Customer, Long> {

    @Query("SELECT * FROM customer WHERE last_name = :lastname")
    Flux<Customer> findByLastName(String lastName);

}


public class CustomerConsumer implements Consumer<Customer> {

    private static final Logger log = LoggerFactory.getLogger(CustomerConsumer.class);
    @Override
    public void accept(Customer customer) {
        
        log.info(" This is a concusmer " + customer);
    
        
    }
}



log.info(" Invoking R2DBC flux response ");
            Flux<Customer> customers = repository.findAll();
            customers.subscribe(new CustomerConsumer());
            log.info("complete consumer in main thread");

从下面的日志中,我们观察到使用者正在同一主线程中运行。(在*中突出显示)

[***main***] Invoking R2DBC flux response 
[***main***] This is a concusmer Customer[id=1, firstName='Jack', lastName='Bauer']
[***main***]This is a concusmer Customer[id=2, firstName='Chloe', lastName='O'Brian']
[***main***] This is a concusmer Customer[id=3, firstName='Michelle', lastName='Dessler']
[***main***] complete consumer in main thread

澄清

为什么第一个示例中的通量使用者运行在不同的线程中,而基于R2DBC的存储库(第二个示例)返回的通量是在同一主线程中处理的?

推荐答案

为什么第一个示例中的通量使用者运行在不同的线程中,而基于R2DBC的存储库(第二个示例)返回的通量是在同一主线程中处理的?

这里的关键理解是,任何被动操作符都可以在它认为合适的时候切换线程(或者更准确地说,切换调度程序)。虽然大多数运算符不会切换,但基于时间的运算符必须切换,并且它们将默认使用并行调度程序。

在第一个示例中,您使用的是delayElements()运算符。因为它是基于时间的操作符,所以默认情况下,它会切换到并行调度器,然后在并行执行器(以及您在日志中看到的并行线程)上运行。基于时间的调度程序必须切换,因为将使您的操作保持在同一线程上的&q;立即&q;调度程序不能基于时间的调度(这是delayElements运算符所要求的。)

这并不是说,如果您有特殊原因不使用并行调度程序,就必须--有一个重载允许您随心所欲地设置它。例如,如果您使用.delayElements(Duration.ofSeconds(1), Schedulers.boundedElastic()),您将看到您的日志将显示正在使用的有界弹性线程池。

相反,在您的第二个R2DBC示例中,没有操作员将其从即时调度程序切换出去。正如您从日志中看到的,它将只在主线程上运行。

如果您想更深入地了解反应堆中线程的工作原理,Simon的《通量演讲之旅》非常值得一看:https://m.youtube.com/watch?v=sNgTTcG-fEU-还有一些附带的博客文章。

这篇关于为流量用户注册不同的线程的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆