使用 repeatWhen() 的动态延迟值 [英] Dynamic delay value with repeatWhen()

查看:25
本文介绍了使用 repeatWhen() 的动态延迟值的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

现在我正在用 RxJava 实现一些轮询逻辑.我应该多次轮询端点,直到它告诉我停止.此外,每个响应都会返回一个我应该在再次轮询之前延迟的时间.我的逻辑现在看起来像这样:

Right now I'm implementing some polling logic with RxJava. I'm supposed to poll an endpoint a number of times until it tells me to stop. Additionally, each response comes back with a time that I'm supposed to delay by before polling again. My logic looks something like this right now:

service.pollEndpoint()
    .repeatWhen(observable -> observable.delay(5000, TimeUnit.MILLISECONDS))
    .takeUntil(Blah::shouldStopPolling);

现在我将延迟值硬编码为 5000,但我希望它取决于轮询响应中的值.我尝试使用返回 Observable.just(pollResponse).repeatWhen(observable -> observable.delay(pollResponse.getDelay(), TimeUnit.MILLISECONDS)) 的平面图,但这似乎不像正确的想法,因为它弄乱了源 Observable.我觉得这是我忽略的简单事情.谢谢!

Right now I have the delay value hardcoded to 5000, but I'd like it to depend on a value in the poll response. I tried using a flatmap that returned Observable.just(pollResponse).repeatWhen(observable -> observable.delay(pollResponse.getDelay(), TimeUnit.MILLISECONDS)), but that didn't seem like the right idea since it messed with the source Observable. I feel like it's something simple that I'm overlooking. Thanks!

推荐答案

正如@JohnWowUs 提到的,您需要带外通信,但如果您多次订阅该序列,则可以使用 defer 具有每个订阅者的状态:

As @JohnWowUs mentioned, you need out-of-band communication, but if you subscribe to the sequence more than once, you can use defer to have per-subscriber state:

Observable.defer(() -> {
    int[] pollDelay = { 0 };
    return service.pollEndpoint()
    .doOnNext(response -> pollDelay[0] = response.getDelay())
    .repeatWhen(o -> o.flatMap(v -> Observable.timer(pollDelay[0], MILLISECONDS)))
    .takeUntil(Blah::shouldStopPolling);
});

这篇关于使用 repeatWhen() 的动态延迟值的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆