如何从阻塞队列创建反应堆 Flux? [英] How can I create reactor Flux from a blocking queue?

查看:62
本文介绍了如何从阻塞队列创建反应堆 Flux?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试实现从 BlockingQueue 创建的 reactor Flux 但不确定哪个运算符最适合我的用例?

I am trying to implement a reactor Flux created from a BlockingQueue but not sure which operator is best for my use case?

我正在创建一个流式 REST 端点,其中响应是 Flux,它需要不断地从 BlockingQueue 发出消息作为对 GET REST 调用的响应.

I am creating a streaming REST end point, where response is Flux that needs to keep emitting messages from a BlockingQueue as a response to GET REST call.

我已经尝试过论坛和文档,但只能找到从可迭代集合或响应式数据源启动的 Flux,但没有来自任何 BlockingQueue 的示例.

I have already tried forums and documentation and can only find Flux initiated from iterable collections or reactive data sources, but no examples from any BlockingQueue.

推荐答案

你可以试试 Flux#generate队列#peek.请记住,如果队列为空,peek 将返回 null,并且不能在 onNext 中使用.

You can try Flux#generate and Queue#peek. Just keep in mind that peek will return null if the queue is empty, and it cannot be used in onNext.

类似于:

Flux.generate(sink -> {
    val element = queue.peek();
    if (element == null) {
        sink.complete();
    } else {
        sink.next(element);
    }
});

还有 Flux#repeatWhen 操作符,以防您想在队列被视为空后重新订阅队列,例如与:

There is also Flux#repeatWhen operator, in case you want to re-subscribe to the queue after it was considered empty, e.g. with:

flux.repeatWhen(it -> it.delayElements(ofSeconds(1)))

这篇关于如何从阻塞队列创建反应堆 Flux?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆