RxJs: list of requests from the server, consumption of values, repeated request when we are almost out of values

I fetch a list of items from a REST API. The user consumes one item with every click, and when only a couple of items are left unused, I would like to repeat the request to get more. I am trying to do this the proper stream-oriented way with RxJS (5).

So something like:

// Stream of click events on the button element.
var userClick$ = Observable.fromEvent(button.nativeElement, 'click');

// Trigger for (re)loading the list; created with an initial value of 1.
var needToExtend$ = new BehaviorSubject(1);

// Every trigger emission issues a GET request; the second flatMap then
// flattens the response's 'list' property (presumably an array — confirm
// against the API) so that list$ emits the items one by one.
var list$ = needToExtend$
            .flatMap( () => this.http.get("http://myserver/get-list") )
            .flatMap( x => x['list'] );

// Pair each click with the next list item, so one item is used per click.
var itemsUsed$ = userClick$.zip(list$, (click, item) => item);
itemsUsed$.subscribe( item => use(item) );

and then to force a reload if necessary:

// NOTE(review): this does not parse as written — `subscribe` expects callback
// functions, not a bare `if` statement. In addition, `isEmpty()` presumably
// returns an Observable rather than a boolean, so it cannot be tested directly
// in a condition — confirm against the RxJS documentation.
list$.subscribe(
    if (list$.isEmpty()) {
        needToExtend$.next(1);
    }
)

This last bit is wrong, and manually re-triggering the request does not feel very "stream-oriented", even if it could be made to work as intended. Any ideas?

Rxjs - Consume API , , , API, , . . , ?

+4
2

- :

// How close to the end of the list the click count must get before the list
// is pushed back into getList$ (see the filter below).
const LIST_LIMIT = 3;
// Stream of click events on the button element.
userClick$ = Observable.fromEvent(button.nativeElement, 'click');
// HTTP request mapped to the `list` property of its response.
list$ = this.http.get("http://myserver/get-list").map(r => r.list);

// Running number of clicks; the accumulator starts at 0 and grows by 1 per click.
clickCounter$ = this.userClick$.scan((acc: number, val) => acc + 1, 0);

// Both the trigger and the sink of the loop below; seeded with an empty list.
getList$ = new BehaviorSubject([]);

this.getList$
    // Every value pushed into getList$ switches to a new subscription to list$.
    .switchMap(previousList => this.list$)
    // Then follow the click counter, pairing each count with the received list.
    .switchMap(list => this.clickCounter$, (list, clickCount) => { return {list, clickCount}; })
    // Let a pair through once the count is within LIST_LIMIT of the list length.
    .filter(({list, clickCount}) => clickCount >= list.length - LIST_LIMIT)
    .map(({list, clickCount}) => list)
    // Feed the list back into getList$, which re-triggers the chain above.
    .subscribe(this.getList$);

, .

-, switchMap , , . , , , , 3 ( ). , .

: , ( ) , . :

// How close to the end of the list the click count must get before a new
// request is made (see the filter below).
const LIST_LIMIT = 3;
// Stream of click events on the button element.
userClick$ = Observable.fromEvent(button.nativeElement, 'click');
// HTTP request mapped to the `list` property of its response.
list$ = this.http.get("http://myserver/get-list").map(r => r.list);

// Running number of clicks; startWith(0) emits 0 before the first click.
clickCounter$: Observable<number> = this.userClick$.scan((acc: number, val) => acc + 1, 0).startWith(0);

// Holds the most recently loaded list; seeded with an empty list.
getList$ = new BehaviorSubject([]);

refresh$ = this.getList$
        // For each list, wait for the first click count that comes within
        // LIST_LIMIT of the list length, then pass the list on. With the
        // initial empty list the starting count of 0 already qualifies.
        .switchMap(list => this.clickCounter$
                               .filter(clickCount => list.length <= clickCount + LIST_LIMIT)
                               .first(), 
            (list, clickCount) => list)
        // Request a fresh list.
        .switchMap(previousList => this.list$)
        // Use getList$ as the multicast subject, so results feed back into it.
        .multicast(() => this.getList$);

// connect() starts the underlying subscription; the second line logs each value.
this.refresh$.connect();
this.refresh$.subscribe(e => console.log(e));

, "". , , . , .

+2

, , , - . , .

console.clear();
// Number of items returned by one page request.
const pageSize = 5;
// How many items before the end of the current page the next page is requested.
const pageBuffer = 2;
// Mock data set: the integers 0 through 16.
const data = [...Array(17).keys()]

/**
 * Simulates a paged request against the mock `data` array.
 *
 * Emits a single array holding the items of the given zero-based page;
 * a page past the end of `data` yields an empty array.
 */
function getData(page) {
  const offset = page * pageSize;
  return Rx.Observable.of(data.slice(offset, offset + pageSize));
}

// Simulated user clicks: one emission every 400 ms.
const clicks = Rx.Observable.interval(400);

clicks
  // Running click count (1 for the first click).
  .scan(count => ++count, 0)
  .do(() => console.log('click'))
  // Derive the index of the next page to request and the item count up to
  // the end of the current page.
  .map(count => {
    const page = Math.floor(count / pageSize) + 1;
    const total = page * pageSize;
    return { total, page, count }
  })
  // Pass only the click that leaves exactly `pageBuffer` items before the
  // end of the current page.
  .filter(x => x.total - pageBuffer === x.count)
  // Request page 0 immediately, without waiting for a click.
  .startWith({ page: 0 })
  // Request the page.
  .switchMap(x => getData(x.page))
  // Complete as soon as a request returns an empty page.
  .takeWhile(x => x.length > 0)
  .subscribe(
    x => { console.log('next: ', x); },
    x => { console.log('error: ', x); },
    () => { console.log('completed'); }
  );
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.3/Rx.min.js"></script>
Hide result

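How it works:

  • Rx.Observable.interval(#): simulates the user's click events.
  • .scan(...): accumulates the number of clicks so far.
  • .map(...): calculates the index of the next page and the item count up to the end of the current page (the real count may be lower on the last page, which does not matter here).
  • .filter(...): lets a value through, and so triggers a new page request, only when the click count has just reached the page buffer.
  • .startWith(...): fetches the first page without waiting for a click. The +1 in the page calculation accounts for this.
  • .switchMap(...): fetches the next page of data.
  • .takeWhile(...): keeps the stream open until an empty list is returned.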

, , . , ( ), .

, , , - , . , .

+2

Source: https://habr.com/ru/post/1690279/


All Articles