如何从阻塞队列创建反应堆 Flux? [英] How can I create reactor Flux from a blocking queue?
问题描述
我正在尝试实现从 BlockingQueue 创建的 reactor Flux 但不确定哪个运算符最适合我的用例?
I am trying to implement a reactor Flux created from a BlockingQueue but not sure which operator is best for my use case?
我正在创建一个流式 REST 端点,其中响应是 Flux,它需要不断地从 BlockingQueue 发出消息作为对 GET REST 调用的响应.
I am creating a streaming REST end point, where response is Flux that needs to keep emitting messages from a BlockingQueue as a response to GET REST call.
我已经尝试过论坛和文档,但只能找到从可迭代集合或响应式数据源启动的 Flux,但没有来自任何 BlockingQueue 的示例.
I have already tried forums and documentation and can only find Flux initiated from iterable collections or reactive data sources, but no examples from any BlockingQueue.
推荐答案
你可以试试 Flux#generate 和 队列#peek.请记住,如果队列为空,peek
将返回 null
,并且不能在 onNext
中使用.
You can try Flux#generate and Queue#peek. Just keep in mind that peek
will return null
if the queue is empty, and it cannot be used in onNext
.
类似于:
Flux.generate(sink -> {
val element = queue.peek();
if (element == null) {
sink.complete();
} else {
sink.next(element);
}
});
还有 Flux#repeatWhen 操作符,以防您想在队列被视为空后重新订阅队列,例如与:
There is also Flux#repeatWhen operator, in case you want to re-subscribe to the queue after it was considered empty, e.g. with:
flux.repeatWhen(it -> it.delayElements(ofSeconds(1)))
这篇关于如何从阻塞队列创建反应堆 Flux?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!