可以在 rxjava 中转换以下代码吗 [英] Is it ok to transform following code in rxjava

查看:37
本文介绍了可以在 rxjava 中转换以下代码吗的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

例如,我有以下可运行的 Java 代码.

For example, I have following runnable java code.

它是关于一个生产者和几个并行的消费者.这些消费者正在运行耗时的作业,并且它们是并行运行的.

It is about a producer and several parallel consumers. These consumers are running time consuming jobs, and they are running in parallel.

我想知道这个用例是否匹配 rx-java,以及如何在 rx-java 中重写它.

I wonder if this use case match rx-java, and how to rewrite it in rx-java.

public class DemoInJava {
    public static void main(String[] args) {

        final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

        AtomicBoolean done = new AtomicBoolean(false);
        Thread producer = new Thread(() -> {
            int offset = 0;
            int limit = 10;
            while (true) {
                if (queue.isEmpty()) {
                    if (offset < 100) {// there is 100 records in db
                        fetchDataFromDb(offset, limit).forEach(e -> queue.add(e));
                        offset = offset + limit;
                    } else {
                        done.set(true);
                        break; // no more data
                    }
                } else {
                    try {
                        Thread.sleep(100l);// I don't like the idea of sleep, but it seems to be the simplest way.
                    } catch (InterruptedException e) {
                    }
                }
            }
        });

        List<Thread> consumers = IntStream.range(0, 5).boxed().map(c -> new Thread(() ->
        {
            while (true) {
                Integer i = queue.poll();
                if (i != null) {
                    longRunJob(i);
                } else {
                    if (done.get()) {
                        break;
                    } else {
                        try {
                            Thread.sleep(100l);// I don't like the idea of sleep, but it seems to be the simplest way.
                        } catch (InterruptedException e) {
                        }
                    }
                }
            }
        })).collect(Collectors.toList());

        producer.start();
        consumers.forEach(c -> c.start());
    }

    private static List<Integer> fetchDataFromDb(int offset, int limit) {
        return IntStream.range(offset, offset + limit).boxed().collect(Collectors.toList());
    }

    private static void longRunJob(Integer i) {
        System.out.println(Thread.currentThread().getName() + " long run job of " + i);
    }
}

输出是:

....
Thread-1 long run job of 7
Thread-1 long run job of 8
Thread-1 long run job of 9
Thread-4 long run job of 10
Thread-4 long run job of 16
Thread-10 long run job of 14
Thread-5 long run job of 15
Thread-8 long run job of 13
Thread-7 long run job of 12
Thread-9 long run job of 11
Thread-10 long run job of 19
Thread-4 long run job of 18
Thread-3 long run job of 17
....

推荐答案

我们来看看... 一、代码:

Let's see... First, the code:

package rxtest;

import static io.reactivex.Flowable.generate;
import static io.reactivex.Flowable.just;

import java.util.List;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import io.reactivex.Emitter;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;

public class Main {

    private static final Scheduler SCHEDULER = Schedulers.from(Executors.newFixedThreadPool(10));

    private static class DatabaseProducer {
        private int offset = 0;
        private int limit = 100;

        void fetchDataFromDb(Emitter<List<Integer>> queue) {
            System.out.println(Thread.currentThread().getName() + " fetching "+offset);
            queue.onNext(IntStream.range(offset, offset + limit).boxed().collect(Collectors.toList()));
            offset += limit;
        }
    }

    public static void main(String[] args) {
        generate(new DatabaseProducer()::fetchDataFromDb)
        .subscribeOn(Schedulers.io())
        .concatMapIterable(list -> list, 1) // 1 call, no prefetch
        .flatMap(item -> 
                just(item)
                .doOnNext(i -> longRunJob(i))
                .subscribeOn(SCHEDULER)
                , 10) // don't subscribe to more than 10 at a time
        .take(1000)
        .blockingSubscribe();
    }

    private static void longRunJob(Integer i) {
        System.out.println(Thread.currentThread().getName() + " long run job of " + i);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

Class DatabaseProducer 只是值的有状态生产者,因为它需要当前的偏移量.这不是绝对必要的,因为 generate 调用可以替换为

Class DatabaseProducer is simply the stateful producer of values, as it needs the current offset. It's not strictly necessary, as the generate call could have been replaced with

        generate(() -> 0, (offset,e) -> {
            e.onNext(IntStream.range(offset, offset + 100).boxed()
                       .collect(Collectors.toList()));
            return offset + 100;
        }, e -> {});

但这几乎没有可读性.

请记住,cocatMapflatMap 可以并且将会预取和预订阅 observables/flowables 直到一个实现相关的限制,即使有没有空闲线程来处理它们——它们只会在调度程序中排队.每次调用上的数字代表我们想要的限制 - concatMap 上的 1 因为我们只想在必要时从数据库中获取(如果你在这里放 2,你可能会过度阅读,但管道中的延迟会更少).

Keep in mind that cocatMap and flatMap can and will pre-fetch and pre-subscribe to observables/flowables up to an implementations-dependent limit, even if there are no free threads to process them - they will simply get queued in the schedulers. The numbers on each call represent the limits that we want to have - 1 on the concatMap because we want to fetch from the database only if it's necessary (if you put here 2, you may over-read, but there will be less latency in the pipeline).

如果你想做 Cpu-bound 计算,那么最好使用 Schedulers.computation(),因为它会自动配置为运行 JVM 的系统的 CPU 数量,并且您可以从代码库的其他部分使用它,这样您就不会使处理器过载.

If you want to do Cpu-bound computation, then it's better to use Schedulers.computation(), as that is auto-configured to the number of CPUs of the system the JVM is running on, and you can use it from other parts of your codebase so that you don't overload the processor.

这篇关于可以在 rxjava 中转换以下代码吗的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆