将 RxJava 1.1.5 适配到 Reactor Core 3.1.0.M3 [英] Adapting RxJava 1.1.5 to Reactor Core 3.1.0.M3

查看:51
本文介绍了将 RxJava 1.1.5 适配到 Reactor Core 3.1.0.M3的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用一个使用 RxJava 1.1.5 和 Spring WebFlux(即 Reactor Core 3.1.0.M3)的库,但我无法将 Observable 适配到 Flux.

我认为这相对简单,但我的适配器不工作:

import reactor.core.publisher.Flux;进口 rx.Observable;导入 rx.Subscriber;导入 rx.Subscription;公共静态<T>通量 TtoFlux(Observable<T> observable) {返回 Flux.create(发射器 -> {最终订阅订阅 = observable.subscribe(new Subscriber() {@覆盖公共无效onNext(T值){发射器.下一个(值);}@覆盖公共无效 onCompleted() {发射器.完成();}@覆盖public void onError(Throwable throwable) {发射器.错误(可抛出);}});发射器.onDispose(订阅::取消订阅);});}

我已经确认 onNextonCompleted 都以正确的顺序被调用,但我的 Flux 始终为空.有没有人看到我做错了什么?

在相关说明中,为什么 reactor-addons?

解决方案

使用 RxJavaReactiveStreams 适配器转你的 Observable 变成一个 Publisher,然后让 Flux.fromPublisher() 消费它.

编译'io.reactivex:rxjava-reactive-streams:1.2.1'可观察的○ = ...Flux.from(RxReactiveStreams.toPublisher(o));

<块引用>

在相关说明中,为什么在 reactor-addons 中没有适用于 RxJava 1 的适配器?

他们不想支持或鼓励使用旧技术,我完全同意.

I am trying to use a library that uses RxJava 1.1.5 with Spring WebFlux (i.e. Reactor Core 3.1.0.M3) but I am having trouble adapting Observable to Flux.

I thought this would be relatively straightforward, but my adapter isn't working:

import reactor.core.publisher.Flux;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;

public static <T> Flux<T> toFlux(Observable<T> observable) {
    return Flux.create(emitter -> {
        final Subscription subscription = observable.subscribe(new Subscriber<T>() {
            @Override
            public void onNext(T value) {
                emitter.next(value);
            }
            @Override
            public void onCompleted() {
                emitter.complete();
            }
            @Override
            public void onError(Throwable throwable) {
                emitter.error(throwable);
            }
        });
        emitter.onDispose(subscription::unsubscribe);
    });
}

I have verified that onNext and onCompleted are both getting called in the correct order but my Flux is always empty. Does anyone see what I am doing wrong?

On a related note, why isn't there an adapter for RxJava 1 in reactor-addons?

解决方案

Use the RxJavaReactiveStreams adapter to turn your Observable into a Publisher, then have Flux.fromPublisher() consume it.

compile 'io.reactivex:rxjava-reactive-streams:1.2.1'

Observable<T> o = ...

Flux.from(RxReactiveStreams.toPublisher(o));

On a related note, why isn't there an adapter for RxJava 1 in reactor-addons?

They don't want to support or encourage using that old technology and I completely agree.

这篇关于将 RxJava 1.1.5 适配到 Reactor Core 3.1.0.M3的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆