Deploying Observables in a Permanent Queue Library

A small permanent queue library is being created that will read / write lines to a text file. Here is the add method, for example:

Queue.prototype.add = function(line, cb){ getLock(this, err => { if(err){ this.emit('error', err); releaseLock(err, cb); } else{ fs.appendFile(this.filepath, line, err => { err && this.emit('error', err); releaseLock(err, cb); }); } }); }; 

what I find rather inconvenient supports emitters and callbacks (or emitters and promises).

In other words, for each method (add, peek, remove) in the queue, I need to return / call a result specific to each call. Using an event emitter means that the caller can act on a result that was not specific to the call just made. Therefore, callbacks or promises seem imperative here - you cannot use event emitters only.

I wonder if the observed problems can solve the parallel callback problem with event emitters or promises with event emitters?

I am looking to find a way to implement this queue / asynchronous queue with only one asynchronous callback mechanism. Maybe observable here is not the answer, but nonetheless I am looking for a good design.

+6
source share
1 answer

I'm not quite sure why you need event emitters here ... If you use observable data, each subscriber will receive results / errors on his own call.

I would rewrite your method as such:

 function appendFileObs(filePath, line){ return Rx.Observable.create((obs) => { fs.appendFile(filePath, line, (err, result) => { if(err) obs.onError(err); else { obs.onNext(result); obs.onCompleted(); } }); }); }); // Similar for getLock and releaseLock Queue.prototype.add = function(line){ return getLockObs(this) .flatMap(() => appendFileObs(this.filePath, line)) .flatMap(result => releaseLockObs(undefined).map(() => result)) .catch((err) => { return releaseLockObs(err); }); }; 

In this decision, I am not proud that the stream has side effects inside, it is probably improved, but you get the idea.

Thus, when someone calls .add (line) .subscribe (), he will get the result and errors that occurred when he was called.

If you need to broadcast the errors that occurred, you can use BehaviourSubject, which is an observer and observable at the same time (useful material!)

+2
source

Source: https://habr.com/ru/post/1013207/


All Articles