Scenario:
I load the initial data array through a simple ajax call and passing that data through the Observable, which I will call historical . In parallel, I connect to websocket and periodically receive data, which we will call updates , and I want to add this data to the historical one .
Specifically, for example, that the ajax call sends back an array [0,1,2], and the socket emits (eventually) 3, 4, 5then I want to accumulate these values as follows:
[0,1,2]
[0,1,2,3]
[0,1,2,3,4]
[0,1,2,3,4,5]
(Please note that we need a boundary case of concurrency: it is possible that the historical yields [0,1,2,3]and the first two of the updates 3 and 4, in this case, all I want is still [0,1,2,3,4]- DO NOT [0,1,2,3,3,4] .)
The ultimate goal is to get one Observable thread , which is a combination of Historical Observables and updates as described.
What I have tried so far:
Accumulating only websocket data is simple enough. I am creating updates , which is an observable sequence issued by websocket. Every time a value is observed, I can assemble it into an array using scan():
updates.scan((acc, update) => acc.concat([update]), [])
This will give something like
[3]
[3,4]
[3,4,5]
, . , , , history. , withLatestFrom():
const stream = historical
.withLatestFrom(
updates.scan((acc, update) => acc.concat([update]), []),
(history, buffer) => history.concat(buffer)
)
[0,1,2,3,4,5], , history. , .
, . , - :
[0,1,2,3,4,5]
[0,1,2,3,4,5,6]
[0,1,2,3,4,5,6,7]
scan , , scan () , .
- , , ?