RxJS: Backpressure with switchMap producing N values

I am talking to a RESTful search endpoint that is paginated. Queries are triggered by the user typing in a search field, and each query produces an Observable with N values corresponding to the N result pages.

The code looks something like this:

function runQueries(queryObservable) {
  return queryObservable
           .debounceTime(500)
           .distinctUntilChanged()
           .switchMap(search);
}

function search(query) {
  return Observable.create(observer => searchInto(observer, query, 0));
}

function searchInto(observer, query, start) {
  runQuery(query, start).subscribe(result => {
    observer.next(result);
    if (hasMorePages(result)) {
      searchInto(observer, query, start + 1);
    } else {
      observer.complete();
    }
  });
}

Now search queries may take some time, and I don’t want to retrieve all the pages if the user changes the query.

Let's say the search returns 3 pages, and the user changes the query after loading one page. I want to see something like:

USER: types query A
CODE: loads page A1
USER: types query B
CODE: loads page B1
CODE: loads page B2
CODE: loads page B3

switchMap gets half the job done. The resulting observable has the correct sequence: A1, B1, B2, B3. Great.

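However, under the hood the recursion still runs all the queries, i.e. switchMap does not "interrupt" the inner observable, it merely ignores its values. So the requests actually look like this: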

USER: types query A
CODE: loads page A1   -> returned by search observable
USER: types query B
CODE: loads page A2   -> discarded by search observable
CODE: loads page B1   -> returned by search observable
CODE: loads page B2   -> returned by search observable
CODE: loads page A3   -> discarded by search observable
CODE: loads page B3   -> returned by search observable

"A" "B" ( ), .

How can I solve this? Is there a way to stop the remaining requests for the old query?


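switchMap does unsubscribe from the previous inner Observable, but your code does not handle that unsubscription. You do not return a Subscription (or teardown function) from Observable.create, so the requests that are still pending cannot be cancelled.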
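To make the teardown idea concrete, here is a minimal sketch that runs without RxJS. `createObservable` is a hypothetical stand-in for `Observable.create` that models only `next`, `complete` and the teardown function. The fake `runQuery` (three pages per query, answered only when `flush()` is called) is likewise an assumption made for the demo. With real RxJS, the relevant part would be the `return () => inner.unsubscribe()` line inside `search`, and it only helps if `runQuery` itself cancels its request on unsubscribe.

```javascript
// Hypothetical stand-in for Observable.create so the sketch runs without RxJS.
function createObservable(onSubscribe) {
  return {
    subscribe(next, complete) {
      let closed = false;
      let teardown;
      const close = () => {
        if (closed) return;
        closed = true;
        if (teardown) teardown();
      };
      const observer = {
        next: value => { if (!closed) next(value); },
        complete: () => { if (!closed) { if (complete) complete(); close(); } },
      };
      teardown = onSubscribe(observer);
      if (closed && teardown) teardown(); // completed synchronously during subscribe
      return { unsubscribe: close };
    },
  };
}

const requested = []; // page requests that were actually started
const pending = [];   // responses waiting to be delivered

// Fake backend (assumption): three pages per query, answered on flush().
function runQuery(query, start) {
  return createObservable(observer => {
    requested.push(query + (start + 1));
    const deliver = () => {
      observer.next({ query, page: start + 1, more: start < 2 });
      observer.complete();
    };
    pending.push(deliver);
    // Teardown: drop the response if the caller unsubscribes first.
    return () => {
      const i = pending.indexOf(deliver);
      if (i !== -1) pending.splice(i, 1);
    };
  });
}

function flush() {
  const deliver = pending.shift();
  if (deliver) deliver();
}

const hasMorePages = result => result.more;

function search(query) {
  return createObservable(observer => {
    let inner; // subscription for the page request currently in flight
    function loadPage(start) {
      inner = runQuery(query, start).subscribe(result => {
        observer.next(result);
        if (hasMorePages(result)) {
          loadPage(start + 1);
        } else {
          observer.complete();
        }
      });
    }
    loadPage(0);
    // Teardown: this is what runs when switchMap unsubscribes.
    return () => inner.unsubscribe();
  });
}

const seen = [];
const subA = search('A').subscribe(r => seen.push(r.query + r.page));
flush();            // A1 arrives, A2 is requested
subA.unsubscribe(); // what switchMap does when query B arrives: A2 is cancelled
search('B').subscribe(r => seen.push(r.query + r.page));
flush(); flush(); flush();

console.log(requested.join(' ')); // A1 A2 B1 B2 B3
console.log(seen.join(' '));      // A1 B1 B2 B3
```

In this run A2 is still started, because it was already in flight when the query changed, but it is cancelled and A3 is never requested.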

Instead of using Observable.create, since you are already working with Observables, you can use the expand operator:

function runQueries(queryObservable) {
  return queryObservable
           .debounceTime(500)
           .distinctUntilChanged()
           .switchMap(search);
}

function search(query) {
  //Kicks off the first query
  return runQuery(query, 0)
      //Uses the results of the first query to see if more queries should be made
     .expand((result, idx) => 
       //Continues to execute more queries until `hasMorePages` is false
       hasMorePages(result) ? 
         runQuery(query, idx + 1) : 
         Observable.empty());
}
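To see why the recursion above stops after the last page, it can help to model what expand does with each value. The sketch below is a deliberately simplified, synchronous model, not the RxJS implementation: arrays stand in for Observables, `expandModel` is a hypothetical helper, and the fake `runQuery` with three pages is an assumption made for the demo.

```javascript
// Simplified model of expand: every value is emitted and also passed to
// `project`, whose results are emitted and fed back in the same way.
function expandModel(source, project) {
  const output = [];
  const queue = [...source];
  let index = 0; // counts the values projected so far, starting at 0
  while (queue.length > 0) {
    const value = queue.shift();
    output.push(value);
    queue.push(...project(value, index++));
  }
  return output;
}

// Fake paginated backend (assumption): pages 0, 1 and 2, then no more.
const runQuery = (query, start) => [{ query, page: start, more: start < 2 }];
const hasMorePages = result => result.more;

const pages = expandModel(runQuery('A', 0), (result, idx) =>
  // An empty array plays the role of Observable.empty() and ends the recursion.
  hasMorePages(result) ? runQuery('A', idx + 1) : []);

console.log(pages.map(p => p.page).join(', ')); // 0, 1, 2
```

Because the real operator subscribes to each inner Observable itself, unsubscribing from the result (which is what switchMap does on a new query) also unsubscribes from the request in flight, provided runQuery cancels its request on unsubscribe.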

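switchMap unsubscribes from the previous inner Observable, but that does not "interrupt" the recursion on its own. You can check observer.isUnsubscribed before requesting the next page, like this: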

function searchInto(observer, query, start) {
  runQuery(query, start).subscribe(result => {
    observer.next(result);
    if (hasMorePages(result) && !observer.isUnsubscribed) {
      searchInto(observer, query, start + 1);
    } else {
      observer.complete();
    }
  });
}

Source: https://habr.com/ru/post/1658573/

