Spring WebFlux(Flux):如何动态发布 [英] Spring WebFlux (Flux): how to publish dynamically

查看:13
本文介绍了Spring WebFlux(Flux):如何动态发布的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我对反应式编程和Spring WebFlux不熟悉。我想让我的应用程序1发布服务器通过Flux发送事件,并且我的应用程序2不断监听它。

我想让Flux按需发布(例如,当发生某些事情时)。我发现的所有示例都是使用Flos.Interval定期发布事件,而且似乎无法在创建后添加/修改Flux中的内容。

我如何实现我的目标?或者我在概念上完全错了。

推荐答案

使用FluxProcessorFluxSink动态发布&q;

Flux手动提供数据的技术之一是使用以下示例中的FluxProcessor#sink方法

@SpringBootApplication
@RestController
public class DemoApplication {

    final FluxProcessor processor;
    final FluxSink sink;
    final AtomicLong counter;

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);

    }

    public DemoApplication() {
        this.processor = DirectProcessor.create().serialize();
        this.sink = processor.sink();
        this.counter = new AtomicLong();
    }

    @GetMapping("/send")
    public void test() {
        sink.next("Hello World #" + counter.getAndIncrement());
    }

    @RequestMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent> sse() {
        return processor.map(e -> ServerSentEvent.builder(e).build());
    }
}
在这里,我创建了DirectProcessor,以便支持多个将监听数据流的订阅者。此外,我还提供了额外的FluxProcessor#serialize,它们为多生产者提供了安全支持(从不同的线程调用,而不违反反应流规范规则,尤其是rule 1.3)。最后,通过调用";http://localhost:8080/send";,我们将看到消息Hello World #1(当然,只有在您之前连接到";http://localhost:8080";的情况下)

Reector 3.4更新

在Reator 3.4中,您有一个名为reactor.core.publisher.Sinks的新API。SinksAPI为手动发送数据提供了一个流畅的构建器,允许您指定流中元素的数量和反压行为、支持的订阅者数量和重放能力:

@SpringBootApplication
@RestController
public class DemoApplication {

    final Sinks.Many sink;
    final AtomicLong counter;

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);

    }

    public DemoApplication() {
        this.sink = Sinks.many().multicast().onBackpressureBuffer();
        this.counter = new AtomicLong();
    }

    @GetMapping("/send")
    public void test() {
        EmitResult result = sink.tryEmitNext("Hello World #" + counter.getAndIncrement());

        if (result.isFailure()) {
          // do something here, since emission failed
        }
    }

    @RequestMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent> sse() {
        return sink.asFlux().map(e -> ServerSentEvent.builder(e).build());
    }
}

注意,通过Sinks接口发送消息引入了emission的新概念及其结果。之所以有这样的API,是因为反应器延伸了反应流,并且必须遵循背压控制。也就是说,如果您的emit比请求的信号多,并且底层实现不支持缓冲,则您的消息将不会被传递。因此,tryEmitNext的结果返回EmitResult,表示消息是否已发送。

还请注意,在默认情况下,SinskAPI提供Sink的序列化版本,这意味着您不必关心并发性。但是,如果您事先知道消息的发送是连续的,则可以构建一个Sinks.unsafe()版本,该版本不会序列化给定的消息

这篇关于Spring WebFlux(Flux):如何动态发布的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆