How to end chained HTTP requests in RxJava Vert.x?

How do I end the request chain in Rx Vert.x?

    HttpClient client = Vertx.vertx().createHttpClient();
    HttpClientRequest request = client.request(HttpMethod.POST, "someURL")
        .putHeader("content-type", "application/x-www-form-urlencoded")
        .putHeader("content-length", Integer.toString(jsonData.length()))
        .write(jsonData);

    request.toObservable()
        // flatMap: HttpClientResponse -> Observable<Buffer>
        .flatMap(httpClientResponse -> {
            // something
            return httpClientResponse.toObservable();
        })
        .map(buffer -> buffer.toString())
        // flatMap: data -> Observable<HttpClientResponse>
        .flatMap(postData -> client.request(HttpMethod.POST, "someURL")
            .putHeader("content-type", "application/x-www-form-urlencoded")
            .putHeader("content-length", Integer.toString(postData.length()))
            .write(postData)
            .toObservable())
        // flatMap: HttpClientResponse -> Observable<Buffer>
        .flatMap(httpClientResponse -> {
            return httpClientResponse.toObservable();
        })
        ...... // other operators

    request.end();

Note that I call .end() on the top-level request. How do I end the request that is created inside .flatMap()? Do I even need to end it?

2 answers

I think you can do something like the following code.

The basic idea is that you do not use the HttpClientRequest returned by the Vert.x client directly. Instead, you wrap it in another stream that calls end() as soon as it receives its first subscription.

Here, for example, you obtain each request through a couple of custom methods, in this case request1() and request2(). They use doOnSubscribe() to trigger the end() call that you need. Read its description on the ReactiveX page.

This example uses Vert.x with RxJava 2 (the io.vertx.reactivex API). I hope you can use this setup.

    import io.reactivex.Flowable;
    import io.vertx.core.http.HttpMethod;
    import io.vertx.reactivex.core.Vertx;
    import io.vertx.reactivex.core.buffer.Buffer;
    import io.vertx.reactivex.core.http.HttpClient;
    import io.vertx.reactivex.core.http.HttpClientRequest;
    import io.vertx.reactivex.core.http.HttpClientResponse;
    import org.junit.Test;

    public class StackOverflow {

        @Test
        public void test() {
            Buffer jsonData = Buffer.buffer("..."); // the json data.
            HttpClient client = Vertx.vertx().createHttpClient(); // the vertx client.

            request1(client)
                .flatMap(httpClientResponse -> httpClientResponse.toFlowable())
                .map(buffer -> buffer.toString())
                .flatMap(postData -> request2(client, postData))
                .forEach(httpResponse -> {
                    // do something with returned data
                });
        }

        private Flowable<HttpClientResponse> request1(HttpClient client) {
            HttpClientRequest request = client.request(HttpMethod.POST, "someURL");
            return request
                .toFlowable()
                // end the request as soon as someone subscribes
                .doOnSubscribe(subscription -> request.end());
        }

        private Flowable<HttpClientResponse> request2(HttpClient client, String postData) {
            HttpClientRequest request = client.request(HttpMethod.POST, "someURL");
            // do something with postData
            return request
                .toFlowable()
                // end the request as soon as someone subscribes
                .doOnSubscribe(subscription -> request.end());
        }
    }
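To make the ordering easier to see without a running Vert.x instance, here is a rough JDK-only toy model of the same idea. None of it is the Vert.x or RxJava API: Source, FakeRequest, request() and run() are hypothetical names made up for this sketch, and the only assumption carried over from the answer is that a response is delivered only after end() has been called on the request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// JDK-only toy model, NOT the Vert.x or RxJava API: every type here is hypothetical.
class EndOnSubscribeSketch {

    /** A cold single-value stream: nothing happens until subscribe() is called. */
    interface Source<T> {
        void subscribe(Consumer<T> onNext);

        /** Runs the action each time someone subscribes (a loose analog of doOnSubscribe()). */
        default Source<T> doOnSubscribe(Runnable action) {
            return onNext -> {
                action.run();
                this.subscribe(onNext);
            };
        }

        /** Subscribes to the inner source only once the outer value has arrived. */
        default <R> Source<R> flatMap(Function<T, Source<R>> mapper) {
            return onNext -> this.subscribe(value -> mapper.apply(value).subscribe(onNext));
        }
    }

    /** Stand-in for a client request: the response is delivered only after end(). */
    static final class FakeRequest {
        private final String name;
        private final List<String> log;
        private Consumer<String> handler;
        private boolean ended;

        FakeRequest(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        Source<String> toSource() {
            return onNext -> {
                handler = onNext;
                deliverIfReady();
            };
        }

        void end() {
            log.add(name + ".end()");
            ended = true;
            deliverIfReady();
        }

        private void deliverIfReady() {
            if (ended && handler != null) {
                Consumer<String> h = handler;
                handler = null;
                h.accept("response of " + name);
            }
        }
    }

    /** Mirrors request1()/request2(): the request is ended when the stream is subscribed. */
    static Source<String> request(String name, List<String> log) {
        FakeRequest request = new FakeRequest(name, log);
        return request.toSource().doOnSubscribe(request::end);
    }

    /** Chains two requests and returns the recorded order of events. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        request("request1", log)
            .flatMap(body -> {
                log.add("got " + body);
                return request("request2", log);
            })
            .subscribe(body -> log.add("got " + body));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this model the second request is ended only when flatMap subscribes to it, so the log reads request1.end(), got response of request1, request2.end(), got response of request2. A FakeRequest that is subscribed to but never ended delivers nothing, which is the situation the question describes for the inner request.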

There are several ways to make sure request.end() gets called. But I would first dig into the Vert.x documentation, or just open the source code if it is available, to find out whether it already calls end() for you. Otherwise you could do this:

    final HttpClientRequest request = ...
    request.toObservable()
        .doOnUnsubscribe(new Action0() {
            @Override
            public void call() {
                request.end();
            }
        });

Source: https://habr.com/ru/post/1243353/

