Meteor: How can I stream and parse a large file into an async Node function?

I am using the job-collection package to do the following:

  • Download a large file with a lot of metadata about web pages
  • Create a stream from the file and split it into metadata records on a regular expression, using the NPM event-stream package
  • Check whether each metadata record has a match in a collection (I have been trying to pass each web page's metadata to another function to do this)

The file is too large to buffer, so streaming is required. Here is a small file with a few examples of metadata if you want to try this.

Each job from the job-collection package is already inside an asynchronous function:

    var request = Npm.require('request');
    var zlib = Npm.require('zlib');
    var EventStream = Meteor.npmRequire('event-stream');

    // Worker function handed to job-collection; each job already runs asynchronously.
    function worker(job, callback) {
      // This download is much too long to block
      request({url: job.fileURL, encoding: null}, function (error, response, body) {
        if (error) return console.error('Error downloading file');
        if (response.statusCode !== 200) return console.error(response.statusCode, 'Status not 200');

        var responseEncoding = response.headers['content-type'];
        console.log('response encoding is %s', responseEncoding);
        if (responseEncoding === 'application/octet-stream' || responseEncoding === 'binary/octet-stream') {
          console.log('Received binary/octet-stream');
          var regexSplit = /WARC\/1\./;
          response.pipe(zlib.createGunzip())
            .pipe(EventStream.split(regexSplit))
            .pipe(EventStream.map(function (webpageMetaData) {
              /* Need to parse the metadata or pass each webpageMetaData to a function.
               * This next function could block if it had to. */
              // Pass each metadata record to this function to update a collection; it can be synchronous
              searchWebPageMetaData(webpageMetaData);
            }));
        } else {
          console.error('Wrong encoding');
        }
      });
    }

    function searchWebPageMetaData(metaData) {
      // Parse JSON and search collection for match
    }

  • Are there any better ways to structure this? Am I on the right track?
  • Where do I put Meteor.bindEnvironment? Do I bind the environment every time I pass a record to searchWebPageMetaData()? Do I need to use fibers explicitly here?
  • The stream stops when I run this if I pipe it to process.stdout. Am I supposed to put the stream inside one of Meteor's wrappers?
  • I am aware of Meteor.wrapAsync. Do I want to wrap the innermost searchWebPageMetaData() function in Meteor.wrapAsync? (I think I am answering this as yes as I type.)
  • Will the stream slow down to compensate for the slowness of the database calls? My guess is no, but how do I deal with that?

I have spent a lot of time learning about Meteor.wrapAsync and Meteor.bindEnvironment, but I have not been able to put it all together and work out where to use them.

APPENDIX 1

To clarify, the steps are:

  • download the file;
  • create a stream;
  • unzip it;
  • split it into separate web pages (EventStream handles this);
  • send each one to a function. There is no need for return values, and the function can be blocking; it is just a search and a database call.
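
The steps above can be sketched with Node's built-in modules only. This is a minimal sketch, not the event-stream implementation: `createSplitter` and `splitStream` are hypothetical helpers standing in for EventStream.split, and the sample data is made up and gzipped in memory instead of being downloaded.

```javascript
const zlib = require('zlib');
const { Readable, Transform } = require('stream');
const { StringDecoder } = require('string_decoder');

// Hypothetical helper: incremental splitter. feed() returns the records that
// are complete so far; the unfinished tail is kept until more text arrives.
function createSplitter(separator) {
  let tail = '';
  return {
    feed(text) {
      const parts = (tail + text).split(separator);
      tail = parts.pop(); // may be cut mid-record, so hold it back
      return parts.filter(part => part.length > 0);
    },
    flush() {
      const rest = tail;
      tail = '';
      return rest.length > 0 ? [rest] : [];
    }
  };
}

// Hypothetical stand-in for EventStream.split, built on the helper above.
function splitStream(separator) {
  const splitter = createSplitter(separator);
  const decoder = new StringDecoder('utf8'); // keeps multi-byte characters intact
  return new Transform({
    readableObjectMode: true,
    transform(chunk, _encoding, done) {
      for (const record of splitter.feed(decoder.write(chunk))) this.push(record);
      done();
    },
    flush(done) {
      splitter.feed(decoder.end()).forEach(record => this.push(record));
      splitter.flush().forEach(record => this.push(record));
      done();
    }
  });
}

// Demo with made-up data: gzip two records in memory, then stream them back.
const sample = zlib.gzipSync('WARC/1.0 page-one WARC/1.0 page-two');
Readable.from([sample])
  .pipe(zlib.createGunzip())
  .pipe(splitStream(/WARC\/1\./))
  .on('data', record => console.log('record:', record.trim()));
```

Holding back the last piece of every chunk is what lets a separator that straddles two chunks still be found once the next chunk arrives.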

My attempt, shown below, incorporates most of @electric-jesus's answer. In my real project the main code I need lives in a function in another file.

    processJobs('parseWatFile', {
      concurrency: 1,
      cargo: 1,
      pollInterval: 1000,
      prefetch: 1
    }, function (job, callback) {
      if (job.data.watZipFileLink) {
        queue.pause();
        console.log('queue should be paused now');

        var watFileUrl = 'https://s3.amazonaws.com/ja-common-crawl/exampleWatFile.wat.gz';

        function searchPageMetaData(webpageMetaData, callback) {
          console.log(webpageMetaData); // Would be nice to just get this function logging each webPageMetaData
          future.return(callback(webpageMetaData)); // I don't need this to return any value - do I have to return something?
        }

        if (!watFileUrl) console.error('No watFile passed to downloadAndSearchWatFileForEntity');

        var future = new Future(); // Doc Brown would be proud.

        if (typeof callback !== 'function') future.throw('callbacks are supposed to be functions.');

        request({url: watFileUrl, encoding: null}, function (error, response, body) {
          if (error) return future.throw('Error Downloading File');
          if (response.statusCode !== 200) return future.throw('Expected status 200, got ' + response.statusCode + '.');

          var responseEncoding = response.headers['content-type'];
          if (responseEncoding === 'application/octet-stream' || responseEncoding === 'binary/octet-stream') {
            var regexSplit = /WARC\/1\./;
            response.pipe(zlib.createGunzip())
              .pipe(EventStream.split(regexSplit))
              .pipe(EventStream.map(function (webpageMetaData) {
                searchPageMetaData(webpageMetaData, callback);
              }));
          } else {
            future.throw('Wrong encoding');
          }
        });

        return future.wait();
      } else {
        console.log('No watZipFileLink for this job');
        job.log('ERROR: NO watZipFileLink from commonCrawlJob collection');
      }

      queue.resume();
      job.done();
      callback();
    });

+6
2 answers

Interesting, this looks about right. I have never worked with job-collection, but it seems to be just a Mongo-driven task queue, so I assume it works like a regular queue. For callback-heavy code like this I have always found the Future pattern useful, e.g.:

    var request = Npm.require('request');
    var zlib = Npm.require('zlib');
    var EventStream = Meteor.npmRequire('event-stream');
    var Future = Npm.require('fibers/future');

    var searchWebPageMetaData = function (metaData) {
      // Parse JSON and search collection for match
      // make it return something
      var result = /droids/ig.test(metaData);
      return result;
    };

    var processJob = function (job, callback) {
      var future = new Future(); // Doc Brown would be proud.

      if (typeof callback !== 'function') future.throw("Oops, you forgot that callbacks are supposed to be functions.. not undefined or whatever.");

      // This download is much too long to block
      request({url: job.fileURL, encoding: null}, function (error, response, body) {
        if (error) return future.throw("Error Downloading File");
        if (response.statusCode !== 200) return future.throw("Expected status 200, got " + response.statusCode + ".");

        var responseEncoding = response.headers['content-type'];
        if (responseEncoding === 'application/octet-stream' || responseEncoding === 'binary/octet-stream') {
          var regexSplit = /WARC\/1\./;
          response.pipe(zlib.createGunzip())
            .pipe(EventStream.split(regexSplit))
            .pipe(EventStream.map(function (webpageMetaData) {
              /* Need to parse the metadata or pass each webpageMetaData to a function.
               * This next function could block if it had to. */
              // Pass each metadata record to the callback to update a collection; it can be synchronous.
              // A Future can only be resolved once, so processJob returns whatever the
              // callback finds for the first completed web page.
              if (!future.isResolved()) future.return(callback(webpageMetaData));
            }));
        } else {
          future.throw('Wrong encoding');
        }
      });

      return future.wait();
    };


Usage example:

So whenever you assign a variable like this:

 var currentJob = processJob(myjob, searchWebPageMetaData); 

even though the assignment looks synchronous, the asynchronous work is done for you and its result is delivered just in time.

To answer your questions,

  • Where do I put Meteor.bindEnvironment? Do I bind the environment every time I pass a record to searchWebPageMetaData()? Do I need to use fibers explicitly here?

    I believe that using fibers/future explicitly already takes care of this.

  • The stream stops when I run this if I pipe it to process.stdout. Am I supposed to put the stream inside one of Meteor's wrappers?

    How do you mean? I vaguely remember that process.stdout is blocking, which could be the cause. Again, wrapping the result in a Future should take care of that.

  • I am aware of Meteor.wrapAsync. Do I want to wrap the innermost searchWebPageMetaData() function in Meteor.wrapAsync? (I think I am answering this as yes as I type.)

    Take a look at the Meteor.wrapAsync helper code. It is basically a Future resolution applied for you. You can certainly use it, and you can equally use fibers/future explicitly yourself without any problems.

  • Will the stream slow down to compensate for the slowness of the database calls? My guess is no, but how do I deal with that?

    I am not sure what you mean here, but since we are using asynchronous fibers, my guess is no as well. I have not seen any slowness from using fibers. You are only likely to hit a performance problem, in terms of memory usage, if several jobs are launched and running at the same time. Keep the queue's concurrency low, as fibers can be quite powerful at running things simultaneously. You only have one core to process all of it, which is a sad fact, since a single Node process runs your JavaScript on one core :(
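
    On the throughput question, here is a minimal sketch using Node's built-in streams rather than event-stream or fibers. A writable stream does not receive the next record until the callback for the previous one has been called, and piping into it pauses the upstream stages in the meantime. `slowLookup`, `sequentialSink` and `processAll` are hypothetical names, and the timer only simulates a database call. In Meteor the handler would additionally have to run inside a fiber, which is not shown here.

```javascript
const { Readable, Writable, pipeline } = require('stream');

// Hypothetical slow handler standing in for a database lookup.
function slowLookup(record, done) {
  setTimeout(() => done(null), 5);
}

// Runs `handler` on one record at a time. The write callback is only invoked
// once the handler finishes, so upstream stages are held back until then.
function sequentialSink(handler, stats) {
  let inFlight = 0;
  return new Writable({
    objectMode: true,
    highWaterMark: 1, // buffer at most one pending record
    write(record, _encoding, done) {
      inFlight += 1;
      stats.maxInFlight = Math.max(stats.maxInFlight, inFlight);
      handler(record, err => {
        inFlight -= 1;
        stats.handled += 1;
        done(err);
      });
    }
  });
}

// Feeds `records` through the sink and resolves with simple counters.
function processAll(records, handler) {
  const stats = { handled: 0, maxInFlight: 0 };
  return new Promise((resolve, reject) => {
    pipeline(
      Readable.from(records),
      sequentialSink(handler, stats),
      err => (err ? reject(err) : resolve(stats))
    );
  });
}

processAll(['page-1', 'page-2', 'page-3'], slowLookup)
  .then(stats => console.log(stats));
```

    The counters show that every record is handled and that the handler never runs for two records at once, so a slow lookup slows the stream down instead of piling records up in memory.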

+3

This is quite tricky if you want to handle all errors correctly. So you have to ask yourself what should happen if your code throws an exception, or if an error event handler is called. You want these errors to propagate correctly, that is, to be thrown as an exception in the fiber that calls the streaming code. I implemented something like this for one of our job-collection tasks, for extracting tar files.

First you need some helper functions:

    bindWithFuture = (futures, mainFuture, fun, self) ->
      wrapped = (args...) ->
        future = new Future()

        if mainFuture
          future.resolve (error, value) ->
            # To resolve mainFuture early when an exception occurs
            mainFuture.throw error if error and not mainFuture.isResolved()
            # We ignore the value

        args.push future.resolver()
        try
          futures.list.push future
          fun.apply (self or @), args
        catch error
          future.throw error

        # This waiting does not really do much because we are
        # probably in a new fiber created by Meteor.bindEnvironment,
        # but we can still try to wait
        Future.wait future

      Meteor.bindEnvironment wrapped, null, self

    wait = (futures) ->
      while futures.list.length
        Future.wait futures.list
        # Some elements could be added in the meantime to futures,
        # so let's remove resolved ones and retry
        futures.list = _.reject futures.list, (f) ->
          if f.isResolved()
            # We get to throw an exception if there was an exception.
            # This should not really be needed because the exception should
            # already be thrown through mainFuture and we should not even
            # get here, but let's check for every case.
            f.get()
            true # And to remove resolved


And then you can run something like:

    mainFuture = new Future()

    # To be able to override list with a new value in wait we wrap it in an object
    futures =
      list: []

    bindWithOnException = (f) =>
      Meteor.bindEnvironment f, (error) =>
        mainFuture.throw error unless mainFuture.isResolved()

    onWebpageMetaData = (metaData, callback) =>
      return callback null if mainFuture.isResolved()

      # Do whatever you want here.
      # Call callback(null) when you finish.
      # Call callback(error) if there is an error.
      # If you want to call into Meteor code inside some other callback for async code you use,
      # use bindWithOnException to wrap a function and stay inside a Meteor environment and fiber.

      MeteorCollection.insert
        metaData: metaData

      callback null

    requestFuture = new Future()
    request
      url: job.fileURL
      encoding: null
    ,
      (error, response, body) ->
        return requestFuture.throw error if error
        return requestFuture.throw new Error "Expected status 200, got #{ response.statusCode }." unless response.statusCode is 200
        requestFuture.return response

    response = requestFuture.wait()

    responseEncoding = response.headers['content-type']

    throw new Error "Wrong encoding" unless responseEncoding in ['application/octet-stream', 'binary/octet-stream']

    regexSplit = /WARC\/1\./

    response.pipe(
      zlib.createGunzip()
    ).pipe(
      EventStream.split regexSplit
    ).pipe(
      EventStream.map bindWithFuture futures, mainFuture, onWebpageMetaData
    ).on('end', =>
      # It could already be resolved by an exception from bindWithFuture or bindWithOnException
      mainFuture.return() unless mainFuture.isResolved()
    ).on('error', (error) =>
      # It could already be resolved by an exception from bindWithFuture or bindWithOnException
      mainFuture.throw error unless mainFuture.isResolved()
    )

    mainFuture.wait()
    wait futures

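
The helpers above funnel errors from the stream and from the handler into mainFuture. As a point of comparison outside fibers, here is a minimal sketch of the same concern with Node's built-in `stream.pipeline`: a plain `.pipe()` chain does not forward errors from earlier stages, while `pipeline` reports an error from any stage to a single callback. `gunzipAll` is a hypothetical name and the inputs are made up. Inside Meteor, that callback would still have to be brought back into a fiber, which is not shown.

```javascript
const zlib = require('zlib');
const { Readable, Writable, pipeline } = require('stream');

// Gunzips `input` (a Buffer) and reports the outcome through one callback,
// whichever stage of the pipeline fails.
function gunzipAll(input, callback) {
  const chunks = [];
  const sink = new Writable({
    write(chunk, _encoding, done) {
      chunks.push(chunk);
      done();
    }
  });
  pipeline(Readable.from([input]), zlib.createGunzip(), sink, err => {
    if (err) return callback(err);
    callback(null, Buffer.concat(chunks).toString('utf8'));
  });
}

// Valid gzip data reaches the sink.
gunzipAll(zlib.gzipSync('WARC/1.0 ok'), (err, text) => {
  console.log('valid input:', err ? err.message : text);
});

// Invalid data makes the gunzip stage fail; the same callback receives the error.
gunzipAll(Buffer.from('this is not gzip data'), err => {
  console.log('invalid input failed:', err instanceof Error);
});
```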
+3

Source: https://habr.com/ru/post/977807/

