Nobody seems to know how to do what you ask.
In this case, you can process your stream in the traditional asynchronous way and then apply your Fiber-based function to the result.
Here are some examples of how to do this.
Collecting all stream data using raw-body
One solution is to collect all the data from a stream before processing it. This is easy to do with the raw-body module:
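    var fs = require('fs');
    var Fiber = require('fibers');
    var rawBody = require('raw-body');

    // sleepForMs is the Fiber-based sleep helper from the question
    function processData(data) {
      console.log("before sleep");
      sleepForMs(500);
      console.log("after sleep");
    }

    function processFile(callback) {
      var stream = fs.createReadStream('fiber.js');
      rawBody(stream, function(err, data) {
        if (err) return callback(err);
        Fiber(processData).run(data); // process the data inside a Fiber
        callback(); // assumed: the original snippet is cut off after the line above
      });
    }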
Using this example, you:
- wait until all the chunks have arrived
- start processing the stream data in a Fiber
- return from processData to the main thread
- the stream data will be processed at some point in the future
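For illustration, the collection step can be sketched using only Node's built-in stream events. The helper name collectStream is hypothetical, and this is a simplified sketch (no size limit, no encoding options), not raw-body's actual implementation:

```javascript
// Hypothetical helper: buffers every chunk of a stream and hands the
// complete Buffer to the callback once the stream ends.
function collectStream(stream, callback) {
  var chunks = [];
  var finished = false;

  function finish(err, data) {
    if (finished) return; // make sure the callback fires only once
    finished = true;
    callback(err, data);
  }

  stream.on('data', function (chunk) {
    // Streams emit strings when an encoding is set; normalize to Buffer.
    chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
  });
  stream.on('error', function (err) {
    finish(err);
  });
  stream.on('end', function () {
    finish(null, Buffer.concat(chunks));
  });
}
```

It would be called as collectStream(fs.createReadStream('fiber.js'), function (err, data) { ... }), with the Fiber started inside that callback once the data is complete.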
If you want, you can add try ... catch or any other exception handling to prevent processData from crashing your application.
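One way to add that protection is sketched below with a hypothetical wrapper named guarded (it is not part of fibers or raw-body). It catches anything the processing function throws and passes it to an error handler instead of letting it propagate:

```javascript
// Hypothetical wrapper: returns a version of fn that reports thrown
// errors to onError instead of letting them escape.
function guarded(fn, onError) {
  return function () {
    try {
      return fn.apply(this, arguments);
    } catch (err) {
      onError(err);
    }
  };
}
```

In the example above this could be used as Fiber(guarded(processData, logError)).run(data), where logError is whatever error handler you choose, for instance one that logs the error.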
Using a smart job queue to process all chunks in series
But if you really want to process each chunk of data as it arrives, you can use a flow control module. Here is an example using the queue function from the async module:
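    var fs = require('fs');
    var async = require('async');
    var Fiber = require('fibers');

    // Returns the function that will run inside a Fiber for one chunk
    function processChunk(data, next) {
      return function() {
        console.log("before sleep");
        sleepForMs(500);
        console.log("after sleep");
        next(); // tell the queue this chunk is done
      }
    }

    function processFile(callback) {
      // concurrency 1: chunks are processed one at a time, in order
      var q = async.queue(function(data, next) {
        Fiber(processChunk(data, next)).run();
      }, 1);
      fs.createReadStream('fiber.js').on('data', function(data) {
        q.push(data);
      }).on('error', function(err) {
        callback(err);
      }).on('end', function() {
        callback();
      });
    }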
Using this example, you:
- start listening to the stream and push each newly arrived chunk to the processing queue
- return to the main flow (through callback) at the moment the stream closes, without waiting for the chunks to be processed
- all chunks of data will be processed in strict order at some point in the future
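To make the ordering guarantee concrete, here is a minimal sketch of what a queue with concurrency 1 does. createSerialQueue is a hypothetical stand-in written for illustration; it is not the async module's implementation and leaves out its error handling and drain events:

```javascript
// Hypothetical stand-in for a concurrency-1 queue: the worker receives
// one task at a time and must call next() before the following task starts.
function createSerialQueue(worker) {
  var tasks = [];
  var running = false;

  function runNext() {
    if (running || tasks.length === 0) return;
    running = true;
    var task = tasks.shift();
    worker(task, function next() {
      running = false;
      runNext(); // start the following task, if any
    });
  }

  return {
    push: function (task) {
      tasks.push(task);
      runNext();
    },
    // number of tasks still waiting (not counting the one in progress)
    length: function () {
      return tasks.length;
    }
  };
}
```

Because a new task starts only after the previous worker has called next(), chunks pushed from the 'data' handler are processed in arrival order even when each one takes a while.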
I know that this is not what you asked for, but I hope this helps you.