从 EmitterProcessor 移动到 Sinks.many() [英] Move from EmitterProcessor to Sinks.many()

查看:409
本文介绍了从 EmitterProcessor 移动到 Sinks.many()的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

一段时间以来一直使用 create 一个 EmitterProcessor 内置 sink 如下:

For some time have been using create an EmitterProcessor with built in sink as follows:

EmitterProcessor<String> emitter = EmitterProcessor.create();
FluxSink<String> sink = emitter.sink(FluxSink.OverflowStrategy.LATEST);

sink 使用 Flux .from 命令发布

Flux<String> out = Flux
        .from(emitter
        .log(log.getName()));

sink 可以传递,并用字符串填充,只需使用 next 指令即可.

and the sink can be passed around, and populated with strings, simply using the next instruction.

现在我们看到 EmitterProcessor 已被弃用.

Now we see that EmitterProcessor is deprecated.

全部替换为 Sinks.many()像这样

It's all replaced with Sinks.many() like this

Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();

但是如何使用它来发布from?

but how to use that to publish from?

推荐答案

答案是将 Sinks.many() 转换为 asFlux()

Flux<String> out = Flux
        .from(sink.asFlux()
        .log(log.getName()));

也用于取消和终止通量

sink.asFlux().doOnCancel(() -> {
    cancelSink(id, request);
});
    
/* Handle errors, eviction, expiration */
sink.asFlux().doOnTerminate(() -> {
    disposeSink(id);
});   

UPDATE 根据 这个问题

这篇关于从 EmitterProcessor 移动到 Sinks.many()的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆