Consider using the zip operator to zip together two infinite Observables, one of which emits items twice as frequently as the other. The current implementation is lossless, i.e. if I keep these Observables emitting for an hour and then swap their emission rates, the first Observable will eventually catch up with the other.
This will cause a memory explosion at some point, because the buffer keeps growing.
The same thing happens if the first Observable emits items for several hours and the second emits a single item at the end.
How can I achieve lossy behavior for this operator? I just want to emit whenever I have received an emission from both streams, and I don't care how many emissions from the faster stream I miss.
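To make the memory concern concrete, here is a minimal sketch in plain JavaScript. It is only a model of lossless pairing, not RxJS's actual implementation, and `createLosslessZipModel`, `pushLeft`, `pushRight` and `bufferedCount` are hypothetical names introduced for illustration. It assumes each side simply queues values until the other side has one to pair with.

```javascript
// Hypothetical model of a lossless zip (an assumption, not RxJS internals):
// each side queues values until the other side has a value to pair with.
function createLosslessZipModel(onPair) {
  const left = [];
  const right = [];

  // Emit pairs for as long as both queues hold at least one value.
  function drain() {
    while (left.length > 0 && right.length > 0) {
      onPair([left.shift(), right.shift()]);
    }
  }

  return {
    pushLeft(value) { left.push(value); drain(); },
    pushRight(value) { right.push(value); drain(); },
    // Number of unmatched values currently held in memory.
    bufferedCount() { return left.length + right.length; },
  };
}

// The left stream emits twice as often as the right one.
const zipPairs = [];
const zipModel = createLosslessZipModel((pair) => zipPairs.push(pair));
for (let i = 1; i <= 1000; i++) {
  zipModel.pushLeft(i);
  if (i % 2 === 0) zipModel.pushRight(i * 10);
}

console.log(zipPairs.length);          // 500 pairs emitted
console.log(zipModel.bufferedCount()); // 500 unmatched values still buffered
```

In this model the backlog grows by one value for every two emissions of the faster stream, so with infinite streams it never stops growing.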
Clarifications:
- The main problem I'm trying to solve is the memory explosion caused by the lossless nature of the zip operator.
- I want to emit whenever I have received emissions from both streams, even if both streams emit the same value every time.
Example:
Stream1: 1 2 3 4 5 6 7
Stream2: 10 20 30 40 50 60 70
A regular zip will produce the following output:
[1, 10]
[2, 20]
[3, 30]
[4, 40]
[5, 50]
[6, 60]
[7, 70]
const Observable = Rx.Observable;
const Subject = Rx.Subject;
const s1 = new Subject();
const s2 = new Subject();
Observable.zip(s1,s2).subscribe(console.log);
s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);
s2.next(40); s2.next(50); s2.next(60); s2.next(70);
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>
The result I would like to produce:
[1, 10]
[3, 20]
[5, 30]
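Explanation:
The lossy zip operator is a zip with a buffer size of 1. That means it keeps only the first item from the stream that emitted first and loses all the rest (the items that arrive between that first item and the first emission from the second stream). So in the example the following happens: stream1 emits 1, and lossy zip "remembers" it and ignores all further items on stream1 until stream2 emits. The first emission of stream2 is 10, so stream1 loses 2. After the mutual emission (the first emission of lossy zip) it starts over: "remember" 3, "lose" 4, emit [3,20]. Then it starts over again: "remember" 5, "lose" 6 and 7, emit [5,30]. Then once more: "remember" 40, "lose" 50, 60, 70 and wait for the next item on stream1.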
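The remember-one, drop-the-rest behavior behind the desired output [1, 10], [3, 20], [5, 30] can be modeled in a few lines of plain JavaScript. This is a sketch of the semantics I am after, not an RxJS operator; `createLossyZipModel`, `pushLeft` and `pushRight` are hypothetical names, and the calls at the bottom replay the same emission order as the snippet above.

```javascript
// Hypothetical model of the desired "lossy zip" semantics (not an RxJS operator):
// each side remembers at most one value; later values are dropped until a pair is emitted.
function createLossyZipModel(onPair) {
  const EMPTY = {}; // sentinel meaning "nothing remembered on this side yet"
  let left = EMPTY;
  let right = EMPTY;

  // Emit a pair once both sides hold a value, then start over.
  function tryEmit() {
    if (left !== EMPTY && right !== EMPTY) {
      const pair = [left, right];
      left = EMPTY;
      right = EMPTY;
      onPair(pair);
    }
  }

  return {
    // Only the first value since the last mutual emission is kept.
    pushLeft(value) { if (left === EMPTY) { left = value; tryEmit(); } },
    pushRight(value) { if (right === EMPTY) { right = value; tryEmit(); } },
  };
}

// Replay the emission order from the example.
const lossyPairs = [];
const lossy = createLossyZipModel((pair) => lossyPairs.push(pair));
lossy.pushLeft(1); lossy.pushLeft(2); lossy.pushRight(10);
lossy.pushLeft(3); lossy.pushLeft(4); lossy.pushRight(20);
lossy.pushLeft(5); lossy.pushLeft(6); lossy.pushLeft(7); lossy.pushRight(30);
lossy.pushRight(40); lossy.pushRight(50); lossy.pushRight(60); lossy.pushRight(70);

console.log(JSON.stringify(lossyPairs)); // [[1,10],[3,20],[5,30]]
```

Because each side holds at most one value, memory use in this model stays constant no matter how far one stream runs ahead of the other. After the last call, 40 is the remembered value and the model is waiting for the next item on stream1.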
Example 2:
Stream1: 1 2 3 ... 100000000000
Stream2: a
A regular zip operator will exhaust memory in this case.
I don't want that to happen.
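Summary:
Essentially, I expect the lossy zip operator to remember only the first value emitted by stream 1 after the previous mutual emission, and to emit when stream 2 catches up with stream 1. Then repeat.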