使用 repeatWhen() 的动态延迟值 [英] Dynamic delay value with repeatWhen()
问题描述
现在我正在用 RxJava 实现一些轮询逻辑.我应该多次轮询端点,直到它告诉我停止.此外,每个响应都会返回一个我应该在再次轮询之前延迟的时间.我的逻辑现在看起来像这样:
Right now I'm implementing some polling logic with RxJava. I'm supposed to poll an endpoint a number of times until it tells me to stop. Additionally, each response comes back with a time that I'm supposed to delay by before polling again. My logic looks something like this right now:
service.pollEndpoint()
.repeatWhen(observable -> observable.delay(5000, TimeUnit.MILLISECONDS))
.takeUntil(Blah::shouldStopPolling);
现在我将延迟值硬编码为 5000,但我希望它取决于轮询响应中的值.我尝试使用返回 Observable.just(pollResponse).repeatWhen(observable -> observable.delay(pollResponse.getDelay(), TimeUnit.MILLISECONDS))
的平面图,但这似乎不像正确的想法,因为它弄乱了源 Observable.我觉得这是我忽略的简单事情.谢谢!
Right now I have the delay value hardcoded to 5000, but I'd like it to depend on a value in the poll response. I tried using a flatmap that returned Observable.just(pollResponse).repeatWhen(observable -> observable.delay(pollResponse.getDelay(), TimeUnit.MILLISECONDS))
, but that didn't seem like the right idea since it messed with the source Observable. I feel like it's something simple that I'm overlooking. Thanks!
推荐答案
正如@JohnWowUs 提到的,您需要带外通信,但如果您多次订阅该序列,则可以使用 defer
具有每个订阅者的状态:
As @JohnWowUs mentioned, you need out-of-band communication, but if you subscribe to the sequence more than once, you can use defer
to have per-subscriber state:
Observable.defer(() -> {
int[] pollDelay = { 0 };
return service.pollEndpoint()
.doOnNext(response -> pollDelay[0] = response.getDelay())
.repeatWhen(o -> o.flatMap(v -> Observable.timer(pollDelay[0], MILLISECONDS)))
.takeUntil(Blah::shouldStopPolling);
});
这篇关于使用 repeatWhen() 的动态延迟值的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!