How are asynchronous streams transmitted in RxJS?

I am trying to understand how a stream is transmitted through the pipe in RxJS.
I know this should not be a concern, because that is the whole idea of asynchronous streams, but there is still something I want to understand.

Looking at this code:

    var source = Rx.Observable
      .range(1, 3)
      .flatMapLatest(function (x) { // `switch` these days...
        return Rx.Observable.range(x * 100, 2);
      });

    source.subscribe(value => console.log('I got a value ', value));

Result:

    I got a value 100
    I got a value 200
    I got a value 300
    I got a value 301

I suppose (IIUC) that the diagram looks something like this (notice the struck-through 101 and 201, which are unsubscribed):

    ----1---------2------------------3------------------------------|
    ░░░░░░░░flatMapLatest(x=>Rx.Observable.range(x*100, 2))░░░░░░░░
    -----100-------(-̶1̶0̶1̶)-------200---(-̶2̶0̶1̶)-----300------301-------------

And here is the question:

Question:

Is it always guaranteed that 2 will arrive before (101)? Likewise, will 3 always arrive before (201)?

I mean, if I am not supposed to rely on the timeline, then the following diagram would be perfectly legal:

    ----1---------------2---------------3------------------------------|
    ░░░░░░░░flatMapLatest(x=>Rx.Observable.range(x*100, 2))░░░░░░░░
    -----100-------101------200---201-----300------301-------------

Here 2 arrived with a slight delay, after 101 had already been emitted.

What am I missing here? How does the pipe work here?

2 answers

For this particular Observable chain with this particular version of RxJS, the order of emissions will always be the same.

As already mentioned, range in RxJS 4 uses the currentThread scheduler, as you can see here: https://github.com/Reactive-Extensions/RxJS/blob/master/src/core/perf/operators/range.js#L39 .
All schedulers (except immediate in RxJS 4) internally use some kind of queue, so the order is always the same.

The order of events is very similar to what you showed in the diagram (... or, at least, I think it is).

Note that this behavior is different in RxJS 4 and RxJS 5.

In RxJS 5, most Observables and operators do not use any scheduler by default (the obvious exception is Observables and operators that need to work with delays). Thus, in RxJS 5, RangeObservable will not schedule anything and will immediately start emitting values in a loop.

The same example in RxJS 5 will give a different result:

    const source = Observable
      .range(1, 3)
      .switchMap(function (x) {
        return Observable.range(x * 100, 2);
      });

    source.subscribe(value => console.log('I got a value ', value));

This will print the following:

    I got a value 100
    I got a value 101
    I got a value 200
    I got a value 201
    I got a value 300
    I got a value 301
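A rough plain-JavaScript sketch of why nothing gets dropped when the inner source emits synchronously. It does not use RxJS; `syncRange` and `runSyncSwitchExample` are hypothetical names made up for illustration, and the "switch" logic is only an approximation of what switchMap does.

```javascript
// Hypothetical stand-in for a range that emits in a plain loop (no scheduler).
// By the time it returns, every value has already been delivered.
function syncRange(start, count, onNext) {
  for (let i = 0; i < count; i++) onNext(start + i);
  return () => {}; // unsubscribe: nothing left to cancel
}

// Approximates switchMap: drop the previous inner, then subscribe to the new one.
function runSyncSwitchExample() {
  const received = [];
  let unsubscribeInner = () => {};
  syncRange(1, 3, (x) => {
    unsubscribeInner(); // too late: the previous inner already finished
    unsubscribeInner = syncRange(x * 100, 2, (v) => received.push(v));
  });
  return received;
}

console.log(runSyncSwitchExample()); // [ 100, 101, 200, 201, 300, 301 ]
```

Each inner loop runs to completion before the outer loop can produce its next value, so the unsubscribe call never has anything to cancel.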

However, this changes significantly if you add, for example, delay(0). Intuitively, you would expect this to do nothing:

    const source = Observable
      .range(1, 3)
      .switchMap(function (x) {
        return Observable.range(x * 100, 2).delay(0);
      });

    source.subscribe(value => console.log('I got a value ', value));

Now the emissions of each inner RangeObservable are scheduled asynchronously, and the inner subscription is disposed every time a new outer value arrives, so only the values from the most recent RangeObservable are emitted:

    I got a value 300
    I got a value 301
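The same effect can be modeled without RxJS. This is only a sketch, assuming that delay(0) defers delivery to a later timer tick; the `timers` array is a made-up stand-in for the event loop's timer queue, and `delayedRange` and `runDelayedSwitchExample` are hypothetical names.

```javascript
function runDelayedSwitchExample() {
  const timers = [];   // assumed stand-in for the event loop's timer queue
  const received = [];

  // Hypothetical model of range(start, count).delay(0):
  // values are delivered from queued timers, not during subscription.
  function delayedRange(start, count, onNext) {
    let cancelled = false;
    for (let i = 0; i < count; i++) {
      const value = start + i;
      timers.push(() => { if (!cancelled) onNext(value); });
    }
    return () => { cancelled = true; };
  }

  // The outer range is still a synchronous loop, so all three outer values
  // arrive before any timer has a chance to fire.
  let unsubscribeInner = () => {};
  for (let x = 1; x <= 3; x++) {
    unsubscribeInner(); // disposes the previous inner before it delivered anything
    unsubscribeInner = delayedRange(x * 100, 2, (v) => received.push(v));
  }

  // Only now does the simulated event loop run the queued timers.
  while (timers.length > 0) timers.shift()();
  return received;
}

console.log(runDelayedSwitchExample()); // [ 300, 301 ]
```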

I suppose you already understand the "pipe", as you call it. In any case, it is still worth reviewing how data flows down the subscription chain; see: Hot and cold observables: are there any “hot” and “cold” operators?

That answer does not address the scheduling of data flow. Data is indeed emitted sequentially, which is part of the contract. However, the timing of emissions is determined by the scheduler used by the observable. Each operator has a sensible default scheduler, so most of the time we don't even need to think about scheduling.

It's hard to know exactly what's going on here, but my best guess is that range emits all its values on Rx.Scheduler.currentThread, which schedules work as soon as possible on the current thread.

Scheduler.Immediate ensures that the action is not scheduled, but rather executed immediately. Scheduler.CurrentThread ensures that actions are performed on the thread that made the original call. This is different from Scheduler.Immediate, because CurrentThread queues the action to be performed.
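The difference between the two can be sketched in a few lines of plain JavaScript. This is not the RxJS 4 source; `makeImmediateScheduler`, `makeCurrentThreadScheduler` and `trace` are hypothetical helpers that only model "run now" versus "queue and drain".

```javascript
// "Immediate" model: run the action right away, nested inside the caller.
function makeImmediateScheduler() {
  return { schedule: (action) => action() };
}

// "CurrentThread" model: queue the action; only the outermost call drains the queue.
function makeCurrentThreadScheduler() {
  const queue = [];
  let draining = false;
  return {
    schedule(action) {
      queue.push(action);
      if (draining) return; // nested call: leave the action queued
      draining = true;
      try {
        while (queue.length > 0) queue.shift()();
      } finally {
        draining = false;
      }
    },
  };
}

// Schedules an outer action that schedules an inner one, and records the order.
function trace(scheduler) {
  const log = [];
  scheduler.schedule(() => {
    log.push('outer start');
    scheduler.schedule(() => log.push('inner'));
    log.push('outer end');
  });
  return log;
}

console.log(trace(makeImmediateScheduler()));     // [ 'outer start', 'inner', 'outer end' ]
console.log(trace(makeCurrentThreadScheduler())); // [ 'outer start', 'outer end', 'inner' ]
```

With the queueing variant, work scheduled from inside a running action waits until that action has finished, which is what allows another source to get its turn in between.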

So:

  • 1 is emitted
  • flatMapLatest creates the Rx.Observable.range(x*100, 2) observable and subscribes to it, which emits 100 and schedules the emission of 101.
  • Before that happens, 2 is emitted, so 101 is discarded
  • the same happens with 3, but after that there are no new values, so nothing prevents 301 from being emitted at the end of the stream.
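The steps above can be reproduced with a small simulation. It is a sketch under assumptions, not the RxJS 4 implementation: it assumes range emits one value per scheduled action on a single shared queue, and `createTrampoline`, `range` and `runSwitchExample` are hypothetical names.

```javascript
// Assumed model of a currentThread-style queue: nested calls are queued,
// and only the outermost call drains the queue.
function createTrampoline() {
  const queue = [];
  let draining = false;
  return function schedule(action) {
    queue.push(action);
    if (draining) return;
    draining = true;
    try {
      while (queue.length > 0) queue.shift()();
    } finally {
      draining = false;
    }
  };
}

// Emits `count` integers starting at `start`, one scheduled action per value.
// Returns an unsubscribe function that suppresses any pending emissions.
function range(schedule, start, count, onNext) {
  let cancelled = false;
  let i = 0;
  const step = () => {
    if (cancelled || i >= count) return;
    onNext(start + i++);
    schedule(step); // the next value waits its turn in the shared queue
  };
  schedule(step);
  return () => { cancelled = true; };
}

// Approximates flatMapLatest: each outer value replaces the current inner.
function runSwitchExample() {
  const schedule = createTrampoline();
  const received = [];
  let unsubscribeInner = () => {};
  range(schedule, 1, 3, (x) => {
    unsubscribeInner(); // discards the pending 101 / 201
    unsubscribeInner = range(schedule, x * 100, 2, (v) => received.push(v));
  });
  return received;
}

console.log(runSwitchExample()); // [ 100, 200, 300, 301 ]
```

Because outer and inner emissions alternate in one queue, the next outer value is always processed before the second inner value, which matches the output shown in the question.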

This can be observed in the following jsfiddle: http://jsfiddle.net/ukhtwwcz/

As for exactly WHY it behaves this way, I cannot explain in detail.


Source: https://habr.com/ru/post/1266745/

