Making an HTTP call using ReactiveX for Java

I am new to ReactiveX for Java. I have the following block of code that makes an external HTTP call, but it is not asynchronous. We use RxJava 1.2 and Java 1.8.

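  private ResponseEntity<String> callExternalUrl(String url, String json, HttpMethod method) {

    // httpHeaders is assumed to be a field defined elsewhere in this class
    RestTemplate restTemplate = new RestTemplate();
    HttpEntity<String> request = new HttpEntity<>(json, httpHeaders);

    // use the method parameters (json, method) rather than undefined names
    return restTemplate.exchange(url, method, request, String.class);

  }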

I found the following code on the Internet, but I do not fully understand it or how to apply it to my code base.

private RxClient<RxObservableInvoker> httpClient;
public <T> Observable<T> fetchResult(String url, Func1<Response, T> mapper) {

    return httpClient.target(url)
        .request()
        .rx()
        .get()
        .subscribeOn(Schedulers.io())
        .map(mapper);
  }
1 answer

If I understand correctly, you need something like this to wrap the existing callExternalUrl:

static Observable<String> callExternalUrlAsync(String url, String json, HttpMethod method)
{
    return Observable.fromCallable(() -> callExternalUrl(url, json, method))
            .subscribeOn(Schedulers.io())
            .flatMap(re -> {
                         if (re.hasBody())
                             return Observable.just(re.getBody());
                         else
                             return Observable.error(new RuntimeException("Bad response status " + re.getStatusCode()));
                     },
                     e -> Observable.error(e),
                     (Func0<Observable<? extends String>>) (() -> Observable.empty())) // I need explicit cast or it won't compile :-(
            .observeOn(Schedulers.computation());
}

Short code description:

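  • It schedules execution of the existing callExternalUrl on Schedulers.io.
  • It converts the ResponseEntity<T> into either a T on success or an error. This also runs on the io scheduler, but it is short, so that hardly matters. (If an exception is thrown inside callExternalUrl, it is passed on as is.)
  • It delivers the result to the subscriber on Schedulers.computation.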
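
To make the thread hand-off in the three steps above visible without RxJava on the classpath, here is a minimal JDK-only sketch. It swaps in CompletableFuture for Observable, so it is an analogue of the pattern, not the RxJava API. The names HandOffSketch, blockingCall and callAsync, the thread names "io" and "cpu", and the example URL are all made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class HandOffSketch {
    // Hypothetical stand-in for the blocking callExternalUrl; no network involved.
    static String blockingCall(String url) {
        return "response from " + url;
    }

    // Runs the blocking call on ioPool (like subscribeOn), then hands the
    // result over to cpuPool (like observeOn). Thread names are recorded
    // in the result so the hand-off can be observed.
    static CompletableFuture<String> callAsync(String url, ExecutorService ioPool, ExecutorService cpuPool) {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName() + ": " + blockingCall(url), ioPool)
                .thenApplyAsync(r -> r + " | consumed on " + Thread.currentThread().getName(), cpuPool);
    }

    public static void main(String[] args) {
        ExecutorService ioPool = Executors.newCachedThreadPool(r -> new Thread(r, "io"));
        ExecutorService cpuPool = Executors.newFixedThreadPool(2, r -> new Thread(r, "cpu"));
        try {
            // prints "io: response from http://example.invalid | consumed on cpu"
            System.out.println(callAsync("http://example.invalid", ioPool, cpuPool).join());
        } finally {
            ioPool.shutdown();
            cpuPool.shutdown();
        }
    }
}
```

In the RxJava version the same roles are played by subscribeOn(Schedulers.io()) and observeOn(Schedulers.computation()).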

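Caveats:

  • You will probably want to use your own schedulers for both subscribeOn and observeOn.
  • You will probably want better logic in the first lambda passed to flatMap to tell success from failure, and a more specific exception type.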
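
One possible way to give the response check stricter success/failure logic and a more specific exception type is sketched below. ResponseChecks, ExternalCallException and bodyOrThrow are hypothetical names, and treating any non-2xx status or an empty body as an error is an assumption that may not match your API.

```java
public class ResponseChecks {
    /** Hypothetical exception that keeps the HTTP status for callers to inspect. */
    public static class ExternalCallException extends RuntimeException {
        private final int status;

        public ExternalCallException(int status, String message) {
            super(message);
            this.status = status;
        }

        public int getStatus() {
            return status;
        }
    }

    /** Returns the body of a 2xx response that has one; throws otherwise. */
    public static String bodyOrThrow(int status, String body) {
        if (status < 200 || status >= 300) {
            throw new ExternalCallException(status, "Unexpected HTTP status " + status);
        }
        if (body == null || body.isEmpty()) {
            // assumption: an empty body counts as a failure here
            throw new ExternalCallException(status, "Empty response body");
        }
        return body;
    }
}
```

With a helper like this, the three-argument flatMap could probably be replaced by a plain map, for example map(re -> ResponseChecks.bodyOrThrow(re.getStatusCode().value(), re.getBody())), since RxJava forwards an exception thrown inside map to the subscriber's onError.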

If you are willing to use higher-order functions, you can do something like this:

// Universal wrapper method
static <T> Observable<T> wrapCallExternalAsAsync(Func3<String, String, HttpMethod, ResponseEntity<T>> externalCall, String url, String json, HttpMethod method)
{
    return Observable.fromCallable(() -> externalCall.call(url, json, method))
            .subscribeOn(Schedulers.io())
            .flatMap(re -> {
                         if (re.hasBody())
                             return Observable.just(re.getBody());
                         else
                             return Observable.error(new RuntimeException("Bad response status " + re.getStatusCode()));
                     },
                     e -> Observable.error(e),
                     (Func0<Observable<? extends T>>) (() -> Observable.empty())) // I need explicit cast or it won't compile :-(
            .observeOn(Schedulers.computation());
}

static Observable<String> callExternalUrlAsync_HigherOrder(String url, String json, HttpMethod method)
{
    return wrapCallExternalAsAsync(MyClass::callExternalUrl, url, json, method);
}

Here MyClass is the class that contains callExternalUrl.


Update (async calls only)

RxClient<RxObservableInvoker> httpClient = Rx.newClient(RxObservableInvoker.class); // a custom ExecutorService can be passed here as well

private <T> Observable<String> executeHttpAsync(String url, String httpMethod, Entity<T> entity) {
    return httpClient.target(url)
            .request()
            .headers(httpHeaders) // assuming httpHeaders is something global as in your example
            .rx()
            .method(httpMethod, entity)
            .map(resp -> {
                if (200 != resp.getStatus()) {
                    throw new RuntimeException("Bad status code " + resp.getStatus());
                } else {
                    if (!resp.hasEntity()) {
                        // return null; // or error?
                        throw new RuntimeException("Empty response"); // or empty?
                    } else {
                        try {
                            return resp.readEntity(String.class);
                        } catch (Exception ex) {
                            throw new RuntimeException(ex); // wrap exception into unchecked
                        }
                    }
                }
            })
            .observeOn(Schedulers.computation());
}

private Observable<String> executeGetAsync(String url) {
    return executeHttpAsync(url, "GET", null);
}

private Observable<String> executePostAsync(String url, String json) {
    return executeHttpAsync(url, "POST", Entity.json(json));
}

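Again, caveats:

  • You will probably want to use your own executor for the newClient call and your own scheduler for observeOn.
  • You will probably want better error handling than just checking whether the status is HTTP 200, but this shows the idea.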
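
As a rough sketch of the first caveat, a dedicated executor for HTTP work can be built with the JDK alone. HttpPools and newHttpPool are hypothetical names, and the pool size and thread-name prefix are placeholders to tune for your workload.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class HttpPools {
    /** Builds a fixed-size pool of named daemon threads; prefix and size are illustrative. */
    public static ExecutorService newHttpPool(String prefix, int size) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            Thread t = new Thread(runnable, prefix + "-" + counter.incrementAndGet());
            t.setDaemon(true); // idle HTTP workers should not keep the JVM alive
            return t;
        };
        return Executors.newFixedThreadPool(size, factory);
    }
}
```

In RxJava 1 such an executor can be turned into a scheduler with Schedulers.from(executor) and used in subscribeOn or observeOn. Whether it can also be handed to Rx.newClient depends on the Jersey version offering an overload that accepts an ExecutorService, as the comment in the code above suggests.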

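Again, I do not know the details of your request body (HttpEntity), so I assumed it is a String containing JSON, as in your example. If you need something more sophisticated, such as mapping objects to and from JSON, see https://jersey.java.net/documentation/2.25/media.html#json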


Source: https://habr.com/ru/post/1670795/

