Project Reactor,在创建 lambda 之外使用 Flux sink [英] Project Reactor, using a Flux sink outside of the creation lambda
本文介绍了Project Reactor,在创建 lambda 之外使用 Flux sink的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
- 当我的服务启动时,我想构建一个简单的管道.
- 我想隔离 Flux 接收器或处理器来发出事件.
- 事件将来自多个线程,应根据管道的
subscribeOn()
规范进行处理,但一切似乎都在main
线程上运行.立> - 最好的方法是什么?我在下面附上了我的尝试.
- (我使用的是 reactor-core v3.2.8.RELEASE.)
- When my service starts up, I want to construct a simple pipeline.
- I'd like to isolate the Flux sink, or a Processor, to emit events with.
- Events will be coming in from multiple threads and should be processed according to the pipeline's
subscribeOn()
specification, but everything seems to run on themain
thread. - What is the best approach? I've attached my attempts below.
- (I'm using reactor-core v3.2.8.RELEASE.)
import org.junit.jupiter.api.Test;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;
/**
* I want to construct my React pipelines during creation,
* then emit events over the lifetime of my services.
*/
public class React1Test
{
/**
* Attempt 1 - use a DirectProcessor and send items to it.
* Doesn't work though - seems to always run on the main thread.
*/
@Test
public void testReact1() throws InterruptedException
{
// Create the flux and sink.
FluxProcessor<String, String> fluxProcessor = DirectProcessor.<String>create().serialize();
FluxSink<String> sink = fluxProcessor.sink();
// Create the pipeline.
fluxProcessor
.doOnNext(str -> showDebugMsg(str)) // What thread do ops work on?
.subscribeOn(Schedulers.elastic())
.subscribe(str -> showDebugMsg(str)); // What thread does subscribe run on?
// Give the multi-thread pipeline a second.
Thread.sleep(1000);
// Time passes ... things happen ...
// Pass a few messages to the sink, emulating events.
sink.next("a");
sink.next("b");
sink.next("c");
// It's multi-thread so wait a sec to receive.
Thread.sleep(1000);
}
// Used down below during Flux.create().
private FluxSink<String> sink2;
/**
* Attempt 2 - use Flux.create() and its FluxSink object.
* Also seems to always run on the main thread.
*/
@Test
public void testReact2() throws InterruptedException
{
// Create the flux and sink.
Flux.<String>create(sink -> sink2 = sink)
.doOnNext(str -> showDebugMsg(str)) // What thread do ops work on?
.subscribeOn(Schedulers.elastic())
.subscribe(str -> showDebugMsg(str)); // What thread does subscribe run on?
// Give the multi-thread pipeline a second.
Thread.sleep(1000);
// Pass a few messages to the sink.
sink2.next("a");
sink2.next("b");
sink2.next("c");
// It's multi-thread so wait a sec to receive.
Thread.sleep(1000);
}
// Show us what thread we're on.
private static void showDebugMsg(String msg)
{
System.out.println(String.format("%s [%s]", msg, Thread.currentThread().getName()));
}
}
输出总是:
a [main]
a [main]
b [main]
b [main]
c [main]
c [main]
但我所期望的是:
a [elastic-1]
a [elastic-1]
b [elastic-2]
b [elastic-2]
c [elastic-3]
c [elastic-3]
提前致谢.
推荐答案
您看到 [main]
因为您正在从主线程调用 onNext
.您使用的 subscribeOn
仅用于订阅(当 create
的 lambda 被触发时).如果您使用 publishOn
而不是 subscribeOn
,您将看到记录的 elastic-*
线程.
You see [main]
because you're calling onNext
from the main thread.
subscribeOn
you're using is only for the subscription (when create
's lambda is triggered).
You will see elastic-*
threads logged if you use publishOn
instead of subscribeOn
.
另外,考虑使用处理器,存储sink<不鼓励从
Flux.create
和类似的操作符中获取/code> 作为字段.
Also, consider using Processors, storing sink
obtained from Flux.create
and similar operators as a field is discouraged.
这篇关于Project Reactor,在创建 lambda 之外使用 Flux sink的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文