ReactiveX: group and buffer are only the last element in each group

How to group the observable, and from each GroupedObservable to save in memory only the last emitted element? So that each group behaves the same as BehaviorSubject.

Something like that:

{user: 1, msg: "Anyone here?"}
{user: 2, msg: "Hi"}
{user: 2, msg: "How are you?"}
{user: 1, msg: "Hello"}
{user: 1, msg: "Good"}

Thus, in memory we will have only the last element for each user:

{user: 2, msg: "How are you?"}
{user: 1, msg: "Good"}

And when the subscriber subscribes, these two elements were released immediately (each in its own issue). Just like we had a BehaviorSubject for everyone user.

onCompleted () should never fire, as people can chat forever.

I do not know in advance what values usermay be.

+4
2

, . , groupObservables, #groupBy, .

( , ), ReplaySubject (1).

, ,

. jsbin

var groups = chatlog
      .groupBy(message => message.user)
      .map(groupObservable => {
        var subject = new Rx.ReplaySubject(1);
        groupObservable.subscribe(value => subject.onNext(value));
        return subject;
      });
+3

, , scan shareReplay , . :

var fn_scan = function ( aMessages, message ) {
  // aMessages is the latest array of messages
  // this function will update aMessages to reflect the arrival of the new message
  var aUsers = aMessages.map(function ( x ) {return x.user;});
  var index = aUsers.indexOf(message.user);
  if (index > -1) {
    // remove previous message from that user...
    aMessages.splice(index, 1);
  }
  // ...and push the latest message
  aMessages.push(message);
  return aMessages;
};
var groupedLatestMessages$ = messages$
    .scan(fn_scan, [])
    .shareReplay(1);

, - , , , , , .

, , . , , , . , .

UPDATE: jsbin http://jsfiddle.net/zs7ydw6b/2

+1

Source: https://habr.com/ru/post/1618969/


All Articles