How to intercept an observable object and modify it in RxJava before returning to the subscriber?

I'm currently trying to get into the service and return the list of objects before it is returned to the subscriber, I want to make another synchronous call for each object in the list, to make another call to the service to set the missing field. I successfully complete all the calls, but the object returned in the subscriber has this field, which I need to set to null. Here is an example of my code:

Service example:

rx.Observable<List<ExampleObject>> getExampleObject(); rx.Observable<MissingObject> getMissingObjectByFoo(@Path("foo") String foo); 

Example Class:

 public class ExampleObject { String foo; MissingObject bar; public String getFoo() { return this.foo; } public void setFoo(String value) { this.foo = value; } public MissingObject getBar() { return this.bar; } public void setBar(MissingObject value) { this.bar = value; } } 

Implementation Example:

 mService.getExampleObject().flatMap(new Func1<List<ExampleObject>, Observable<?>>() { @Override public Observable<List<ExampleObject>> call(List<ExampleObject> exampleObjects) { for (ExampleObject entry : exampleObjects) { String foo = entry.getFoo(); mService.getMissingObjectByFoo(foo) .subscribeOn(mScheduler.backgroundThread()) .observeOn(mScheduler.mainThread()) .subscribe(new Subscriber<MissingObject>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(MissingObject missingObject) { entry.setBar(missingObject); } }); } return Observable.just(exampleObjects); }; 
+6
source share
2 answers

You are looking for a zip operator, as described here: Zip Operator . I think you want the flatmap to zip all your calls, so something like this:

  mService.getExampleObject().flatMap(new Func1<List<ExampleObject>, Observable<ExampleObject>>() { @Override public Observable<List<ExampleObject>> call(List<ExampleObject> exampleObjects) { List<Observable<ExampleObject>> allTheObservables = new ArrayList<Observable<ExampleObject>>(); for (ExampleObject entry : exampleObjects) { allTheObservables.add(mService.getMissingObjectByFoo(foo).map(new Func1<MissingObject, ExampleObject>() { @Override public ExampleObject call(MissingObject missingObject) { return entry.setBar(missingObject); } })); } return Observable.zip(allTheObservables, new FuncN<ExampleObject>() { @Override public ExampleObject call(ExampleObject... args) { return Arrays.asList(args); } }); } }); 

and in case this does not work or there are problems with the syntax, here is a specific example using the github api:

  service.getContributorsObservable("square", "dagger") .flatMap(new Func1<List<Contributor>, Observable<List<String>>>() { @Override public Observable<List<String>> call(List<Contributor> contributors) { List<Observable<String>> allTheObservables = new ArrayList<>(contributors.size()); for (final Contributor contributor : contributors) { allTheObservables.add(service.getContributorsObservable(contributor.login).map(new Func1<User, String>() { @Override public String call(User user) { return contributor.login + " is " + user.name; } })); } return Observable.zip(allTheObservables, new FuncN<List<String>>() { @Override public List<String> call(Object... args) { return Arrays.asList((String[]) args); } }); } }); 

Keep in mind that this will make n + 1 network calls, 1 for the ExampleObject s list, and then 1 for the ExampleObject in this list. If at all possible, I highly recommend that you speak with the companion API for information on searching the sides of the API. Just know that it will use some bandwidth!

+3
source

Since your intermediate call to update the record is asynchronous, I don't think you can use List<ExampleObject> , but instead use ExampleObject directly from Observable :

 mService.getExampleObject() // Spread the list .flatMap(list -> Observable.from(list)) // Update your object // Here we zip the object with the missing object, // so that when the missing object is obtained, // we update the entry and emit it. .flatMap(entry -> Observable.zip( Observable.just(entry), mDocsService.getMissingObjectByFoo(entry.getFoo()), (entry, missingObject) -> { entry.setBar(missingObject); return entry; }) ) // if you really want a map after all .toList(); 

Side note:

You can skip zip if you are fine if the function in map depends on an external variable (record). I try to avoid something, but here it’s all the same:

  .flatMap(entry -> mDocsService.getMissingObjectByFoo(entry.getFoo()) .map(missingObject -> { entry.setBar(missingObject); return entry; }) ) 
+3
source

Source: https://habr.com/ru/post/1012432/


All Articles