How to use fiber with threads

I am trying to use fibers with threads:

var Fiber = require('fibers'); var Future = require('fibers/future'); var fs = require('fs'); function sleepForMs(ms) { var fiber = Fiber.current; setTimeout(function() { fiber.run(); }, ms); Fiber.yield(); } function catchError(f, onError) { return function () { var args = arguments; var run = function () { try { var ret = f.apply(null, args); } catch (e) { onError(e); } return ret; }; if (Fiber.current) { return run(); } else { return Fiber(run).run(); } } } function processFile(callback) { var count, finished, onData, onException, onIgnoredEntry; count = 0; finished = false; onException = function (error) { if (finished) { console.error("Exception thrown after already finished:", error.stack || error); } if (finished) { return; } finished = true; return callback(error); }; onData = function(data) { console.log("onData"); if (finished) { return; } console.log("before sleep"); sleepForMs(500); console.log("after sleep"); throw new Error("test"); }; return fs.createReadStream('test.js').on('data', catchError(onData, onException)).on('end', function() { console.log("end"); if (finished) { return; } finished = true; return callback(null, count); }).on('error', function(error) { console.log("error", error); if (finished) { return; } finished = true; return callback(error); }); }; Fiber(function () { console.log("Calling processFile"); Future.wrap(processFile)().wait(); console.log("processFile returned"); }).run(); console.log("back in main"); 

But that does not work. The data callback does not wait until the fiber started inside the callback finishes. Thus, the code above outputs:

 Calling processFile back in main onData before sleep end processFile returned after sleep Exception thrown after already finished: Error: test 

When in fact the output should be something more like:

 Calling processFile back in main onData before sleep after sleep end processFile returned Error: test 
+4
source share
3 answers

Here is an implementation using wait.for (wrapper around Fibers) https://github.com/luciotato/waitfor

In this implementation, a fiber is launched for each data chunk, so n tasks run in parallel. processFile does not "return" until all the fibers have completed.

This demonstrates how you can do this with Fibers and wait.for, but of course you should encapsulate the module-level variables and all the functions in a class before using this in production.

 var wait = require('wait.for'); var fs = require('fs'); var tasksLaunched=0; var finalCallback; var callbackDone=false; var dataArr=[] function sleepForMs(ms,sleepCallback) { setTimeout(function() { return sleepCallback(); }, ms); } function resultReady(err,data){ if (err){ callbackDone = true; return finalCallback(err); } dataArr.push(data); if (dataArr.length>=tasksLaunched && !callbackDone) { callbackDone = true; return finalCallback(null,dataArr); } } function processChunk(data,callback) { var ms=Math.floor(Math.random()*1000); console.log('waiting',ms); wait.for(sleepForMs,ms); console.log(data.length,"chars"); return callback(null,data.length); } function processFile(filename,callback) { var count, onData, onException, onIgnoredEntry; count = 0; finalCallback = callback; onException = function (error) { if (!callbackDone){ callbackDone = true; return callback(error); } }; onData = function(data) { console.log("onData"); tasksLaunched++; wait.launchFiber(processChunk,data,resultReady); }; fs.createReadStream(filename) .on('data', onData) .on('end', function() { console.log("end"); }) .on('error', function(error) { console.log("error", error); if (!callbackDone) { callbackDone = true; return callback(error); } }); }; function mainFiber() { console.log("Calling processFile"); var data = wait.for(processFile,'/bin/bash'); console.log(data.length,"results"); console.log("processFile returned"); }; //MAIN wait.launchFiber(mainFiber); console.log("back in main"); 
0
source

Reduce the sleep time and assign priorities or timers to the other blocks, so that after a certain time limit the blocks are processed according to their priority. That is how you can get the result you want.

0
source

Nobody seems to know how to do what you ask.

In this case, you can process your stream in the traditional asynchronous way, applying your fiber-based function to the result.

Here are some examples of how to do this.


Collecting all stream data using raw-body

One solution is to collect all data from a stream before processing it. This can be easily done with the raw-body module :

 var rawBody = require('raw-body'); function processData(data) { console.log("before sleep"); sleepForMs(500); console.log("after sleep"); } function processFile(callback) { var stream = fs.createReadStream('fiber.js'); rawBody(stream, function(err, data) { if (err) return callback(err); Fiber(processData).run(data); // process your data callback(); }); } 

Using this example, you:

  • wait until all the chunks have been collected
  • initiate stream data processing in Fiber
  • return from processFile to the main thread
  • stream data will be processed at some point in the future

If you want, you can add try ... catch or any other exception handling to prevent processData from crashing your application.


Using the intelligent job queue to process all fragments in a series

But if you really want to process all pieces of data at the time of their arrival, you can use the intelligent flow control module. Here is an example of using the queue function from async module :

 function processChunk(data, next) { return function() { console.log("before sleep"); sleepForMs(500); console.log("after sleep"); next(); } } function processFile(callback) { var q = async.queue(function(data, next) { Fiber(processChunk(data, next)).run(); }, 1); fs.createReadStream('fiber.js').on('data', function(data) { q.push(data); }).on('error', function(err) { callback(err); }).on('end', function() { callback(); // not waiting to queue to drain }) } 

Using this example, you:

  • start listening to the stream, pushing each new chunk onto the processing queue
  • return from processFile to the caller at the moment the stream closes, without waiting for the queued chunks to be processed
  • all pieces of data will be processed in strict order at some point in time

I know that this is not what you asked for, but I hope this helps you.

0
source

Source: https://habr.com/ru/post/977813/


All Articles