Rx - Split Observable based on content (group by until changed)

Let me first give some context for the problem. The goal is to use Rx to handle a simple search screen with paging.

On the input side, the user can add various filtering criteria, as well as change the page currently being displayed (in the case of large result sets). For simplicity, this can be modeled using a stream of pairs {filter, page}.

On the output side, the result data should be displayed in the table, and the number of pages should be updated. This would mean a stream of pairs {data, count}.

The logic connecting the two is as follows: whenever the filter settings change, send a "count" request to the server to get the page count. Whenever the filter settings or the page index change, send a "select" request to the server to get the result data. Wait for both responses to be available before updating the screen.
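The rule above can be written down as a small pure function, independent of Rx. This is only a sketch: `requestsFor` and the exact `{filter, page}` object shape are illustrative assumptions, not part of the original code.

```javascript
// Hypothetical helper: decide which server requests an input change requires.
// `prev` is the previous {filter, page} pair, or null for the very first input.
function requestsFor(prev, next) {
  var filterChanged = prev === null || prev.filter !== next.filter;
  var pageChanged = prev === null || prev.page !== next.page;
  var requests = [];
  if (filterChanged) {
    requests.push('count');   // the page count depends only on the filter
  }
  if (filterChanged || pageChanged) {
    requests.push('select');  // the result data depends on filter and page
  }
  return requests;
}
```

A page change alone yields only a "select" request, while a filter change yields both.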

Here's a Plunker that mimics this in a minimal way. s1 / s2 / s3 indicate the filter parameters, p1 / p2 / p3 are the page indexes, #s1 / #s2 / #s3 are the "count" responses for the filter, and something like "s2 p3" is the "select" response for this filter and page combination. You can see various failed approaches commented out.

http://plnkr.co/xurmUO

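It seems the solution should be something like groupBy or window, but neither gives the desired result. With groupBy, the inputs can be grouped by filter, sending a "count" request once per group and a "select" request for each item in the group.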

I also tried groupByUntil, ending each group as soon as an input with a different filter arrives:

inputs
  .groupByUntil(
    function(input) { return input.filter; },
    null,
    function(inputGroup) { 
      var filter = inputGroup.key;
      return inputs.firstOrDefault(function (input) {
        return input.filter !== filter;
      });
    }
  );

This did not work as intended either, in particular for the "count" request.

Is there a way to get this behavior with groupBy or another built-in operator? Or do I have to use Observable.create?

EDIT: For now, I have written my own operator using Observable.create.

Rx.Observable.prototype.splitBy = function(keySelector) {
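  var source = this;
  return Rx.Observable.create(function(observer) {
    // Keep the state per subscription so that separate subscribers
    // do not share (and corrupt) each other's current group
    var currentGroup = null;
    var lastKey = null;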
    return source.subscribe(
      function(value) {
        var key = keySelector(value);
        // Create a new group if the key changed (or this is the first value)
        if (currentGroup === null || key !== lastKey) {
          // Close previous group
          if (currentGroup !== null) {
            currentGroup.onCompleted();
          }
          lastKey = key;
          // Emit current group
          currentGroup = new Rx.Subject();
          currentGroup.key = key;
          observer.onNext(currentGroup);
        }
        currentGroup.onNext(value);
      },
      // Forward errors/completion to current group and the main stream
      function(error) {
        if (currentGroup !== null) {
          currentGroup.onError(error);
        }
        observer.onError(error);
      },
      function() {
        if (currentGroup !== null) {
          currentGroup.onCompleted();
        }
        observer.onCompleted();
      });
  });
};
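To make the intended grouping concrete, here is a plain-array analogue of the operator above. It is only a sketch of the semantics: `splitRuns` is a hypothetical name, and it works on an array rather than on an Observable.

```javascript
// Plain-array analogue of splitBy: start a new group whenever the key changes.
function splitRuns(values, keySelector) {
  var groups = [];
  var current = null;
  values.forEach(function (value) {
    var key = keySelector(value);
    if (current === null || current.key !== key) {
      current = { key: key, items: [] };
      groups.push(current);
    }
    current.items.push(value);
  });
  return groups;
}

var sampleInputs = [
  { filter: 's1', page: 'p1' },
  { filter: 's1', page: 'p2' },
  { filter: 's2', page: 'p1' },
  { filter: 's1', page: 'p1' }
];
var runs = splitRuns(sampleInputs, function (input) { return input.filter; });
// runs has three groups with keys s1, s2, s1. Unlike groupBy, the second
// s1 run is a new group rather than a continuation of the first one.
```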

groupBy does not look like the right tool here. I think you want flatMapLatest:

var count = inputs.flatMapLatest(function (input) {
    // some code that returns an
    // Rx observable that will eventually
    // return the count
    return rxQueryForCount(input.filter);
});
count.subscribe(...);

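Whenever a new input arrives, flatMapLatest unsubscribes from the previous query, so only the results of the most recent one are delivered.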
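The "latest wins" behavior of flatMapLatest can be imitated without Rx to see what it does. In the sketch below, `latestOnly` is a hypothetical helper, and server responses are simulated by calling the returned callbacks by hand.

```javascript
// Hypothetical helper imitating flatMapLatest: only the callback created by
// the most recent call to `next` may deliver a result; older ones are ignored.
function latestOnly(onResult) {
  var latestId = 0;
  return function next() {
    var id = ++latestId;
    return function deliver(result) {
      if (id === latestId) {
        onResult(result);
      }
    };
  };
}

var shown = [];
var next = latestOnly(function (count) { shown.push(count); });

var replyS1 = next();   // "count" request for filter s1 is sent
var replyS2 = next();   // the filter changes to s2 before s1 has answered
replyS1('#s1');         // stale response: dropped
replyS2('#s2');         // response to the latest request: delivered
// shown is now ['#s2']
```

The real operator goes one step further and disposes the earlier subscription, which lets the underlying request be cancelled if the query observable supports it.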

Update: to get the count and the data together, you could do something like this:

var count = inputs
    .distinctUntilChanged(null, function (a, b) {
        // however you decide if your filters have changed...
        return a.filter === b.filter;
    })
    .flatMapLatest(function (input) {
        // some code that returns an
        // Rx observable that will eventually
        // return the count
        return rxQueryForCount(input.filter)
            .startWith(undefined);
    });
var data = inputs
    .flatMapLatest(function (input) {
        return rxQueryForData(input.filter, input.index)
            .startWith(undefined);
    });
var results = Rx.Observable
    .combineLatest(count, data, function (c, d) {
        if (c === undefined || d === undefined) {
            return undefined;
        }

        return { count: c, data: d };
    });

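The startWith(undefined) makes each query emit undefined as soon as it is restarted, so the combined result is undefined while either request is still pending. You can add a where clause to filter out the undefined results.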
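The effect of the undefined placeholder can be traced with a hand-fed imitation of combineLatest. This is a sketch under assumptions: `combineTwo` is a hypothetical stand-in for the Rx operator, and the pushed values follow the s1 / p1 / #s1 notation from the question.

```javascript
// Minimal imitation of combineLatest for two named sources, fed by hand.
// It emits once both sources have produced at least one value.
function combineTwo(resultSelector, onResult) {
  var latest = {};
  var seen = { count: false, data: false };
  return function push(name, value) {
    latest[name] = value;
    seen[name] = true;
    if (seen.count && seen.data) {
      onResult(resultSelector(latest.count, latest.data));
    }
  };
}

var emitted = [];
var push = combineTwo(function (c, d) {
  if (c === undefined || d === undefined) {
    return undefined;
  }
  return { count: c, data: d };
}, function (result) { emitted.push(result); });

// Filter set to s1: both queries start and emit their placeholder.
push('count', undefined);
push('data', undefined);
push('data', 's1 p1');     // count still pending, combined result is undefined
push('count', '#s1');      // both available, a real result is emitted
// Page changes to p2: only the data query restarts.
push('data', undefined);   // hides the stale "s1 p1" data
push('data', 's1 p2');

// Equivalent of adding a where clause that drops the undefined results.
var visible = emitted.filter(function (result) { return result !== undefined; });
```

Without the placeholder, the page change would briefly pair the old data with the current count until the new "select" response arrived.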


Source: https://habr.com/ru/post/1568917/

