How to combine two streams (without nulls) and apply conditions to the pairs?

I have two data streams. Is there any way to merge them and apply conditions to the data across the two streams? For instance:

Stream A : A, B, C, D....
Stream B : -, A, -, -....
Composed : (A,-),(B,A),(C,-),(D,-)....

How can I create the composed stream above using RxJS? I would like to apply conditions to the composed stream in order to raise some notifications. It would also be fine to use the last known non-null value, as in the composed stream below.

Stream A : A, B, C, D....
Stream B : 1, null, 2, null....
Composed : (A,1),(B,1),(C,2),(D,2)....

I have just started playing with reactive streams, so please correct me if I have misunderstood the idea behind them.

1 answer

There are two operators that can serve your purpose.

Zip :
[Image: Rx zip]
Link for RxJs: https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/zip.md

CombineLatest :
[Image: Rx CombineLatest]
Link for RxJs: https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/combinelatest.md

The images explain the difference between the two. Once you have the combined observable, you just need to filter it with where, which drops any pair in which one of the values is null.

Unfortunately, neither operator on its own produces the behavior you described:

Stream A : A, B, C, D, E....
Stream B : 1, null, 2, null, 3....
Composed : (A,1),(B,1),(C,2),(D,2)....

If you use Zip and then Where (filtering out the null values afterwards), the result will be:

 Composed: (A,1),(C,2),(E,3) 

If you use Where first (filtering out the null values beforehand) and then Zip, the result will be:

 Composed: (A,1),(B,2),(C,3) 
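To make the effect of the ordering concrete, here is a minimal sketch that models the two pipelines on plain arrays instead of real observables. The helper `zip` and the variable names are hypothetical and written only for this illustration; they are not the RxJS API, although the array `filter` plays the role that `where` plays in the answer.

```typescript
// Plain-array model of the two orderings; this is not RxJS itself.
type Pair<A, B> = [A, B];

// zip pairs the n-th item of one sequence with the n-th item of the other.
function zip<A, B>(left: A[], right: B[]): Pair<A, B>[] {
  const n = Math.min(left.length, right.length);
  return left.slice(0, n).map((x, i): Pair<A, B> => [x, right[i] as B]);
}

const streamA: string[] = ["A", "B", "C", "D", "E"];
const streamB: (number | null)[] = [1, null, 2, null, 3];

// Zip first, then drop the pairs whose second element is null.
const zipThenFilter = zip(streamA, streamB).filter(([, b]) => b !== null);

// Drop the nulls from stream B first, then zip.
const filterThenZip = zip(streamA, streamB.filter((b) => b !== null));

console.log(JSON.stringify(zipThenFilter)); // [["A",1],["C",2],["E",3]]
console.log(JSON.stringify(filterThenZip)); // [["A",1],["B",2],["C",3]]
```

Filtering after the zip discards whole pairs, so the items of stream A that lined up with a null are lost. Filtering before the zip shifts the remaining values of stream B to earlier positions, so they pair with different items of stream A.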

If you use CombineLatest, the result depends on the order in which events occur in the two streams and, of course, on where you put the where operator. It may differ from what you showed, for example:

Stream A : A, B, C, D....
Stream B : 1, null, 2, null....
Composed : (A,1),(B,1),(C,1),(C,2),(D,2)....
// OR
Composed : (A,1),(B,1),(B,2),(C,2),(D,2)....
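The two outcomes above can be reproduced with a small model of the "latest value from each side" rule. This is only a sketch over an explicitly ordered event log: the `Ev` type, the helpers `evA` and `evB`, and the function `combineLatestModel` are made up for the illustration, and the nulls of stream B are assumed to have been filtered out before combining.

```typescript
// Model of combineLatest over an explicitly ordered event log (not RxJS itself).
type Ev = { from: "A"; value: string } | { from: "B"; value: number };

function combineLatestModel(events: Ev[]): string[] {
  let lastA: string | undefined;
  let lastB: number | undefined;
  const out: string[] = [];
  for (const e of events) {
    if (e.from === "A") {
      lastA = e.value;
    } else {
      lastB = e.value;
    }
    // Emit only once both sides have produced at least one value.
    if (lastA !== undefined && lastB !== undefined) {
      out.push(`(${lastA},${lastB})`);
    }
  }
  return out;
}

const evA = (value: string): Ev => ({ from: "A", value });
const evB = (value: number): Ev => ({ from: "B", value });

// Ordering 1: whenever both streams have a value, stream A fires first.
const aFirst = combineLatestModel([evA("A"), evB(1), evA("B"), evA("C"), evB(2), evA("D")]);

// Ordering 2: whenever both streams have a value, stream B fires first.
const bFirst = combineLatestModel([evB(1), evA("A"), evA("B"), evB(2), evA("C"), evA("D")]);

console.log(aFirst.join(",")); // (A,1),(B,1),(C,1),(C,2),(D,2)
console.log(bFirst.join(",")); // (A,1),(B,1),(B,2),(C,2),(D,2)
```

The only difference between the two runs is which side fires first when both have a value, which is why the extra pair is (C,1) in one case and (B,2) in the other.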

Unless you have more specific requirements, I think one of the options above is what you are looking for. Do not hesitate to add more information.
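If the composition from the question is what is actually needed, where a null in stream B is replaced by the last known non-null value, one possible approach is to pair the items by position and carry the previous value forward. The sketch below models this on plain arrays; `zipCarryForward` is a hypothetical helper, not an RxJS operator. In RxJS the same idea would presumably be a `scan` applied after `zip`, but that mapping is an assumption and is not stated in the answer.

```typescript
// Model: pair items by position, replacing each null on the right
// with the last non-null value seen so far (not RxJS itself).
function zipCarryForward<A, B>(left: A[], right: (B | null)[]): [A, B | null][] {
  let last: B | null = null;
  const n = Math.min(left.length, right.length);
  return left.slice(0, n).map((x, i): [A, B | null] => {
    const y = right[i] ?? null;
    if (y !== null) {
      last = y; // remember the newest non-null value
    }
    return [x, last];
  });
}

const composed = zipCarryForward(["A", "B", "C", "D"], [1, null, 2, null]);

console.log(JSON.stringify(composed)); // [["A",1],["B",1],["C",2],["D",2]]
```

If stream B starts with a null, the first pairs carry null until a real value arrives, so a final filter may still be needed.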

There are several other ways to compose observables. Some operators not mentioned above:

  • distinctUntilChanged, which can be added to the final composition, using the key selector function to limit the comparison to only the zip part or only the latest value.
  • switch, which is used to combine one observable inside another.

Source: https://habr.com/ru/post/981803/

