使用ReactiveX for Java进行Http调用 [英] Make Http call using ReactiveX for Java

查看:115
本文介绍了使用ReactiveX for Java进行Http调用的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是ReactiveX for Java的新手,我有以下代码块可以进行外部http调用,但它不是异步的.我们正在使用rxjava 1.2和Java 1.8

I am new to ReactiveX for Java and I've the following code block that make external http call but it is not async. We are using rxjava 1.2, and Java 1.8

  private ResponseEntity<String> callExternalUrl(String url, String json, HttpMethod method) {

    RestTemplate restTemplate;
    HttpEntity request;

      request = new HttpEntity(jsonContent, httpHeaders);

    return restTemplate.exchange(url, httpMethod, request, String.class);

  }

我在网上找到了以下代码块,但我无法完全理解它,以及如何将其应用于代码库.

I've the following code block I found online but I couldn't totally understand it and how I can apply it to my code base.

private RxClient<RxObservableInvoker> httpClient;
public <T> Observable<T> fetchResult(String url, Func1<Response, T> mapper) {

    return httpClient.target(url)
        .request()
        .rx()
        .get()
        .subscribeOn(Schedulers.io())
        .map(mapper);
  }

推荐答案

如果我对您的理解正确,则需要类似这样的东西来包装现有的callExternalUrl

If I understand you correctly, you need something like this to wrap your existing callExternalUrl

static Observable<String> callExternalUrlAsync(String url, String json, HttpMethod method)
{
    return Observable.fromCallable(() -> callExternalUrl(url, json, method))
            .subscribeOn(Schedulers.io())
            .flatMap(re -> {
                         if (re.hasBody())
                             return Observable.just(re.getBody());
                         else
                             return Observable.error(new RuntimeException("Bad response status " + re.getStatusCode()));
                     },
                     e -> Observable.error(e),
                     (Func0<Observable<? extends String>>) (() -> Observable.empty())) // I need explicit cast or it won't compile :-(
            .observeOn(Schedulers.computation());
}

代码的简短描述:

  1. 它计划在Schedulers.io
  2. 上执行现有的callExternalUrl
  3. ResponseEntity<T>最小化为成功的T和错误情况.它也发生在io调度程序上,但并不重要,因为它确实很短. (如果callExternalUrl内部存在异常,则按原样传递.)
  4. 让订户使用要在Schedulers.computation
  5. 上执行的结果
  1. It schedules execution of the existing callExternalUrl on the Schedulers.io
  2. Does minimal transformation of ResponseEntity<T> into successful T and error cases. It happens on the io scheduler as well but it is not important as it is really short. (If there was an exception inside callExternalUrl, it is passed as is.)
  3. Makes subscriber to the result to be executed on Schedulers.computation

注意事项:

  1. 您可能要对subscribeOnobserveOn
  2. 使用自定义计划程序
  3. 您可能希望在传递给flatMap的第一个lambda中具有更好的逻辑,以区分成功和错误,并且绝对希望有一些更具体的异常类型.
  1. You probably want to use your custom schedulers for both subscribeOn and observeOn
  2. You probably want to have some better logic in the first lambda passed to flatMap to distinguish between success and error and definitely you want some more specific exception type.

高阶魔术

如果您愿意使用高阶函数并为减少重复代码而牺牲一点性能,则可以执行以下操作:

If you are willing to use higher-order functions and trade a little bit of performance for less code duplication you can do something like this:

// Universal wrapper method
static <T> Observable<T> wrapCallExternalAsAsync(Func3<String, String, HttpMethod, ResponseEntity<T>> externalCall, String url, String json, HttpMethod method)
{
    return Observable.fromCallable(() -> externalCall.call(url, json, method))
            .subscribeOn(Schedulers.io())
            .flatMap(re -> {
                         if (re.hasBody())
                             return Observable.just(re.getBody());
                         else
                             return Observable.error(new RuntimeException("Bad response status " + re.getStatusCode()));
                     },
                     e -> Observable.error(e),
                     (Func0<Observable<? extends T>>) (() -> Observable.empty())) // I need explicit cast or it won't compile :-(
            .observeOn(Schedulers.computation());
}

static Observable<String> callExternalUrlAsync_HigherOrder(String url, String json, HttpMethod method)
{
    return wrapCallExternalAsAsync(MyClass::callExternalUrl, url, json, method);
}

MyClass所在的位置,而callExternalUrl所在的位置.

Where MyClass is wherever your callExternalUrl is.

更新(仅限异步通话)

私有静态RxClient httpClient = Rx.newClient(RxObservableInvoker.class);//在这里,您可以通过自定义ExecutorService

private static RxClient httpClient = Rx.newClient(RxObservableInvoker.class); // here you might pass custom ExecutorService

private <T> Observable<String> executeHttpAsync(String url, String httpMethod, Entity<T> entity) {
    return httpClient.target(url)
            .request()
            .headers(httpHeaders) // assuming httpHeaders is something global as in your example
            .rx()
            .method(httpMethod, entity)
            .map(resp -> {
                if (200 != resp.getStatus()) {
                    throw new RuntimeException("Bad status code " + resp.getStatus());
                } else {
                    if (!resp.hasEntity()) {
                        // return null; // or error?
                        throw new RuntimeException("Empty response"); // or empty?
                    } else {
                        try {
                            return resp.readEntity(String.class);
                        } catch (Exception ex) {
                            throw new RuntimeException(ex); // wrap exception into unchecked
                        }
                    }
                }
            })
            .observeOn(Schedulers.computation());
}

private Observable<String> executeGetAsync(String url) {
    return executeHttpAsync(url, "GET", null);
}

private Observable<String> executePostAsync(String url, String json) {
    return executeHttpAsync(url, "POST", Entity.json(json));
}

再次使用类似的:

  1. 您可能要对newClient通话和observeOn
  2. 使用自定义计划程序
  3. 您可能希望有一些更好的逻辑来进行错误处理,而不仅仅是检查它是否为HTTP 200,并且肯定需要一些更具体的异常类型.但这都是特定于业务逻辑的,因此由您决定.
  1. You probably want to use your custom schedulers for both newClient call and observeOn
  2. You probably want to have some better logic for error handling than just checking whether it is HTTP 200 or not and definitely you want some more specific exception type. But this is all business-logic specific so it is up to you.

此外,从您的示例中还不清楚请求主体(HttpEntity)是如何构建的,以及您是否始终像在原始示例中一样总是希望String作为响应.我还是照原样复制了您的逻辑.如果您还需要其他内容,则可能应该参考 https://jersey上的文档. java.net/documentation/2.25/media.html#json

Also it is not clear from your example how exactly the body of the request (HttpEntity) is build and whether you actually always want String as a response as it is in your original example. Still I just replicated your logic as is. If you need something more you probably should refer to the documentation at https://jersey.java.net/documentation/2.25/media.html#json

这篇关于使用ReactiveX for Java进行Http调用的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆