Socket watchdog in rx-java

I'm trying to implement a TCP watchdog/retry system using Rx. Your help would be greatly appreciated.

Given an Observable<Socket>, I would like to derive an Observable<Boolean> that periodically checks whether we can still write to the socket. That part is easy enough; I can do something like this:

class SocketSubscribeFunc implements Observable.OnSubscribeFunc<Socket> {
  private final String hostname;
  private final int port;
  private Socket socket;

  SocketSubscribeFunc(String hostname, int port) {
    this.hostname = hostname;
    this.port = port;
  }

  public Subscription onSubscribe(final Observer<? super Socket> observer) {
    try {
      log.debug("Trying to connect...");
      socket = new Socket(hostname, port);
      observer.onNext(socket);
    } catch (IOException e) {
      observer.onError(e);
    }
    return new Subscription() {
      public void unsubscribe() {
        // socket is still null if the connection attempt above failed.
        if (socket == null) {
          return;
        }
        try {
          socket.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    };
  }
}

Observable<Socket> socketObservable = Observable.create(new SocketSubscribeFunc(hostname,port));
Observable<Boolean> watchdog = Observable.combineLatest(socketObservable, Observable.interval(1, TimeUnit.SECONDS), new Func2<Socket, Long, Boolean>() {

  public Boolean call(final Socket socket, final Long aLong) {
    try {
      socket.getOutputStream().write("ping\n".getBytes());
      return true;
    } catch (IOException e) {
      return false;
    }
  }
});
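The write check inside the combining function can also be pulled out into a small helper, which makes it possible to exercise it without a real connection. This is only a minimal sketch: the class name PingProbe and its canWrite methods are hypothetical and not part of the original code or of RxJava.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper wrapping the "can we still write?" check. */
public class PingProbe {
  private static final byte[] PING = "ping\n".getBytes(StandardCharsets.US_ASCII);

  /** Returns true if the ping bytes were written and flushed without an IOException. */
  public static boolean canWrite(OutputStream out) {
    try {
      out.write(PING);
      out.flush();
      return true;
    } catch (IOException e) {
      return false;
    }
  }

  /** Same check for a socket; an unconnected or closed socket counts as not writable. */
  public static boolean canWrite(Socket socket) {
    try {
      return canWrite(socket.getOutputStream());
    } catch (IOException e) {
      return false;
    }
  }
}
```

With this in place, the body of call(...) above could be reduced to return PingProbe.canWrite(socket);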

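However, I would also like to retry the connection (a limited or unlimited number of times) (with or without a delay) when it fails. I am not sure how to express that with the Observable and OnSubscribeFunc above. Perhaps with switchMap/... or something similar?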

Any pointers would be appreciated. :)

Thanks!

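First, you probably don't need Observable.create here. Rx has using, which ties the lifetime of a resource to that of an Observable. The example below creates the socket, pings it on an interval, and closes it when the Observable terminates. It uses Java8 lambdas for brevity.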

Observable.using(
    // Resource (socket) factory
    () -> {
      try {
        return new Socket(hostname, port);
      } catch (IOException e) {
        // Rx will propagate this as an onError event.
        throw new RuntimeException(e);
      }
    },
    // Observable factory
    (socket) -> {
      return Observable.interval(1, TimeUnit.SECONDS)
          .map((unusedTick) -> {
            try {
              socket.getOutputStream().write("ping\n".getBytes());
              return true;
            } catch (IOException e) {
              throw new RuntimeException(e);
            }
          })
          // Retry the inner job up to 3 times before propagating.
          .retry(3);
    },
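    // Dispose action for socket.
    // close() throws a checked IOException, so it is handled inside the lambda.
    (socket) -> {
      try {
        socket.close();
      } catch (IOException e) {
        // Nothing useful to do if closing fails.
      }
    })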
    // Retry the outer job up to 3 times.
    .retry(3)
    // If we propagate all errors, emit a 'false', signaling service is not available.
    .onErrorResumeNext(Observable.just(false));

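Note that this version retries a fixed number of times (3 at each level). For anything more elaborate, such as a delay between attempts, look at retryWhen. Wrapping the IOException in a RuntimeException is a shortcut, so adapt the error handling to your needs.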
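As a rough illustration of retrying with a pause between attempts (the kind of policy retryWhen is meant for), here is a minimal plain-Java sketch. It is not RxJava's retryWhen and does not use any Rx API. The class RetryWithDelay, the method retry, and its parameters are hypothetical names introduced only for this example.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper: bounded retry with a fixed delay (not an RxJava API). */
public class RetryWithDelay {

  /**
   * Calls the task once, then retries it up to maxRetries more times if it throws,
   * sleeping delayMillis before each retry. Rethrows the last failure if all attempts fail.
   */
  public static <T> T retry(Callable<T> task, int maxRetries, long delayMillis) throws Exception {
    if (maxRetries < 0) {
      throw new IllegalArgumentException("maxRetries must be >= 0");
    }
    Exception last = null;
    // attempt 0 is the initial call; attempts 1..maxRetries are the retries.
    for (int attempt = 0; attempt <= maxRetries; attempt++) {
      try {
        return task.call();
      } catch (Exception e) {
        last = e;
        if (attempt < maxRetries) {
          Thread.sleep(delayMillis);
        }
      }
    }
    throw last;
  }
}
```

For example, RetryWithDelay.retry(() -> new Socket(hostname, port), 3, 1000L) would try to connect up to four times in total, pausing one second between attempts. This version blocks the calling thread while it waits, so it only illustrates the policy, not how an Rx operator would schedule it.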

using docs: http://reactivex.io/RxJava/javadoc/rx/Observable.html#using(rx.functions.Func0,%20rx.functions.Func1,%20rx.functions.Action1)


Source: https://habr.com/ru/post/1530592/

