如何减缓排放形成 Flux.interval? [英] How can one slow down emissions form Flux.interval?

查看:31
本文介绍了如何减缓排放形成 Flux.interval?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

这需要背压还是有更简单的方法?

Would this need back pressure or is there a simpler way?

例如在下面的代码中,我希望每 2 秒调用一次自旋函数.有时旋转"可能需要比 2 秒间隔更长的时间来计算,在这种情况下,我不希望任何间隔排放排队.但在下面的代码中,他们确实排队.

For example in the below code , I want the spin function to be called every 2 seconds. Sometimes 'spin' can take longer time to compute than 2 second interval, in which case I do not want any interval emissions to queue up. But in the below code they do queue up.

在下面的代码中,前 4 个自旋函数调用需要 10 秒,其余的需要 1 秒.因此,一旦函数变得更快, Flux.interval 排放就会赶上".但是,我不希望发生任何追赶"

In the code below, the first 4 spin function calls take 10 seconds and the rest take 1 second. As a result the Flux.interval emissions 'catch up' once the function gets faster. However, I do not want any 'catch up' to happen

import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.Date;
import java.util.Iterator;

public class Test {
   public static void main(String[] args) {
        Iterator<Integer> secs = new Iterator<Integer>() {
            private int num = 0;
            @Override
            public boolean hasNext() {
                return true;
            }
            @Override
            public Integer next() {
                return num++ < 4 ? 10 : 1;
            }
        };

        Flux.interval(Duration.ofSeconds(5))
                .map(n -> {spin(secs.next()); return n;})
                .doOnNext(n -> log("Processed " + n))
                .blockLast();

    }

    private static void spin(int secs) {
        log("Current job will take " + secs + " secs");
        long sleepTime = secs*1000000000L; // convert to nanos
        long startTime = System.nanoTime();
        while ((System.nanoTime() - startTime) < sleepTime) {}
    }

    static void log(Object label) {
        System.out.println((new Date()).toString() + "\t| " +Thread.currentThread().getName()   + "\t| " + label);
    }
}

输出:请注意,已处理"时间戳最初间隔 10 秒,但从作业 4 到作业 8,有一个我不想发生的赶上".我想在上次调用后不早于 2 秒执行

Output: Notice the "Processed" timestamp initially is spaced by 10 seconds, but from job 4 to job 8, there is a 'catch up' that I do not want to take place. I want to spin to executed no earlier than 2 seconds after the previous invocation

Thu Jun 01 17:16:23 EDT 2017    | parallel-1    | Current job will take 10 secs
Thu Jun 01 17:16:33 EDT 2017    | parallel-1    | Processed 0
Thu Jun 01 17:16:33 EDT 2017    | parallel-1    | Current job will take 10 secs
Thu Jun 01 17:16:43 EDT 2017    | parallel-1    | Processed 1
Thu Jun 01 17:16:43 EDT 2017    | parallel-1    | Current job will take 10 secs
Thu Jun 01 17:16:53 EDT 2017    | parallel-1    | Processed 2
Thu Jun 01 17:16:53 EDT 2017    | parallel-1    | Current job will take 10 secs
Thu Jun 01 17:17:03 EDT 2017    | parallel-1    | Processed 3
Thu Jun 01 17:17:03 EDT 2017    | parallel-1    | Current job will take 1 secs
Thu Jun 01 17:17:04 EDT 2017    | parallel-1    | Processed 4
Thu Jun 01 17:17:04 EDT 2017    | parallel-1    | Current job will take 1 secs
Thu Jun 01 17:17:05 EDT 2017    | parallel-1    | Processed 5
Thu Jun 01 17:17:05 EDT 2017    | parallel-1    | Current job will take 1 secs
Thu Jun 01 17:17:06 EDT 2017    | parallel-1    | Processed 6
Thu Jun 01 17:17:06 EDT 2017    | parallel-1    | Current job will take 1 secs
Thu Jun 01 17:17:07 EDT 2017    | parallel-1    | Processed 7
Thu Jun 01 17:17:07 EDT 2017    | parallel-1    | Current job will take 1 secs
Thu Jun 01 17:17:08 EDT 2017    | parallel-1    | Processed 8
Thu Jun 01 17:17:08 EDT 2017    | parallel-1    | Current job will take 1 secs
Thu Jun 01 17:17:09 EDT 2017    | parallel-1    | Processed 9
Thu Jun 01 17:17:13 EDT 2017    | parallel-1    | Current job will take 1 secs
Thu Jun 01 17:17:14 EDT 2017    | parallel-1    | Processed 10
Thu Jun 01 17:17:18 EDT 2017    | parallel-1    | Current job will take 1 secs
Thu Jun 01 17:17:19 EDT 2017    | parallel-1    | Processed 11

推荐答案

您可以使用 delayElements(Duration delay) 方法.如果你想要更细粒度的控制,你可以使用 delayUntil(Function> triggerProvider)

根据您的具体要求,结果可能如下所示:

Depending on your exact requirements the result might look like this:

 Flux.interval(Duration.ofSeconds(5))
     .map(n -> {spin(secs.next()); return n;})
     .doOnNext(n -> log("Processed " + n))
     .delayUntil(n -> Mono.just(1).delayElements(Duration.Seconds(Math.max(0, 2 - n)))
     .blockLast();

这篇关于如何减缓排放形成 Flux.interval?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆