I am using the Rx-ified API for vertx, and this question should make a potentially endless retry-until-success loop that I would like to implement, but I have difficulties. I am new to RxJava.
Here is what I would like to do:
- Send the request to another vertx component using the message vertx bus
- While I get a timeout waiting for a response, retry the request
- As soon as I get a response to the request, check the results, and if there is nothing useful, wait a while, and then start all over again in step 1)
First problem
The first problem I am facing is how to perform step 2).
If you are familiar with vert.x Rx api, this means that you must complete the request in step 1) above:
vertx.eventBus().<JsonObject>sendObservable( "theAddress", aJsonObject );
The above code returns an Observable instance that will either return a response or an error (for example, if there was a timeout). This Observable will never emit anything again (otherwise it will always emit the exact same thing every time something is signed, I donβt know what).
RxJava repeat statement does not work
vertx.eventBus().<JsonObject>sendObservable( "theAddress", aJsonObject ) .retry()
I thought that in order to release a repeat, I could just use the RxJava retry () operator, which I tried, but which has nothing to do with the nature of the observed source. No new request message is sent, because the only thing that is "repeated" is a subscription to the original source, which will never emit something else.
RxJava retryWhen the statement does not work
vertx.eventBus().<JsonObject>sendObservable( "theAddress", aJsonObject ) .retryWhen( error -> { return _vertx.eventBus().<JsonObject>sendObservable( "theAddress", aJsonObject ) })
So, I thought I could use the RxJava retryWhen () operator, which allows me to return the second observable when the root observable emits an error. The second observable could, I thought, simply be the same code above as the original observer in the first stage.
But the retryWhen () operator ( see the documentation ) does not allow this second observable to emit an error without ending the subscription with an error.
So, it's hard for me to figure out how to set up a potentially infinite loop of repetition in the first part of this chain.
Something is missing for me here, but I could not determine what it is.
Second problem
vertx.eventBus().<JsonObject>sendObservable( "theAddress", aJsonObject ) // imagine that retryWhen() accomplishes an infinite retry .retryWhen( error -> { return _vertx.eventBus().<JsonObject>sendObservable( ... ) }) .flatMap( response -> { // inspect response, if it has usable data, // return that data as an observable return Observable.from(response.data()); // if the response has no usable data, // wait for some time, then start the whole process // all over again return Observable.timer(timeToWait).<WHAT GOES HERE?>; })
The second problem is how to implement step 3. This seems to me to be the first problem, itβs only harder to understand, because I donβt need to repeat the direct source observed, I need to wait a bit, then start from step 1).
Whatever the observation I create, all the elements in the chain leading to this point will be needed, which is similar to recursion, which should probably be avoided.
Any help or suggestions would really be appreciated at this point.