rx-java2相关内容

RxJava -2 Observables 随时接受更多 Observables?

我目前正在使用 rx-java 2 并且有一个用例,其中单个 Camel Route 订阅者需要使用多个 Observable.使用此解决方案作为参考,我有一个部分可行的解决方案.RxJava - 合并的 Observable 随时接受更多的 Observable? 我打算使用一个 PublishProcessor,它将订阅一个骆驼反应流订阅者,然后维护一个 ConcurrentHashSe ..
发布时间:2021-11-11 23:00:30 Java开发

在Android中,在一个api调用中制作一个api调用列表

在我的一个 android 应用程序中,首先我想调用一个 api,它将返回一个项目列表,该项目将显示在 RecyclerView 中.我还需要为 RecyclerView 的每个项目调用另一个 api 以获取该项目的描述并根据它们的 id 显示每个项目的描述.我该如何解决这种情况. Subject.kt 数据类主题(val 主题:字符串,val 主题代码:字符串,val 主题图标:字符串, ..
发布时间:2021-11-02 23:05:40 移动开发

在 Observables 中包装事件监听器

我看过很多关于如何将有限的事物(如数组或 Iterables)转换为 Observables 的例子,但我不确定我是否理解如何将Observable 用像事件接收器这样的活动且有效无界的东西制作出来.我研究了 RxJava2 文档并提出了这个,以 Android LocationListener 为例. 是否有更简单和/或更正确的方法来做到这一点?我知道“RxBus"概念,但这似乎是一种坚持 ..
发布时间:2021-07-14 19:00:21 其他开发

使用 RXJava2/RXAndroid 2 和 Retrofit 进行轮询

我想实现一个轮询服务,它每 nDelay 秒调用一个 REST Api,并在数据发生更改时通知所有订阅者.现在我的代码有一个小问题,因为它总是向我的消费者返回一个值,即使数据没有改变. private Observable>pollingLightsObservable = null;公共 Observable>getPollingLightsObservable() {如果(pollingLi ..
发布时间:2021-07-14 19:00:18 Java开发

取消订阅 Single 的正确方法是什么

我想在短暂的延迟后做点什么: public void notifyMe() {单身的.timer(500, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.io()).subscribe(ignore -> service.notifyMe()));} 现在我有一个警告:“订阅的结果没有被使用".我该如何解决? 解决方案 A Single 只 ..
发布时间:2021-07-14 19:00:15 Java开发

RxJava 的 retryWhen 操作符

我正在尝试深入了解 retryWhen 运算符,我有一些代码如下. Flowable.just(1, 2, 3, 4, 5).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).retryWhen { throwable ->Log.d(“调试",“重试时继续...")throw Exception("有 ..
发布时间:2021-07-14 19:00:10 移动开发

Rxjava 链接多个请求

我是 RxJava 概念的新手.我想链接一些调用: ObservablelistRoomsCall = mRoomServiceApi.listRooms(); //这个电话会得到我的RoomIds下一步是调用所有 RoomIds - 请求后请求 mMeetingServiceApi.listMeetings(roomID,startsAtString,endsAtString,free)) ..
发布时间:2021-07-14 19:00:07 Java开发

RxJava2 - 同步执行调用

我有一个 TestService,我在其中执行异步任务来获取我的数据.我想在继续之前等待回复. public List获取数据(){列表数据 = 新的 ArrayList();一次性一次性 = repository.getDataFromApi(false).observeOn(AndroidSchedulers.mainThread()).subscribe(newData -> {data. ..
发布时间:2021-07-14 19:00:05 移动开发

在 RxJava2 中处理 null

我需要从本地或远程加载数据.如果获取本地数据,则不会获取远程数据. 由于本地数据可以为空(即空),我必须在 RxJava2 中处理 null 的情况,所以我在 Optional util.这是代码. 字符串数据;可观察的>加载缓存数据(){返回 Observable.create(发射器 -> {发射器.onNext(Optional.ofNullable(null)); ..
发布时间:2021-07-14 18:59:49 其他开发

Axon 4:从不同线程应用事件时未触发 EventSourcingHandler

我在 Axon 4 中的命令处理中遇到了一个小问题. 假设我有一个在处理命令时需要调用外部服务的聚合. 外部服务使用异步客户端(vertx tcp 客户端 + rxjava),因此响应在与创建聚合实例的线程不同的线程中给出. 我想根据我的服务结果应用一个事件,但它不起作用,因为 AggregateLifecycle.apply() 调用在不同的线程上... 如何“转移"聚合 ..
发布时间:2021-07-14 18:59:46 其他开发

如何让 Vertx Reactive RecordParser 发出没有长度的消息?

我有一个反应式 vertx TCPserver,它使用 RecordParser 来处理带有 2 字节长度标头的消息.RecordParser 负责理解长度部分和消息部分. 问题 1 recordparser 正确解释了记录解析器中的数据和系统输出.但是当我添加以下行时,系统输出不再打印.它可能与它在不同的线程上执行有关吗?我不确定. frameParser.toFlowable().s ..
发布时间:2021-07-14 18:59:43 Java开发

RxJava - “一次只允许一个发射沿着 Observable 链向上传播......";

我正在阅读一篇博文:http://tomstechnicalblog.blogspot.com/2016/02/rxjava-understanding-observeon-and.html 据说是哪里 无论你订阅的​​是什么调度器,只有一次发射是允许一次沿着 Observable 操作符链向上移动.在下面,您可以观察到必须一直推动发射在下一次发射开始之前从源到订阅者. 在引用文 ..
发布时间:2021-07-14 18:59:40 其他开发

处理后调用 Rx Observable onCompleted

我在我的 Android 应用中使用 RxJava2.我有一个 Observable,它在创建时被添加到 CompositeDisposable.这个 Observable 正在从一些 API 检索数据,它可能需要很长时间才能完成. 当我在下载时离开页面时,在 onPause 中,我正在处理这个 CompositeDisposable,但有时 Observable 的onComplete 仍 ..
发布时间:2021-07-14 18:59:34 移动开发

仅使用 onBackpressureLatest() 消耗最新项目

我有一个定期发出项目的生产者和一个有时很慢的消费者.重要的是,消费者只使用最近的项目.我认为 onBackpressureLatest() 是这个问题的完美解决方案.于是我写了下面的测试代码: PublishProcessor源 = PublishProcessor.create();来源.onBackpressureLatest().observeOn(Schedulers.from(Exec ..
发布时间:2021-07-14 18:59:30 Java开发

RxJava doOnError 与 onError

我尝试使用以下代码 initLocalSettingsIfNeed().andThen(initGlobalSettingsIfNeed(configuration)).doOnComplete(回调::onSuccess).doOnError(throwable -> callback.onError(throwable.getLocalizedMessage())).订阅(); 但我有例外 ..
发布时间:2021-07-14 18:59:27 移动开发

为什么不调用 doOnDispose?

当像这样创建一个 Observable 时: public void foo() {Observable observable = Observable.fromCallable(() -> {酒吧();返回 "";}).doOnSubscribe(一次性 -> System.out.println("onSubscribe")).doOnDispose(() -> System.out.pri ..
发布时间:2021-07-14 18:59:20 Java开发

RxJava 2:总是取消订阅 .subscribeOn(..) 调度程序?

我有一个 Observable 来执行一些工作.完成后,它关闭其连接(通过 .setDisposable(Disposables.fromAction(logic*);).问题是,我需要在与实际工作负载相同的调度程序上执行此关闭操作. 有可能吗?怎么样? 我不想强制 Observable 在给定的调度器上执行,例如 myObservable.subscribeOn(x).unsubsc ..
发布时间:2021-07-14 18:59:11 移动开发

如何模拟 EntityBus.rxSend()

io.vertx.reactivex.core.eventbus.EventBus.rxSend() 方法具有以下签名: public 单个>rxSend(字符串地址,对象消息,DeliveryOptions 选项) 模拟它以返回包含真实对象的 Single 的正确方法是什么?问题是 Message 类除了接受另一个 Message 对象的构造函数之外没有其他构造函数.因此将编译以 ..
发布时间:2021-07-14 18:59:02 其他开发

rx-java2 flatMap 中的空处理

如文档中所述RxJava 2.x 不再接受空值.因此,以下两行都以 onError 调用终止也就不足为奇了: Observable.fromCallable(() -> null);Observable.just(1).flatMap(i -> Observable.error(new RuntimeException())); 不清楚的是为什么 Observable.just(1).fla ..
发布时间:2021-07-14 18:58:56 Java开发

可以在 rxjava 中转换以下代码吗

例如,我有以下可运行的 Java 代码. 它是关于一个生产者和几个并行的消费者.这些消费者正在运行耗时的作业,并且它们是并行运行的. 我想知道这个用例是否匹配 rx-java,以及如何在 rx-java 中重写它. public class DemoInJava {公共静态无效主(字符串 [] args){final BlockingQueuequeue = new LinkedBl ..
发布时间:2021-07-14 18:58:51 其他开发