How to emit items from a delayed list in RxJava?

I use Retrofit to get bookmarks from the REST API:

public interface BookmarkService { @GET("/bookmarks") Observable<List<Bookmark>> bookmarks(); } 

Now I would like to release every item from this list with a delay.

In Java, I did something similar to, but onCompleted never starts.

 private Observable<Bookmark> getBookmarks() { return getBookmarkService().bookmarks() .flatMap(new Func1<List<Bookmark>, Observable<Bookmark>>() { @Override public Observable<Bookmark> call(List<Bookmark> bookmarks) { Observable<Bookmark> resultObservable = Observable.never(); for (int i = 0; i < bookmarks.size(); i++) { List<Bookmark> chunk = bookmarks.subList(i, (i + 1)); resultObservable = resultObservable.mergeWith(Observable.from(chunk).delay(1000 * i, TimeUnit.MILLISECONDS)); } return resultObservable; } }) .observeOn(AndroidSchedulers.mainThread()); } 

What am I doing wrong?

Using:

 mSwipeRefreshLayout.setRefreshing(true); getBookmarks() .subscribe(new Observer<Bookmark>() { @Override public void onCompleted() { Timber.i("Completed"); mSwipeRefreshLayout.setRefreshing(false); } @Override public void onError(Throwable e) { Timber.i("Error: %s", e.toString()); mSwipeRefreshLayout.setRefreshing(false); } @Override public void onNext(Bookmark bookmark) { Timber.i("Bookmark: %s", bookmark.toString()); mBookmarksAdapter.addItem(bookmark); } }); 
+6
source share
1 answer

When you use the merge operation, onCompleted will be called if all Observables are complete. but Observable.never() will never end. Use Observable.empty() instead.

According to your code, you want to fix the delayed subscription. Candlestick contains only one element

What you can do: arrange your list to emit all the elements. Buffer it to create a list of items, then use a delay.

 private Observable<Bookmark> getBookmarks() { return getBookmarkService().bookmarks() .flatMap((bookmarks) -> Observable.from(bookmarks) .buffer(1) .scan(new Pair(0, null), (ac, value) -> new Pair(acu.index + 1, value) .flatMap(pair -> Observable.just(pair.value).delay(pair.index, SECONDS)) .observeOn(AndroidSchedulers.mainThread()); } 

it can work (not verified)

+4
source

Source: https://habr.com/ru/post/981626/


All Articles