Let me first give some context for the problem. The goal is to use Rx to handle a simple paged search screen.
On the input side, the user can add various filtering criteria, as well as change the page currently being displayed (in the case of large result sets). For simplicity, this can be modeled using a stream of pairs {filter, page}.
On the output side, the result data should be displayed in the table, and the number of pages should be updated. This would mean a stream of pairs {data, count}.
The logic connecting the two is as follows: whenever the filter settings change, send a “count” request to the server to get the page count. Whenever the filter settings or the page index change, send a “select” request to the server to get the result data. Wait for both responses to be available before updating the screen.
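To make the rule above concrete, here is a minimal sketch in plain JavaScript, without Rx. It only decides which requests each input would trigger. The helper name `planRequests` and the array standing in for the input stream are assumptions made for illustration; they are not part of the Plunker.

```javascript
// Hypothetical helper: decides which requests each input event should trigger.
// `inputs` is an array of {filter, page} objects standing in for the input stream.
function planRequests(inputs) {
  var plan = [];
  var last = null; // previous input, or null before the first event
  inputs.forEach(function (input) {
    var filterChanged = last === null || input.filter !== last.filter;
    var pageChanged = last === null || input.page !== last.page;
    var requests = [];
    if (filterChanged) {
      // A new filter invalidates the page count.
      requests.push("count");
    }
    if (filterChanged || pageChanged) {
      // Either change invalidates the displayed rows.
      requests.push("select");
    }
    plan.push({ input: input, requests: requests });
    last = input;
  });
  return plan;
}

var plan = planRequests([
  { filter: "s1", page: "p1" },
  { filter: "s1", page: "p2" },
  { filter: "s2", page: "p1" }
]);
plan.forEach(function (step) {
  console.log(step.input.filter, step.input.page, "->", step.requests.join(", "));
});
// s1 p1 -> count, select
// s1 p2 -> select
// s2 p1 -> count, select
```

The sketch leaves out the part that makes the Rx version hard: a “count” response has to be reused for every later “select” on the same filter, and dropped as soon as the filter changes.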
Here's a Plunker that mimics this in a minimal way. s1 / s2 / s3 are the filter parameters, p1 / p2 / p3 are the page indexes, # s1 / # s2 / # s3 are the “count” responses for each filter, and something like “s2 p3” is the “select” response for that filter and page combination. You can see various failed approaches commented out.
http://plnkr.co/xurmUO
, - groupBy window, . groupBy , . "count" , , "select" .
, , , , , . , "" . groupByUntil , . (- ), . :
inputs
  .groupByUntil(
    function(input) { return input.filter; },
    null,
    function(inputGroup) {
      var filter = inputGroup.key;
      return inputs.firstOrDefault(function (input) {
        return input.filter !== filter;
      });
    }
  );
, , , , . , "count" . , : .
, groupBy, , , . , ? Observable.create ?
EDIT: For the time being, I implemented this myself using Observable.create:
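// Splits the source into consecutive groups: a new group starts whenever
// the key changes, and the previous group is completed at that moment.
Rx.Observable.prototype.splitBy = function(keySelector) {
  var source = this;
  return Rx.Observable.create(function(observer) {
    // Keep the state per subscription, so that several subscribers
    // do not share (and corrupt) each other's groups.
    var currentGroup = null;
    var lastKey = null;
    return source.subscribe(
      function(value) {
        var key = keySelector(value);
        if (currentGroup === null || key !== lastKey) {
          // Key changed: close the previous group and open a new one.
          if (currentGroup !== null) {
            currentGroup.onCompleted();
          }
          lastKey = key;
          currentGroup = new Rx.Subject();
          currentGroup.key = key;
          observer.onNext(currentGroup);
        }
        currentGroup.onNext(value);
      },
      function(error) {
        // Propagate the error to the open group first, then to the outer stream.
        if (currentGroup !== null) {
          currentGroup.onError(error);
        }
        observer.onError(error);
      },
      function() {
        // Complete the open group before completing the outer stream.
        if (currentGroup !== null) {
          currentGroup.onCompleted();
        }
        observer.onCompleted();
      });
  });
};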
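
To show the grouping that splitBy is meant to produce, here is a rough array-based analogue in plain JavaScript. It is only an illustration: `splitArrayBy` is a hypothetical name, it works on a finished array rather than on a live stream, and it does not use Rx at all.

```javascript
// Array-based analogue of splitBy (hypothetical helper, not an Rx operator).
// Starts a new group every time the key differs from the previous item's key.
function splitArrayBy(values, keySelector) {
  var groups = [];
  var current = null; // group currently being filled
  values.forEach(function (value) {
    var key = keySelector(value);
    if (current === null || key !== current.key) {
      current = { key: key, items: [] };
      groups.push(current);
    }
    current.items.push(value);
  });
  return groups;
}

var groups = splitArrayBy(
  [
    { filter: "s1", page: "p1" },
    { filter: "s1", page: "p2" },
    { filter: "s2", page: "p1" },
    { filter: "s1", page: "p1" }
  ],
  function (input) { return input.filter; }
);
groups.forEach(function (group) {
  console.log(group.key, group.items.length);
});
// s1 2
// s2 1
// s1 1
```

Note that s1 shows up twice: returning to an earlier filter opens a fresh group, whereas a plain groupBy would route that last item into the first s1 group.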