It looks like you solved your problem, but to your original question
I would like to force the observer change to complete earlier, because my test code knows how many lines should be in the file. How to do it?
In general, using Subject discouraged when you have better alternatives, as they tend to be a crutch for people to use the programming styles they are familiar with. Instead of trying to use Subject , I would suggest that you think about what each event in the Observable life cycles will mean.
Wrap Event Emitters
There is already a wrapper for the EventEmitter#on/off template in the form Observable.fromEvent . It processes cleanup and only supports subscriptions when there are listeners. Thus, ObserveTail can be reorganized into
function ObserveTail(filename) { return Rx.Observable.create(function(observer) { var lineSep = /[\r]{0,1}\n/; tail = new Tail(filename, lineSep, {}, true); var line = Rx.Observable.fromEvent(tail, "line"); var close = Rx.Observable.fromEvent(tail, "close"); var error = Rx.Observable.fromEvent(tail, "error") .flatMap(function(err) { return Rx.Observable.throw(err); });
Who has several advantages over vanilla use of Subjects , one, you will now see an error downstream, and two, this will handle clearing your events when you are done with them.
Avoid * Sync Methods
This can then be collapsed into your file existence check without using readSync
Then you can simplify the filter / map / map sequence down using flatMap .
var result = source.flatMap(function(x) { try { return Rx.Observable.just(JSON.parse(x)); } catch (e) { return Rx.Observable.empty(); } },
Do not signal, unsubscribe
How to stop a βstopβ signal when flows move in only one direction. We rarely really want Observer to communicate directly with Observable, so the best template is not to βsignalβ a stop, but simply unsubscribe from Observable and leave it to Observable to determine what it should do from there.
Essentially, your Observer really shouldn't care about your Observable anymore than saying "I'm here."
To do this, you need to declare the condition you want to fall into when you stop.
In this case, since you are just stopping after a given number in the test case, you can use take to cancel the subscription. Thus, the final subscription block will look like this:
result //After lines is reached this will complete. .take(lines) .subscribe ( function(name) { tid.equal(name, "AssetMgr", "verified name field is AssetMgr"); }, function(err) { console.error(err) tid.fail("err leaked through to subscriber"); }, function() { tid.end(); console.log("Completed"); } );
Change 1
As stated in the comments, in the case of this particular api, there is no real βclosedβ event, since Tail is essentially an infinite operation. In this sense, it is no different from a mouse event handler; we stop sending events when people stop listening. So your block will probably look like this:
function ObserveTail(filename) { return Rx.Observable.create(function(observer) { var lineSep = /[\r]{0,1}\n/; tail = new Tail(filename, lineSep, {}, true); var line = Rx.Observable.fromEvent(tail, "line"); var error = Rx.Observable.fromEvent(tail, "error") .flatMap(function(err) { return Rx.Observable.throw(err); });
Adding finally and share statements creates an object that will attach to the tail when a new subscriber arrives and will remain attached if there is at least one subscriber still listening. Once all subscribers are complete, we can safely unwatch tail.