Why is my published delayed Observable factory called multiple times?

I have a task flow that will queue until a Subject signal is called using the .zip() operator. The subject of the signal subscribes to the current task. I also try to monitor progress in this task.

What I was trying to do was use .publish() to multicast the Observable task, so that I could let the Sign Subscribe to .last() task signal to trigger detection, as well as subscribe to the overall progress of the outliers tasks.

It seems to work. However, when I look at what is being printed, it looks like my Observable factory receives a call on every call to .subscribe() , although I used .publish() . I don’t understand how multicast works? I figured that .publish() ed Observable would be created using the factory, and this singular instance would be split, but it's cold until .connect() was called.

My task runner

Note the .defer() , which calls the tasker .

 "use strict"; const { Observable, Subject, BehaviorSubject } = Rx; // How often to increase project in a task const INTERVAL_TIME = 200; // Keep track of how many tasks we have let TASK_ID = 0; // Easy way to print out observers function easyObserver(prefix = "Observer") { return { next: data => console.log(`[${prefix}][next]: ${data}`), error: err => console.error(`[${prefix}][error] ${err}`), complete: () => console.log(`[${prefix}][complete] Complete`) }; } // Simulate async task function tasker(name = "", id = TASK_ID++) { console.log(`tasker called for ${id}`); let progress = 0; const progress$ = new BehaviorSubject(`Task[${name||id}][${progress}%]`); console.log(`Task[${name||id}][started]`); let interval = setInterval(() => { progress = (progress + (Math.random() * 50)); if (progress >= 100) { progress = 100; clearInterval(interval); progress$.next(`Task[${name||id}][${progress}%]`); progress$.complete(); return; } progress$.next(`Task[${name||id}][${progress}%]`); }, INTERVAL_TIME); return progress$.asObservable(); } // Create a signal subject that will tell the queue when to next const dequeueSignal = new BehaviorSubject(); // Make some tasks const tasks$ = Observable .range(0, 3); // Queue tasks until signal tells us to emit the next task const queuedTasks$ = Observable .zip(tasks$, dequeueSignal, (i, s) => i); // Create task observables const mcQueuedTasks$ = queuedTasks$ .map(task => Observable.defer(() => tasker(`MyTask${task}`))) .publish(); // Print out the task progress const progressSubscription = mcQueuedTasks$ .switchMap(task => task) .subscribe(easyObserver("queuedTasks$")); // Cause the signal subject to trigger the next task const taskCompleteSubscription = mcQueuedTasks$ .switchMap(task => task.last()) .delay(500) .subscribe(dequeueSignal); // Kick everything off mcQueuedTasks$.connect(); 
 <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script> 

My way out

Notice how you see several calls for the caller with the tasker called for N and the factory body is called. However, before any progress tasker() , tasker() is called again with the next TASK_ID . The result seems correct, because Task[MyTask0] does not skip any indexes, only TASK_ID s.

 tasker called for 0 Task[MyTask0][started] [queuedTasks$][next]: Task[MyTask0][0%] tasker called for 1 Task[MyTask0][started] [queuedTasks$][next]: Task[MyTask0][20.688413934455674%] [queuedTasks$][next]: Task[MyTask0][32.928520335195564%] [queuedTasks$][next]: Task[MyTask0][42.58361384849108%] [queuedTasks$][next]: Task[MyTask0][73.1297043008671%] [queuedTasks$][next]: Task[MyTask0][100%] tasker called for 2 Task[MyTask1][started] [queuedTasks$][next]: Task[MyTask1][0%] tasker called for 3 Task[MyTask1][started] [queuedTasks$][next]: Task[MyTask1][37.16513927245511%] [queuedTasks$][next]: Task[MyTask1][47.27771448102375%] [queuedTasks$][next]: Task[MyTask1][60.45983311604027%] [queuedTasks$][next]: Task[MyTask1][100%] tasker called for 4 Task[MyTask2][started] [queuedTasks$][next]: Task[MyTask2][0%] tasker called for 5 Task[MyTask2][started] [queuedTasks$][next]: Task[MyTask2][32.421275902708544%] [queuedTasks$][next]: Task[MyTask2][41.30332084025583%] [queuedTasks$][next]: Task[MyTask2][77.44113197852694%] [queuedTasks$][next]: Task[MyTask2][100%] [queuedTasks$][complete] Complete 
+5
source share
1 answer

It seems that Observable.defer not required in this function:

 // Create task observables const mcQueuedTasks$ = queuedTasks$ .map(task => Observable.defer(() => tasker(`MyTask${task}`))) .publish(); 

The Defer operator waits until an observer joins it, and then it generates an Observable, usually with an observable factory function. He does this anew for each subscriber, therefore, although each subscriber may think that he is subscribing to the same Observed, in fact, each subscriber receives his own individual sequence.

You have already created Observable here:

 // Make some tasks const tasks$ = Observable .range(0, 3); 

In the map loop, you create an additional Observable for each task ...

Get rid of Observable.defer so that the function looks like this:

 // Create task observables const mcQueuedTasks$ = queuedTasks$ .map(task => tasker(`MyTask${task}`)) .publish(); 

Excerpt:

 "use strict"; const { Observable, Subject, BehaviorSubject } = Rx; // How often to increase project in a task const INTERVAL_TIME = 200; // Keep track of how many tasks we have let TASK_ID = 0; // Easy way to print out observers function easyObserver(prefix = "Observer") { return { next: data => console.log(`[${prefix}][next]: ${data}`), error: err => console.error(`[${prefix}][error] ${err}`), complete: () => console.log(`[${prefix}][complete] Complete`) }; } // Simulate async task function tasker(name = "", id = TASK_ID++) { console.log(`tasker called for ${id}`); let progress = 0; const progress$ = new BehaviorSubject(`Task[${name||id}][${progress}%]`); console.log(`Task[${name||id}][started]`); let interval = setInterval(() => { progress = (progress + (Math.random() * 50)); if (progress >= 100) { progress = 100; clearInterval(interval); progress$.next(`Task[${name||id}][${progress}%]`); progress$.complete(); return; } progress$.next(`Task[${name||id}][${progress}%]`); }, INTERVAL_TIME); return progress$.asObservable(); } // Create a signal subject that will tell the queue when to next const dequeueSignal = new BehaviorSubject(); // Make some tasks const tasks$ = Observable .range(0, 3); // Queue tasks until signal tells us to emit the next task const queuedTasks$ = Observable .zip(tasks$, dequeueSignal, (i, s) => i); // Create task observables const mcQueuedTasks$ = queuedTasks$ .map(task => tasker(`MyTask${task}`)) .publish(); // Print out the task progress const progressSubscription = mcQueuedTasks$ .switchMap(task => task) .subscribe(easyObserver("queuedTasks$")); // Cause the signal subject to trigger the next task const taskCompleteSubscription = mcQueuedTasks$ .switchMap(task => task.last()) .delay(500) .subscribe(dequeueSignal); // Kick everything off mcQueuedTasks$.connect(); 
 <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script> 

Hope this helps.

+5
source

Source: https://habr.com/ru/post/1274604/


All Articles