Discard observed from the side of the manufacturer, not the side of the consumer

Cancellation by the consumer can be called using takeUntil, but this is not necessarily very dynamic. In this case, however, I want to cancel the Observable from the side of the equation maker in the same way as you could cancel the Promise in the promise chain (which is not possible with the built-in utility).

Say I have this Observable returned from a method. (This library of queues is a simple constant queue that reads / writes to a text file, we need to block read / write so that nothing gets distorted).

Queue.prototype.readUnique = function () { var ret = null; var lockAcquired = false; return this.obsEnqueue .flatMap(() => acquireLock(this)) .flatMap(() => { lockAcquired = true; return removeOneLine(this) }) .flatMap(val => { ret = val; // this is not very good return releaseLock(this); }) .map(() => { return JSON.parse(ret); }) .catch(e => { if (lockAcquired) { return releaseLock(this); } else { return genericObservable(); } }); }; 

I have two different questions -

  • If I can’t get the lock, how can I "cancel" the observable, just send back the empty observable without result (s)? Do I really have to do if / else logic in each callback to decide if the current chain is canceled, and if so, return an empty Observable? By empty, I mean Observable, that simple lights onNext / onComplete without any possibility for errors and without any values ​​for onNext. Technically, I don't think the Observable is empty, so I'm looking for what is really called if it exists.

  • If you look at this particular code sequence:

     .flatMap(() => acquireLock(this)) .flatMap(() => { lockAcquired = true; return removeOneLine(this) }) .flatMap(val => { ret = val; return releaseLock(this); }) .map(() => { return JSON.parse(ret); }) 

what I am doing is storing the ret link at the top of the method and then referring to it again one step later. I am looking for a way to pass a value released from removeOneLine () to JSON.parse (), without having to set any state outside the chain (this is simply inelegant).

+5
source share
2 answers

1) It depends on how your acquireLock method acquireLock , but I assume that it throws an error if it cannot get a lock, in which case you can create your stream using catch and set the backup stream to empty:

 return Rx.Observable.catch( removeLine$, Rx.Observable.empty() ); 

2) To save the stateful external variable, you can simply create a mapTo chain:

 let removeLine$ = acquireLock(this) .flatMap(() => this.obsEnqueue .flatMap(() => removeOneLine(this)) .flatMap(val => releaseLock(this).mapTo(val)) .map(val => JSON.parse(val)) .catch(() => releaseLock(this)) ); 
+3
source

According to your cancellation definition, this should prevent the observed value from being sent downstream. To prevent the value being observed from clicking, you can use a filter:

It could be simple:

 observable.filter(_ => lockAcquired) 

This will send a notification only adrift if lockAcquired is true.

+2
source

Source: https://habr.com/ru/post/1261797/


All Articles