I am using the job-collection package to do the following:
- Upload a large file with tons of metadata about web pages
- Create stream from file metadata separated by regular expression with NPM
event-stream package - Check if the metadata matches in the collection match (I tried to pass this metadata to every other page to do this)
The file is too large to buffer, so streaming is required. Here is a small file with a few examples of metadata if you want to try this.
Each job from the job-collection package is already inside an asynchronous function:
var request = Npm.require('request'); var zlib = Npm.require('zlib'); var EventStream = Meteor.npmRequire('event-stream'); function (job, callback) { //This download is much too long to block request({url: job.fileURL, encoding: null}, function (error, response, body) { if (error) console.error('Error downloading File'); if (response.statusCode !== 200) console.error(downloadResponse.statusCode, 'Status not 200'); var responseEncoding = response.headers['content-type']; console.log('response encoding is %s', responseEncoding); if (responseEncoding === 'application/octet-stream' || 'binary/octet-stream') { console.log('Received binary/octet-stream'); var regexSplit = /WARC\/1\./; response.pipe(zlib.createGunzip() .pipe(EventStream.split(regexSplit)) .pipe(EventStream.map(function (webpageMetaData) { /* Need parse the metaData or pass each webpageMetaData to function * This next function could block if it had to */ searchPageMetaData(webpageMetaData); // pass each metadatum to this function to update a collection - this function can be synchronous })); } else { console.error('Wrong encoding'); } }); } function searchWebPageMetaData(metaData) { // Parse JSON and search collection for match }
- Are there any better ways to structure this? Am I on the right track?
- Where to put
Meteor.bindEnvironment ? - Do I bind the environment for every time I go to searchWebPageMetaData() ? Do I need to use fibers explicitly here? - The thread stops at startup if I start it before
process.stdout . I have to put a thread in one of the Meteor fairings. - I know
Meteor.wrapAsync . I want to wrap the innermost searchWebPageMetaData() function in Meteor.wrapAsync ? (I think I answer that yes when I type) - Will the flow be slow to compensate for the slowness of database calls? I think not, but how can I handle this?
I spent a lot of time learning Meteor wrapAsync and bindEnvironment , but I could not put it all together and figure out where to use them.
APPENDIX 1
To clarify, follow these steps:
- Upload file;
- Create stream;
- unzip it;
- split it into separate web pages. EventStream handles this
- send it to a function - no need to return values; it can be a lock, it is just a search and a database call
I tried to do something like this, except that the main code I needed was in a function in another file. The following code has most of the answers @ electric-jesus.
processJobs('parseWatFile', { concurrency: 1, cargo: 1, pollInterval: 1000, prefetch: 1 }, function (job, callback) { if (job.data.watZipFileLink) { queue.pause(); console.log('queue should be paused now'); var watFileUrl = 'https://s3.amazonaws.com/ja-common-crawl/exampleWatFile.wat.gz'; function searchPageMetaData(webpageMetaData, callback) { console.log(webpageMetaData); // Would be nice to just get this function logging each webPageMetaData future.return(callback(webpageMetaData)); //I don't need this to return any value - do I have to return something? } if (!watFile) console.error('No watFile passed to downloadAndSearchWatFileForEntity '); var future = new Future(); // Doc Brown would be proud. if(typeof callback !== 'function') future.throw('callbacks are supposed to be functions.'); request({url: watFile, encoding: null}, function (error, response, body) { if (error) future.throw('Error Downloading File'); if (response.statusCode !== 200) future.throw('Expected status 200, got ' + response.statusCode + '.'); var responseEncoding = response.headers['content-type']; if (responseEncoding === 'application/octet-stream' || 'binary/octet-stream') { var regexSplit = /WARC\/1\./; response.pipe(zlib.createGunzip() .pipe(EventStream.split(regexSplit)) .pipe(EventStream.map(function (webpageMetaData) { searchPageMetaData(webpageMetaData, callback); }) )); } else { future.throw('Wrong encoding'); } }); return future.wait(); } else { console.log('No watZipFileLink for this job'); job.log('ERROR: NO watZipFileLink from commonCrawlJob collection'); } queue.resume(); job.done; callback(); }