如何限制flatMap流程中当前正在处理的事件? [英] How do I limit the events currently being processed in a flatMap process?
问题描述
给出以下代码
public static void main(String[] args) {
long start = System.currentTimeMillis();
Flux.<Long>generate(s -> s.next(System.currentTimeMillis() - start))
.flatMap(DemoApp::delayedAction)
.doOnNext(l -> System.out.println(l + " -- " + (System.currentTimeMillis() - start)))
.blockLast(Duration.ofSeconds(3));
}
private static Publisher<? extends Long> delayedAction(Long l) {
return Mono.just(l).delayElement(Duration.ofSeconds(1));
}
从输出中可以看到,在delayedAction
中同时处理了"大量事件.
在此示例中,在几毫秒内生成了256个事件,然后等待大约一秒钟,直到它们再次发出.
One can see from the output that a large number of events are "processed" in delayedAction
concurrently.
In this example 256 events get generated within a few milliseconds and then wait about a second until they get emitted again.
我想将此数字限制为例如10,我该怎么办?
I want to limit this number to e.g. 10, how can I do this?
解决方案应独立于delayedAction
背景
延迟动作中真正发生的事情是:我执行HTTP请求,并且启动无限制(或非常大)的请求听起来不是一个好主意.
What is really happening in delayed Action is: I do HTTP requests and starting an unlimited (or very large) amount of requests doesn't sound like a good idea.
推荐答案
已有解决方法:从其文档中:
并发参数允许控制可以并行订阅和合并的发布者数量.
The concurrency argument allows to control how many Publisher can be subscribed to and merged in parallel.
这篇关于如何限制flatMap流程中当前正在处理的事件?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!