RxJava.顺序执行 [英] RxJava. Sequential execution

查看:235
本文介绍了RxJava.顺序执行的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在我的Android应用中,我有一个演示者来处理用户交互,包含一种请求管理器,并在需要时通过请求管理器将用户输入发送给请求管理器.

In my Android App I have a presenter which handles user interactions, contains kind of request manager and if needed sends user input over request manager to request manager.

请求管理器本身包含服务器API,并使用此RxJava处理服务器请求.

Request manager itself contains server API and handles server request using this RxJava.

我有一个代码,每次用户输入消息并显示服务器的响应时,该代码就会向服务器发送请求:

I have a code, which sends a request to server everytime a user enters a message and show the response from server:

private Observable<List<Answer>> sendRequest(String request) {
    MyRequest request = new MyRequest();
    request.setInput(request);
    return Observable.fromCallable(() -> serverApi.process(request))
            .doOnNext(myResponse -> {
                // store some data
            })
            .map(MyResponse::getAnswers)
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread());
}

但是现在我需要排队.用户可以在服务器响应之前发送新消息.队列中的每个消息都应按顺序处理.IE.我们将在收到第一条消息的答复后发送第二条消息,依此类推.

However now I need to have kind of queue. The user may send a new message before the server has responded. Each message from the queue should be processed sequentially. I.e. the second message will be sent after we've got a response to the first message and so on.

万一发生错误,则不应再处理其他请求.

In case an error occurs no further requests should be handled.

我还需要在RecyclerView中显示答案.

I also need to display the answers within a RecyclerView.

我不知道如何更改上面的代码以实现上述处理

I have no idea how to change the code above to achieve the handling described above

我看到了某种问题.一方面,该队列可以由用户随时更新,另一方面,只要服务器发送响应,则应从队列中删除消息.

I see kind of problem. On one hand, this queue can be anytime updated by the user, on the other hand anytime server sent a response the message should be removed from the queue.

也许是我错过了一个rxjava运算符或特殊方式.

Maybe there is a rxjava operator or special way I just missed.

我在这里看到了类似的答案,但是,那里的队列"是恒定的.使用RxJava和Retrofit进行N次连续api调用

I saw a similar answer here, however, the "queue" there is constant. Making N sequential api calls using RxJava and Retrofit

对于任何解决方案或链接,我将非常感谢

I'll be very thankful for any solution or link

推荐答案

我没有找到任何优雅的Native-RxJava解决方案.所以我将自定义 Subscriber 来完成您的工作.

I don't fnd any elegant native-RxJava solution. So I will custom a Subscriber to do your work.

您的3分:

  1. 对于顺序执行,我们创建一个单线程调度程序

  1. For sequential execution, we create a single thread scheduler

调度程序顺序= Schedulers.from(Executors.newFixedThreadPool(1));

要在发生错误时停止所有请求,我们应该一起订阅所有请求,而不是每次都创建一个 Flowable .因此,我们定义了以下函数(我在这里请求的是 Integer 和响应的 String ):

For stop all requests when error occur, we should subscribe all request together instead of create a Flowable every time. So we define following functions (here I request is Integer and response String):

void sendRequest(Integer request)

Flowable< String>reciveResponse()

并定义一个字段以使请求和响应流相关联:

and define a field to make association of request and response flow:

FlowableProcessor< Integer>requestQueue = UnicastProcessor.create();

为重新运行未发送的请求,我们定义了重新运行功能:

For re-run the not-sent request, we define the rerun function:

void rerun()

然后我们可以使用它:

reciveResponse().subscribe(/**your subscriber**/)

现在让我们实现它们.

发送请求时,我们只需将其推送到 requestQueue

When send request, we simply push it into requestQueue

public void sendRequest(Integer request) {
  requestQueue.onNext(request);
}

首先,要按顺序执行请求,我们应该将工作安排在 sequential :

First, to do the request sequentialy, we should schedule work to sequential:

requestQueue
  .observeOn(sequential)
  .map(i -> mockLongTimeRequest(i)) // mock for your serverApi.process
  .observeOn(AndroidSchedulers.mainThread());

第二,在发生错误时停止请求.这是默认行为.如果我们什么也不做,则错误将导致订阅中断,并且不会再发送任何其他项目.

Second, to stop request when error occur. It's a default behavior. If we do nothing, an error will broken the subscription and any futher items will not be emitted.

第三,重新运行未发送的请求.首先是因为本机运算符将取消流,就像 MapSubscriber 那样(RxJava-2.1.0-FlowableMap#63):

Third, to re-run the not-sent requests. First because that the native operator will cancel the stream, like MapSubscriber do (RxJava-2.1.0-FlowableMap#63):

try {
    v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
    fail(ex);// fail will call cancel
    return;
}

我们应该包装错误.在这里,我使用了

We should wrap the error. Here I use my Try class to wrap the possible exception, you can use any other implementation that can wrap the exception instead of throw it:

    .map(i -> Try.to(() -> mockLongTimeRequest(i)))

然后是自定义的 OnErrorStopSubscriber实现了Subscriber< Try< T>>" Subscription .

它请求并正常发射物品.当发生错误时(实际上是发出失败的 Try 的错误),它在那里停止并且不会请求甚至不向下游发出请求.调用 rerun 方法后,它将返回运行状态并正常发射.全班大约有80行.您可以在我的代码上看到该代码github .

It request and emits items normally. When error occur(in fact is a failed Try emitted) it stopped there and won't request or emit even downstream request it. After call rerun method, it will back to the running statu and emit normally. The class is about 80 lines. You can see the code on my github.

现在我们可以测试我们的代码了:

Now we can test our code:

public static void main(String[] args) throws InterruptedException {
  Q47264933 q = new Q47264933();
  IntStream.range(1, 10).forEach(i -> q.sendRequest(i));// emit 1 to 10
  q.reciveResponse().subscribe(e -> System.out.println("\tdo for: " + e));
  Thread.sleep(10000);
  q.rerun(); // re-run after 10s
  Thread.sleep(10000);// wait for it complete because the worker thread is deamon
}

private String mockLongTimeRequest(int i) {
  Thread.sleep((long) (1000 * Math.random()));
  if (i == 5) {
    throw new RuntimeException(); // error occur when request 5
  }
  return Integer.toString(i);
}

并输出:

1 start at:129
1 done  at:948
2 start at:950
    do for: 1
2 done  at:1383
3 start at:1383
    do for: 2
3 done  at:1778
4 start at:1778
    do for: 3
4 done  at:2397
5 start at:2397
    do for: 4
error happen: java.lang.RuntimeException
6 start at:10129
6 done  at:10253
7 start at:10253
    do for: 6
7 done  at:10415
8 start at:10415
    do for: 7
8 done  at:10874
9 start at:10874
    do for: 8
9 done  at:11544
    do for: 9

您可以看到它按顺序运行.并在发生错误时停止.调用 rerun 方法后,它将继续处理左侧的未发送请求.

You can see it runs sequentialy. And stopped when error occur. After call rerun method, it continue handle the left not-sent request.

有关完整代码,参见我的github.

这篇关于RxJava.顺序执行的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆