Creating custom operators in RxJava 2?

I'm having trouble finding an example of how to create a custom operator with RxJava 2. I have looked at several approaches:

  • Using Observable.create and then flatMap-ing onto it from a source observable. I can get this to work, but it doesn't feel quite right. I end up creating a static function to which I pass the source Observable and then flatMap on the source. In OnSubscribe, I then instantiate an object to which I pass the emitter, and which handles and manages the Observable / Emitter (since this is non-trivial and I want everything to be as encapsulated as possible).
  • Creating an ObservableOperator and passing it to Observable.lift. I can't find any examples of this for RxJava 2. I had to debug my own example to make sure my understanding of upstream and downstream was correct. Since I can't find any examples or documentation on this for RxJava 2, I'm a little worried that I might accidentally do something I shouldn't.
  • Creating my own Observable type. This seems to be how the built-in operators work, many of which extend AbstractObservableWithUpstream. However, there is a lot going on there, and it seems easy to miss something or do something I shouldn't. I'm not sure whether I should take this approach or not. I walked through it mentally, and it looks like it could get hairy quickly.

I'm going to go ahead with option 2, but I thought it was worth asking what the supported way of doing this is in RxJava 2, and whether there is any documentation or examples for it.

2 answers

Writing operators is not recommended for beginners, and many desired flow patterns can be achieved with existing operators.

Have you looked at the RxJava wiki page about writing operators for 2.x? I suggest reading it from top to bottom.

  • Using create() is possible, but most people use it to emit the elements of a List with a for-each loop, without realizing that Flowable.fromIterable already does that.
  • We kept this extension point, although the RxJava 2 operators do not use lift() themselves. If you want to avoid some of the boilerplate of option 3, you can try this route.
  • This is how the RxJava 2 operators are implemented. AbstractObservableWithUpstream is a small convenience and is not necessary for external implementors.
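
To make the lift() route a bit more concrete, here is a minimal sketch, assuming RxJava 2.x (the io.reactivex packages) is on the classpath. LiftSketch and LengthOperator are hypothetical names made up for illustration. The operator only forwards signals and does none of the extra bookkeeping (terminal-state guards, crash handling) that the built-in operators do, so treat it as a starting point rather than a reference implementation. It also uses fromIterable instead of create() with a for-each loop, as suggested in the first point.

```java
import io.reactivex.Observable;
import io.reactivex.ObservableOperator;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

import java.util.Arrays;
import java.util.List;

public final class LiftSketch {

    /** Hypothetical operator: emits the length of each upstream String. */
    static final class LengthOperator implements ObservableOperator<Integer, String> {

        // lift() hands us the downstream Observer; we return the Observer
        // that will be subscribed to the upstream source.
        @Override
        public Observer<? super String> apply(final Observer<? super Integer> downstream) {
            return new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    // Pass the upstream Disposable through so that downstream
                    // can still dispose the whole chain.
                    downstream.onSubscribe(d);
                }

                @Override
                public void onNext(String value) {
                    downstream.onNext(Integer.valueOf(value.length()));
                }

                @Override
                public void onError(Throwable e) {
                    downstream.onError(e);
                }

                @Override
                public void onComplete() {
                    downstream.onComplete();
                }
            };
        }
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "bb", "ccc");

        List<Integer> lengths = Observable.fromIterable(words)
                .lift(new LengthOperator())
                .toList()
                .blockingGet();

        System.out.println(lengths); // prints [1, 2, 3]
    }
}
```

Note the order of the type parameters on ObservableOperator: the downstream type comes first and the upstream type second, which is easy to get backwards.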

This may help you. I implemented an RxJava 2 operator to handle API errors. I used the lift operator.

See the example:

    public final class ApiClient implements ApiClientInterface {
        ...

        @NonNull
        @Override
        public Observable<ActivateResponse> activate(String email, EmailData emailLinkData) {
            return myApiService.activate(email, emailLinkData)
                    // Map low-level errors to API-level exceptions.
                    .lift(getApiErrorTransformer())
                    .subscribeOn(Schedulers.io());
        }

        private <T> ApiErrorOperator<T> getApiErrorTransformer() {
            return new ApiErrorOperator<>(gson, networkService);
        }
    }

And here is the custom operator:

    import android.util.Log;
    import com.google.gson.Gson;
    import io.reactivex.ObservableOperator;
    import io.reactivex.Observer;
    import io.reactivex.disposables.Disposable;
    import java.io.IOException;
    import retrofit2.Response;
    // HttpException, @NonNull and the app's own classes (ApiException, ErrorResponse,
    // NetworkException, NetworkService) come from the project's dependencies.

    public final class ApiErrorOperator<T> implements ObservableOperator<T, T> {

        private static final String TAG = "ApiErrorOperator";

        private final Gson gson;
        private final NetworkService networkService;

        public ApiErrorOperator(@NonNull Gson gson, @NonNull NetworkService networkService) {
            this.gson = gson;
            this.networkService = networkService;
        }

        @Override
        public Observer<? super T> apply(Observer<? super T> observer) throws Exception {
            return new Observer<T>() {
                @Override
                public void onSubscribe(Disposable d) {
                    observer.onSubscribe(d);
                }

                @Override
                public void onNext(T value) {
                    observer.onNext(value);
                }

                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "onError", e);
                    if (e instanceof HttpException) {
                        // HTTP error: parse the error body into an ApiException.
                        try {
                            HttpException error = (HttpException) e;
                            Response<?> response = error.response();
                            String errorBody = response.errorBody().string();
                            ErrorResponse errorResponse =
                                    gson.fromJson(errorBody.trim(), ErrorResponse.class);
                            observer.onError(new ApiException(errorResponse, response));
                        } catch (IOException exception) {
                            observer.onError(exception);
                        }
                    } else if (!networkService.isNetworkAvailable()) {
                        // No connectivity: report a dedicated network exception.
                        observer.onError(new NetworkException(ErrorResponse.builder()
                                .setErrorCode("")
                                .setDescription("No Network Connection Error")
                                .build()));
                    } else {
                        // Anything else is passed through unchanged.
                        observer.onError(e);
                    }
                }

                @Override
                public void onComplete() {
                    observer.onComplete();
                }
            };
        }
    }
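
For comparison, and in line with the other answer's point that many patterns can be built from existing operators, the same kind of error mapping can probably be done without a custom operator. The following is only a sketch of that alternative, not the code above rewritten: it assumes RxJava 2.x, and ErrorMappingSketch, WrappedException and mapErrors() are hypothetical names, with IOException standing in for whatever error you actually want to translate. It uses an ObservableTransformer with compose() and the built-in onErrorResumeNext.

```java
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.ObservableTransformer;
import io.reactivex.functions.Function;

import java.io.IOException;

public final class ErrorMappingSketch {

    /** Hypothetical domain exception, standing in for something like ApiException. */
    static final class WrappedException extends RuntimeException {
        WrappedException(Throwable cause) {
            super("wrapped: " + cause.getMessage(), cause);
        }
    }

    /** Maps IOException to WrappedException; other errors pass through unchanged. */
    static <T> ObservableTransformer<T, T> mapErrors() {
        return new ObservableTransformer<T, T>() {
            @Override
            public ObservableSource<T> apply(Observable<T> upstream) {
                // An explicit Function is used instead of a lambda to keep the
                // choice between the onErrorResumeNext overloads unambiguous.
                return upstream.onErrorResumeNext(
                        new Function<Throwable, ObservableSource<? extends T>>() {
                            @Override
                            public ObservableSource<? extends T> apply(Throwable e) {
                                Throwable mapped =
                                        (e instanceof IOException) ? new WrappedException(e) : e;
                                return Observable.error(mapped);
                            }
                        });
            }
        };
    }

    public static void main(String[] args) {
        Observable.<String>error(new IOException("timeout"))
                .compose(ErrorMappingSketch.<String>mapErrors())
                .subscribe(
                        value -> System.out.println("value: " + value),
                        error -> System.out.println(
                                error.getClass().getSimpleName() + ": " + error.getMessage()));
        // prints "WrappedException: wrapped: timeout"
    }
}
```

The trade-off is that the transformer leaves subscription and disposal handling to the built-in operator, while the lift() version gives you full control over every signal.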

Source: https://habr.com/ru/post/1263275/

