How can I get only the last value from an infinite RxJS stream that is not the initial value?

Concept

This is in an Angular 2 powered project.

When consuming an observable stream from the redux store, I try to filter first and then take / takeLast / last the latest value. After that, I want to resolve the promise when the stream completes, but it does not work when using the takeLast operator.

So the question is: what operator setup can I use to get the latest value from the stream?

Setup

I have simplified my Angular 2 setup down to this gist of the RxJS usage.

  • The source observable is managed by the redux library and never completes.
  • A service provides some logic for retrieving the latest value from the stream.
  • A component consumes the value promise-style.

Here is a working example: https://fiddle.jshell.net/markus_falk/an41z6g9/

Redux store mock:

var latestTime$ = new Rx.Subject();
setInterval(function(){
     latestTime$.onNext(Date.now()); 
}, 2000);

Injectable service mockup:

var timeStore = null;
var getLatestTime = function() {

  return new Promise((resolve, reject) => {

     latestTime$

     /* 
        filter out 'null' for when the button is clicked
        before the store updates the first time
      */
     .filter(function(x) {
        console.log('filter: ', x);
        // keep only numeric timestamps
        return typeof x === 'number';
     })

     // try to end the stream by taking the last from the stream ?!?!?!?
     .takeLast(1)

     // handle promise
     .subscribe(

       function (x) {
         console.log('Next: ' + x);
         // store latest stream value
         timeStore = x;
       },
       function (err) {
         console.log('Error: ' + err);
         reject(err)
       },
       function () {
         console.log('Completed');
         // pass on the latest value of the endless stream when it completes
         resolve(timeStore);
       }

    );

  });

};

And the consuming component mockup:

document.querySelector("#foo").addEventListener("click", function(event) {

  var time = getLatestTime();

  time.then((latestTime) => {
    console.log('latestTime: ', latestTime);
  });

  time.catch((err) => {
    console.log('oh oh: ', err);
  });

}, false);
1 answer

This should mimic your situation.

See the live demo: https://jsfiddle.net/usualcarrot/zh07hfrc/1/

var subject = new Rx.Subject();

subject.skip(1).last().subscribe(function(val) {
  console.log('next:', val);
}, function(val) {
  console.log('error:', val);
}, function() {
  console.log('completed');
});

subject.onNext(1);
subject.onNext(2);
subject.onNext(3);
subject.onNext(4);
subject.onNext(5);
subject.onCompleted();

This prints to the console:

next: 5
completed

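Instead of console.log('completed'); you would call resolve(...). Note that last() only emits once the source completes, so you need access to the underlying Subject (?) in order to complete it. If the store only exposes it through asObservable(), the Subject is hidden and you cannot complete it yourself. See asObservable().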
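
To illustrate the point about completion, here is a minimal sketch in plain JavaScript that does not use RxJS at all. TinySubject and lastAfterSkip are hypothetical stand-ins written for this example (they only mimic the onNext / onCompleted naming and the skip(1).last() idea) and are not part of any library. It shows that a promise built this way settles only after the source completes:

```javascript
// Hypothetical stand-in for a subject (RxJS 4 style method names), for illustration only.
class TinySubject {
  constructor() {
    this.observers = [];
  }
  subscribe(onNext, onError, onCompleted) {
    this.observers.push({ onNext, onError, onCompleted });
  }
  onNext(value) {
    this.observers.forEach((o) => o.onNext(value));
  }
  onCompleted() {
    this.observers.forEach((o) => o.onCompleted());
    this.observers = [];
  }
}

// Hypothetical helper: remembers the most recent value after skipping
// `skipCount` values, and resolves with it only when the source completes.
function lastAfterSkip(source, skipCount) {
  return new Promise((resolve, reject) => {
    let seen = 0;
    let hasValue = false;
    let latest;
    source.subscribe(
      (x) => {
        seen += 1;
        if (seen > skipCount) {
          latest = x;
          hasValue = true;
        }
      },
      (err) => reject(err),
      () => {
        // Completion is the only place where the promise settles.
        if (hasValue) {
          resolve(latest);
        } else {
          reject(new Error('no value arrived after the skipped ones'));
        }
      }
    );
  });
}

const subject = new TinySubject();
const latestPromise = lastAfterSkip(subject, 1);
latestPromise.then((value) => console.log('latestTime:', value));

subject.onNext(1);
subject.onNext(2);
subject.onNext(3);
subject.onCompleted(); // without this call the promise never settles
```

Run as-is, this prints latestTime: 3. If the onCompleted() call is removed, nothing is printed, which matches what happens with a redux-managed stream that never completes.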

Source: https://habr.com/ru/post/1656180/