RxJs: lossy form of zip operator

Consider using the zip operator to zip together two infinite Observables, one of which emits items twice as often as the other. The current implementation is lossless, i.e. if I keep these Observables emitting for an hour and then switch their emission rates, the first Observable will eventually catch up with the other.
This will cause a memory explosion at some point, as the buffer grows larger and larger.
The same thing happens if the first Observable emits items for several hours and the second emits one item at the end.

How can I achieve lossy behavior for this operator? I just want to emit anytime I get emissions from both streams, and I don't care how many emissions from the faster stream I miss.

Explanations:

  • The main problem I'm trying to solve is the memory explosion caused by the lossless nature of the zip operator.
  • I want to emit anytime I receive emissions from both streams, even if both streams emit the same value every time.

Example:

Stream1: 1 2    3 4    5 6 7                
Stream2:     10     20       30 40 50 60 70

Normal zip will produce the following result:

[1, 10]
[2, 20]
[3, 30]
[4, 40]
[5, 50]
[6, 60]
[7, 70]

const Observable = Rx.Observable;
const Subject = Rx.Subject;


const s1 = new Subject();
const s2 = new Subject();

Observable.zip(s1,s2).subscribe(console.log);

s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30); 
 
s2.next(40); s2.next(50); s2.next(60); s2.next(70); 
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>

The result I would like to create:

[1, 10]
[3, 20]
[5, 30]

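Explanation:
A lossy zip operator is zip with a buffer size of 1. That is, it stores only the first item from the stream that emitted first and loses all the rest (the items that arrive between that first item and the first emission from the second stream). So what happens in the example is the following: stream1 emits 1, lossy zip "remembers" it and ignores all further items on stream1 until stream2 emits. The first emission of stream2 is 10, so stream1 loses 2. After the mutual emission (the first emission of the lossy zip) it starts over: "remember" 3, "lose" 4, emit [3,20]. Then it starts over again: "remember" 5, "lose" 6 and 7, emit [5,30]. Then again: "remember" 40, "lose" 50, 60, 70 and wait for the next item on stream1.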

Example 2:

Stream1: 1 2 3 ... 100000000000
Stream2:                        a

The regular zip operator will exhaust memory in this case.
I don't want that to happen.
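
To make the memory concern in Example 2 concrete, here is a minimal plain-JavaScript model of a lossless zip. It is only a sketch: `createBufferedZip`, `push` and `pending` are hypothetical names invented for illustration, and this is not the RxJS implementation. It assumes that every unmatched value simply waits in a per-stream queue.

```javascript
// Model of a lossless zip: every unmatched value waits in a queue.
// createBufferedZip is a hypothetical helper, not the RxJS implementation.
function createBufferedZip(onPair) {
  const queues = [[], []]; // one unbounded queue per stream

  return {
    // index is 0 for Stream1 and 1 for Stream2
    push(index, value) {
      queues[index].push(value);
      if (queues[0].length > 0 && queues[1].length > 0) {
        onPair([queues[0].shift(), queues[1].shift()]);
      }
    },
    // number of values currently held in memory
    pending() {
      return queues[0].length + queues[1].length;
    },
  };
}

const bufferedPairs = [];
const buffered = createBufferedZip(pair => bufferedPairs.push(pair));

// Stream1 emits many items before Stream2 emits anything
for (let i = 1; i <= 100000; i++) buffered.push(0, i);
const pendingBefore = buffered.pending();

// Stream2 finally emits one item
buffered.push(1, 'a');
const pendingAfter = buffered.pending();

console.log(pendingBefore, pendingAfter, JSON.stringify(bufferedPairs));
```

In this model the queue for Stream1 grows with every unmatched item, which is the growth the question wants to avoid.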

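Summary:
Essentially, I expect the lossy zip operator to remember only the first value emitted by stream 1 after the previous zip emission, and to emit when stream 2 catches up with stream 1. And vice versa.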
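
The behavior requested in the question can be modelled in a few lines of plain JavaScript. This is a sketch under stated assumptions, not an RxJS operator: `createLossyZip` and `push` are hypothetical names, and each stream is assumed to get a single slot that holds the first value seen since the last mutual emission.

```javascript
// Minimal model of a "lossy zip" over two streams (plain JavaScript, not RxJS).
// createLossyZip and push are hypothetical names used only for this sketch.
function createLossyZip(onPair) {
  const EMPTY = Symbol('empty');
  let slots = [EMPTY, EMPTY]; // at most one remembered value per stream

  return {
    // index is 0 for Stream1 and 1 for Stream2
    push(index, value) {
      if (slots[index] !== EMPTY) return; // already remembered: drop the value
      slots[index] = value;
      if (slots[0] !== EMPTY && slots[1] !== EMPTY) {
        onPair([slots[0], slots[1]]);
        slots = [EMPTY, EMPTY]; // start over after a mutual emission
      }
    },
  };
}

const lossyPairs = [];
const lossy = createLossyZip(pair => lossyPairs.push(pair));

// Same emission order as the first example in the question
[
  [0, 1], [0, 2], [1, 10], [0, 3], [0, 4], [1, 20],
  [0, 5], [0, 6], [0, 7], [1, 30],
  [1, 40], [1, 50], [1, 60], [1, 70],
].forEach(([index, value]) => lossy.push(index, value));

console.log(JSON.stringify(lossyPairs)); // [[1,10],[3,20],[5,30]]
```

Memory stays bounded in this model because at most two values are held at any time, regardless of how far one stream runs ahead.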

+6
5

The following will give you the desired behavior:

Observable.zip(s1.take(1), s2.take(1)).repeat()

In RxJs 5.5+ pipe syntax:

zip(s1.pipe(take(1)), s2.pipe(take(1))).pipe(repeat());

const s1 = new Rx.Subject();
const s2 = new Rx.Subject();

Rx.Observable.zip(s1.take(1), s2.take(1)).repeat()
    .subscribe(console.log);

s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);  
s2.next(40); s2.next(50); s2.next(60); s2.next(70); 
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>

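Explanation:

  • The repeat operator (in its current implementation) resubscribes to the source observable when it completes, i.e. in this particular case it resubscribes to zip after every mutual emission.
  • zip waits for the first emission from both sources. combineLatest would work as well; it doesn't really matter because of take(1).
  • take(1) is what actually prevents the memory explosion and defines the lossy behavior.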

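If you want to take the latest value from each stream, rather than the first, at each mutual emission, use this: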

Observable.combineLatest(s1, s2).take(1).repeat()

In RxJs 5.5+ pipe syntax:

combineLatest(s1, s2).pipe(take(1), repeat());

const s1 = new Rx.Subject();
const s2 = new Rx.Subject();

Rx.Observable.combineLatest(s1,s2).take(1).repeat()
    .subscribe(console.log);

s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);  
s2.next(40); s2.next(50); s2.next(60); s2.next(70); 
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>
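
As a rough illustration of how the "latest value" variant differs from the "first value" one, the sketch below models it in plain JavaScript. `createLatestZip` and `push` are hypothetical names, and the model assumes that resubscribing after each emission simply clears both slots; it is not the RxJS implementation.

```javascript
// Plain-JavaScript model of combineLatest(s1, s2).take(1).repeat():
// each stream keeps only its most recent value until both have one.
// createLatestZip is a hypothetical helper used only for this sketch.
function createLatestZip(onPair) {
  const EMPTY = Symbol('empty');
  let latest = [EMPTY, EMPTY];

  return {
    // index is 0 for s1 and 1 for s2
    push(index, value) {
      latest[index] = value; // overwrite: older values are lost
      if (latest[0] !== EMPTY && latest[1] !== EMPTY) {
        onPair([latest[0], latest[1]]);
        latest = [EMPTY, EMPTY]; // models the resubscription done by repeat()
      }
    },
  };
}

const latestPairs = [];
const latestZip = createLatestZip(pair => latestPairs.push(pair));

// Same emission order as the snippet above
[
  [0, 1], [0, 2], [1, 10], [0, 3], [0, 4], [1, 20],
  [0, 5], [0, 6], [0, 7], [1, 30],
  [1, 40], [1, 50], [1, 60], [1, 70],
].forEach(([index, value]) => latestZip.push(index, value));

console.log(JSON.stringify(latestPairs)); // [[2,10],[4,20],[7,30]]
```

Compared with the first-value variant, which yields [1, 10], [3, 20], [5, 30] for the same input, only the choice of which value survives on the faster stream changes; the memory bound is the same.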

+7

I think the following should always take the last value from each source Observable.

const source1 = Observable.interval(1000).publish();
const source2 = Observable.interval(300).publish();

source1.connect();
source2.connect();

Observable.defer(() => Observable.forkJoin(
        source1.takeUntil(source2.skipUntil(source1)),
        source2.takeUntil(source1.skipUntil(source2))
    ))
    .take(1)
    .repeat()
    .subscribe(console.log);

Live demo: http://jsbin.com/vawewew/11/edit?js,console

This prints:

[ 0, 2 ]
[ 1, 5 ]
[ 2, 8 ]
[ 3, 12 ]
[ 4, 15 ]
[ 5, 18 ]

Note that source1 and source2 are made hot with publish() and connect(), so resubscribing does not restart them.

Edit:

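The core part is source1.takeUntil(source2.skipUntil(source1)). This takes values from source1 until source2 emits, but it ignores emissions from source2 until source1 has emitted at least one value :).

forkJoin() waits until both sources complete, remembering the last emission from each of them.

Then, to repeat the process, we use take(1) to complete the chain and .repeat() to resubscribe immediately.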

+1

This gives the sequence [0, 2] [1, 5] [2, 8] [3, 12] ...

const interval1 = Rx.Observable.interval(1000)
const interval2 = Rx.Observable.interval(300)

const combined = Rx.Observable.combineLatest(interval1, interval2);
const fresh = combined.scan((acc, x) => { 
    return x[0] === acc[0] || x[1] === acc[1] ? acc : x 
  })
  .distinctUntilChanged() //fresh ones only

fresh.subscribe(console.log);

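Note that this relies on consecutive values within each stream being distinct, since the scan compares values to decide whether an emission is fresh.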

With update #3, you need a key for each source item.

// Simulated sources according to latest spec provided (update #3)
const source1 = Rx.Observable.from(['x','y','z'])
const source2 = Rx.Observable.from(['a','a','b','b','c'])

// Create keys for sources
let key1 = 0
let key2 = 0
const keyed1 = source1.map(x => [x, key1++])
const keyed2 = source2.map(x => [x, key2++])

const combined = Rx.Observable
  .combineLatest(keyed1, keyed2)
  .map(([keyed1, keyed2]) => [...keyed1, ...keyed2]) // to simplify scan below
combined.subscribe(console.log) // not the output, for illustration only
console.log('-------------------------------------')

const fresh = combined.scan((acc, x) => { 
    return x[1] === acc[1] || x[3] === acc[3] ? acc : x 
  })
  .distinctUntilChanged() //fresh ones only

const dekeyed = fresh
  .map(keyed => { return [keyed[0], keyed[2]] })
dekeyed.subscribe(console.log); // required output

["x", "a"]  
["y", "a"]  
["z", "b"]  


+1

You mention a buffer size of 1; I wonder whether zipping two ReplaySubjects with a buffer size of 1 would do it?

0

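I'm adding another answer for clarity, as it comes after the accepted answer (but builds on my previous answer).

Forgive me if I have misunderstood, but I was expecting the solution to handle switching emission rates, as the question mentions.

The test supplied does not switch emission rates until after the first stream stops,

Stream1: 1 2    3 4    5 6 7
Stream2:     10     20    30 40 50 60 70

so I tried another test:

Stream1: 1 2      3 4     5 6
Stream2:    10 20    30 40   50 60

The test data for this stream is:
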
s1.next(1); s1.next(2); s2.next(10); s2.next(20); s1.next(3); s1.next(4);
s2.next(30); s2.next(40); s1.next(5); s1.next(6);  s2.next(50); s2.next(60);

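As I understand it, the accepted answer fails this test. It outputs

[1, 10]
[3, 20]
[4, 30]
[5, 40]
[6, 50]

whereas I would expect to see

[1, 10]
[3, 30]
[5, 50]

if the operator is supposed to be symmetric (commutative?).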

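The following builds on my previous answer using only basic operators: it tags each value with its stream id, keeps only the first value after each change of stream, pairs consecutive values, and takes every other pair.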

const s1 = new Rx.Subject();
const s2 = new Rx.Subject();

const tagged1 = s1.map(x=>[x,1])
const tagged2 = s2.map(x=>[x,2])
const merged = tagged1.merge(tagged2)
const fresh = merged.scan((acc, x) => { 
    return x[1] === acc[1] ? acc : x 
  })
  .distinctUntilChanged() //fresh ones only
const dekeyed = fresh.map(keyed => keyed[0])
const paired = dekeyed.pairwise()
let index = 0
const sequenced = paired.map(x=>[x,index++])
const alternates = sequenced.filter(x => x[1] % 2 === 0)
const deindexed = alternates.map(x=>x[0])

In condensed form:

let index = 0
const output = 
  s1.map(x=>[x,1]).merge(s2.map(x=>[x,2])) // key by stream id
  .scan((acc, x) => { 
    return x[1] === acc[1] ? acc : x 
  })
  .distinctUntilChanged()       //fresh ones only
  .map(keyed => keyed[0])       // de-key
  .pairwise()                   // pair
  .map(x=>[x,index++])          // add a sequence no
  .filter(x => x[1] % 2 === 0)  // take even sequence
  .map(x=>x[0])                 // deindex
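
To see what each stage of the chain above does, the same steps can be traced over a plain array holding the second test's events in arrival order. This is only an illustrative sketch in plain JavaScript, not RxJS: the variable names are invented for the example, and array methods stand in for the operators.

```javascript
// Events of the second test in arrival order, tagged as [value, streamId]
const events = [
  [1, 1], [2, 1], [10, 2], [20, 2], [3, 1], [4, 1],
  [30, 2], [40, 2], [5, 1], [6, 1], [50, 2], [60, 2],
];

// scan + distinctUntilChanged: keep only the first value of each run
// of consecutive events coming from the same stream, then drop the tag
const freshValues = events
  .filter((event, i) => i === 0 || event[1] !== events[i - 1][1])
  .map(event => event[0]);
// freshValues: [1, 10, 3, 30, 5, 50]

// pairwise: overlapping pairs of consecutive values
const pairs = freshValues
  .slice(1)
  .map((value, i) => [freshValues[i], value]);
// pairs: [[1,10],[10,3],[3,30],[30,5],[5,50]]

// keep pairs with an even sequence number, dropping the overlapping ones
const alternatePairs = pairs.filter((_, i) => i % 2 === 0);

console.log(JSON.stringify(alternatePairs)); // [[1,10],[3,30],[5,50]]
```

One thing the trace makes visible is that each pair is ordered by arrival, so a pair would start with a Stream2 value whenever Stream2 happened to emit first.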


0

Source: https://habr.com/ru/post/1687014/

