RxJS: Combining Historical Data with the Stream of Updates

Scenario:

I load the initial data array through a simple ajax call and passing that data through the Observable, which I will call historical . In parallel, I connect to websocket and periodically receive data, which we will call updates , and I want to add this data to the historical one .

Specifically, for example, that the ajax call sends back an array [0,1,2], and the socket emits (eventually) 3, 4, 5then I want to accumulate these values as follows:

[0,1,2]        // historical
[0,1,2,3]      // historical + updates1
[0,1,2,3,4]    // historical + updates1 + updates2
[0,1,2,3,4,5]  // etc

(Please note that we need a boundary case of concurrency: it is possible that the historical yields [0,1,2,3]and the first two of the updates 3 and 4, in this case, all I want is still [0,1,2,3,4]- DO NOT [0,1,2,3,3,4] .)

The ultimate goal is to get one Observable thread , which is a combination of Historical Observables and updates as described.

What I have tried so far:

Accumulating only websocket data is simple enough. I am creating updates , which is an observable sequence issued by websocket. Every time a value is observed, I can assemble it into an array using scan():

updates.scan((acc, update) => acc.concat([update]), [])

This will give something like

[3]
[3,4]
[3,4,5]

, . , , , history. , withLatestFrom():

const stream = historical
  .withLatestFrom(
    updates.scan((acc, update) => acc.concat([update]), []),
    (history, buffer) => history.concat(buffer) /* could eliminate duplicates here */
  )

[0,1,2,3,4,5], , history. , .

, . , - :

[0,1,2,3,4,5]
[0,1,2,3,4,5,6]
[0,1,2,3,4,5,6,7]

scan , , scan () , .

- , , ?

+4
1

, skipUntil(), , . withLatestFrom() updates Observable . skipUntil() , , updates.

let updates = Observable
  .timer(0, 1000)
  .scan((acc, update) => {
    acc.push(update);
    return acc;
  }, []);

let historical = Observable.defer(() => { 
    console.log('Sending AJAX request ...');
    return Observable.of(['h1', 'h2', 'h3']); 
  })
  .delay(3000)
  .share();


const stream = updates.skipUntil(historical)
  .withLatestFrom(historical, (buffer, history) => {
    return history.concat(buffer);
  })
  .map(val => val) // remove duplicates;


stream.subscribe(val => console.log(val));

:

Sending AJAX request ...
["h1", "h2", "h3", 0, 1, 2, 3]
["h1", "h2", "h3", 0, 1, 2, 3, 4]
["h1", "h2", "h3", 0, 1, 2, 3, 4, 5]

-: https://jsbin.com/kumolez/11/edit?js,console

, , concat(), , buffer .

, ( ), distinct() .

, , RxJS 5.

+3

Source: https://habr.com/ru/post/1669742/


All Articles