Forcing Observer Termination rxjs

I have an rxjs observer (really an item) that forever stores the file, like the -f tail. This is great for monitoring log files, for example.

This "eternal" behavior is great for my application, but terrible for testing. My application is currently running, but my tests freeze forever.

I would like to force the observer change to complete earlier, because my test code knows how many lines should be in the file. How to do it?

I tried calling onCompleted in the Subject handler, which I returned, but at that moment it is mainly used as an observer, and you cannot force it to close, the error is:

Object # has no 'onCompleted' method

Here is the source code:

function ObserveTail(filename) { source = new Rx.Subject(); if (fs.existsSync(filename) == false) { console.error("file doesn't exist: " + filename); } var lineSep = /[\r]{0,1}\n/; tail = new Tail(filename, lineSep, {}, true); tail.on("line", function(line) { source.onNext(line); }); tail.on('close', function(data) { console.log("tail closed"); source.onCompleted(); }); tail.on('error', function(error) { console.error(error); }); this.source = source; } 

And here is a test code that cannot figure out how to make it finish forever (a tape style test). Pay attention to the line "ILLEGAL":

 test('tailing a file works correctly', function(tid) { var lines = 8; var i = 0; var filename = 'tape/tail.json'; var handle = new ObserveTail(filename); touch(filename); handle.source .filter(function (x) { try { JSON.parse(x); return true; } catch (error) { tid.pass("correctly caught illegal JSON"); return false; } }) .map(function(x) { return JSON.parse(x) }) .map(function(j) { return j.name }) .timeout(10000, "observer timed out") .subscribe ( function(name) { tid.equal(name, "AssetMgr", "verified name field is AssetMgr"); i++; if (i >= lines) { handle.onCompleted(); // XXX ILLEGAL } }, function(err) { console.error(err) tid.fail("err leaked through to subscriber"); }, function() { tid.end(); console.log("Completed"); } ); }) 
+5
source share
1 answer

It looks like you solved your problem, but to your original question

I would like to force the observer change to complete earlier, because my test code knows how many lines should be in the file. How to do it?

In general, using Subject discouraged when you have better alternatives, as they tend to be a crutch for people to use the programming styles they are familiar with. Instead of trying to use Subject , I would suggest that you think about what each event in the Observable life cycles will mean.

Wrap Event Emitters

There is already a wrapper for the EventEmitter#on/off template in the form Observable.fromEvent . It processes cleanup and only supports subscriptions when there are listeners. Thus, ObserveTail can be reorganized into

 function ObserveTail(filename) { return Rx.Observable.create(function(observer) { var lineSep = /[\r]{0,1}\n/; tail = new Tail(filename, lineSep, {}, true); var line = Rx.Observable.fromEvent(tail, "line"); var close = Rx.Observable.fromEvent(tail, "close"); var error = Rx.Observable.fromEvent(tail, "error") .flatMap(function(err) { return Rx.Observable.throw(err); }); //Only take events until close occurs and wrap in the error for good measure //The latter two are terminal events in this case. return line.takeUntil(close).merge(error).subscribe(observer); }); } 

Who has several advantages over vanilla use of Subjects , one, you will now see an error downstream, and two, this will handle clearing your events when you are done with them.

Avoid * Sync Methods

This can then be collapsed into your file existence check without using readSync

 //If it doesn't exist then we are done here //You could also throw from the filter if you want an error tracked var source = Rx.Observable.fromNodeCallback(fs.exists)(filename) .filter(function(exists) { return exists; }) .flatMap(ObserveTail(filename)); 

Then you can simplify the filter / map / map sequence down using flatMap .

 var result = source.flatMap(function(x) { try { return Rx.Observable.just(JSON.parse(x)); } catch (e) { return Rx.Observable.empty(); } }, //This allows you to map the result of the parsed value function(x, json) { return json.name; }) .timeout(10000, "observer timed out"); 

Do not signal, unsubscribe

How to stop a β€œstop” signal when flows move in only one direction. We rarely really want Observer to communicate directly with Observable, so the best template is not to β€œsignal” a stop, but simply unsubscribe from Observable and leave it to Observable to determine what it should do from there.

Essentially, your Observer really shouldn't care about your Observable anymore than saying "I'm here."

To do this, you need to declare the condition you want to fall into when you stop.

In this case, since you are just stopping after a given number in the test case, you can use take to cancel the subscription. Thus, the final subscription block will look like this:

 result //After lines is reached this will complete. .take(lines) .subscribe ( function(name) { tid.equal(name, "AssetMgr", "verified name field is AssetMgr"); }, function(err) { console.error(err) tid.fail("err leaked through to subscriber"); }, function() { tid.end(); console.log("Completed"); } ); 

Change 1

As stated in the comments, in the case of this particular api, there is no real β€œclosed” event, since Tail is essentially an infinite operation. In this sense, it is no different from a mouse event handler; we stop sending events when people stop listening. So your block will probably look like this:

 function ObserveTail(filename) { return Rx.Observable.create(function(observer) { var lineSep = /[\r]{0,1}\n/; tail = new Tail(filename, lineSep, {}, true); var line = Rx.Observable.fromEvent(tail, "line"); var error = Rx.Observable.fromEvent(tail, "error") .flatMap(function(err) { return Rx.Observable.throw(err); }); //Only take events until close occurs and wrap in the error for good measure //The latter two are terminal events in this case. return line .finally(function() { tail.unwatch(); }) .merge(error).subscribe(observer); }).share(); } 

Adding finally and share statements creates an object that will attach to the tail when a new subscriber arrives and will remain attached if there is at least one subscriber still listening. Once all subscribers are complete, we can safely unwatch tail.

+5
source

Source: https://habr.com/ru/post/1241679/


All Articles