Node.js: Piping the same readable stream into multiple (writable) targets

I need to run two commands in series, both of which need to read data from the same stream. After a stream has been piped into another one, its buffer is emptied, so I cannot read the data from that stream again. As a result, this does not work:

// Demonstrates the problem: one request stream is piped into the `identify`
// command and, once that command has finished, into the `convert` command.
var spawn = require('child_process').spawn;
var fs = require('fs');
var request = require('request');

var inputStream = request('http://placehold.it/640x360');

// First consumer: `identify` receives the image on its stdin.
var identify = spawn('identify', ['-']);
inputStream.pipe(identify.stdin);

// Collect everything `identify` prints so it can be parsed once it is done.
var outputParts = [];
identify.stdout.on('data', function (part) {
  outputParts.push(part);
});

identify.stdout.on('end', function () {
  var width = getSize(Buffer.concat(outputParts));

  // Second consumer: `convert` scales the image to half the detected width
  // and emits a PNG on stdout.
  var convert = spawn('convert', ['-', '-scale', width * 0.5, 'png:-']);

  // This is the second pipe() on the same stream; it runs only after the
  // first consumer has finished reading.
  inputStream.pipe(convert.stdin);
  convert.stdout.pipe(fs.createWriteStream('half.png'));
});

/**
 * Extracts the width from the text printed by `identify`.
 *
 * Takes the third space-separated field of the output, splits it on 'x'
 * and parses the first part as an integer.
 *
 * @param {Buffer} buffer - Raw stdout of the `identify` command.
 * @returns {number} The parsed width (NaN if the field is not numeric).
 */
function getSize(buffer) {
  var fields = buffer.toString().split(' ');
  var dimensions = fields[2];
  return parseInt(dimensions.split('x')[0]);
}

Request complains about this

 Error: You cannot pipe after data has been emitted from the response. 

Of course, changing inputStream to a stream created with fs.createReadStream gives the same problem. I do not want to write to a file; I want to somehow reuse the stream that request (or any other source) produces.

Is there a way to reuse a readable stream once it has finished piping? What would be the best way to accomplish something like the above example?

+48
javascript stream pipe
Oct 23 '13 at 22:56
source share
5 answers

You cannot reuse piped data that has already been sent, and you cannot pipe a stream after it has ended. Thus, you cannot process the same stream twice; you need two streams. You have to duplicate the stream by piping it into two streams. You can create a simple stream with a PassThrough stream, which just passes its input to its output.

 const spawn = require('child_process').spawn;
 const PassThrough = require('stream').PassThrough;

 // Source process whose stdout is to be consumed twice.
 const a = spawn('echo', ['hi user']);

 // Two pass-through streams, each receiving its own copy of a's stdout.
 const b = new PassThrough();
 const c = new PassThrough();
 a.stdout.pipe(b);
 a.stdout.pipe(c);

 // First consumer: add up the length of every chunk that arrives on b.
 let count = 0;
 b.on('data', function (chunk) {
   count += chunk.length;
 });

 // Second consumer: c is only piped onwards once b has ended, so until then
 // nothing reads from c.
 b.on('end', function () {
   console.log(count);
   c.pipe(process.stdout);
 });

Output:

 8 hi user 
+60
Oct 24 '13 at 9:13
source share

The first answer only works if the streams take roughly the same amount of time to process the data. If one takes significantly longer, the faster one will request new data, and the data that is still being used by the slower one is overwritten (I had this problem after trying to solve it using a duplicated stream).

The following pattern worked very well for me. It uses Streamz (a library based on Stream2 streams) and Promises to synchronize async streams through a callback. Using the familiar example from the first answer:

 const spawn = require('child_process').spawn;
 const PassThrough = require('stream').PassThrough;
 const streamz = require('streamz').PassThrough;
 const Promise = require('bluebird');

 const a = spawn('echo', ['hi user']);
 const b = new PassThrough();
 const c = new PassThrough();

 // Every chunk of a's stdout is handed to combineStreamOperations.
 a.stdout.pipe(streamz(combineStreamOperations));

 /**
  * Per-chunk handler passed to streamz.
  *
  * @param {*} data - The chunk handed over by streamz (unused in this sketch).
  * @param {Function} next - Called to request more data.
  */
 function combineStreamOperations(data, next) {
   Promise.join(b, c, function (b, c) {
     // perform n operations on the same data
     next(); // request more
   });
 }

 // NOTE(review): nothing in this snippet pipes or writes into b or c, so the
 // listeners below only fire if the "n operations" placeholder above feeds
 // them -- confirm the intended wiring.
 let count = 0;
 b.on('data', function (chunk) {
   count += chunk.length;
 });

 b.on('end', function () {
   console.log(count);
   c.pipe(process.stdout);
 });
+11
Nov 23 '15 at 19:38
source share

For the general problem, the following code works fine:

 const PassThrough = require('stream').PassThrough;

 // One source stream fanned out into two independent pass-through streams.
 const a = new PassThrough();
 const b1 = new PassThrough();
 const b2 = new PassThrough();

 a.pipe(b1);
 a.pipe(b2);

 // Each branch logs every chunk it receives, prefixed with its own name.
 b1.on('data', function (data) {
   console.log('b1:', data.toString());
 });
 b2.on('data', function (data) {
   console.log('b2:', data.toString());
 });

 // A single write on the source shows up on both branches.
 a.write('text');
+1
Nov 06 '15 at 9:55
source share

What about piping into two or more streams at different times?

For example:

 const fs = require('fs');
 const PassThrough = require('stream').PassThrough;

 // NOTE(review): `stream` is not defined in this snippet; presumably it is
 // the audio source object provided by the surrounding program.
 const mybinaryStream = stream.start(); // never ending audio stream
 const file1 = fs.createWriteStream('file1.wav', { encoding: 'binary' });
 const file2 = fs.createWriteStream('file2.wav', { encoding: 'binary' });

 // Intermediate stream that both files are attached to.
 const mypass = new PassThrough();

 mybinaryStream.pipe(mypass);
 mypass.pipe(file1);

 // The second destination is attached two seconds after the first one.
 setTimeout(function () {
   mypass.pipe(file2);
 }, 2000);

The code above does not cause errors, but file2 is empty

+1
Nov 29 '16 at 8:14
source share

I have a different solution for writing to two streams at the same time. Naturally, the time to write will be the sum of the two write times, but I use it to respond to a download request where I want to keep a copy of the downloaded file on my server (I actually use an S3 backup, so I cache the most-used files locally to avoid multiple file transfers).

 /**
  * A utility class made to write to a file while answering a file download
  * request: every call is forwarded to both wrapped streams.
  */
 class TwoOutputStreams {
   /**
    * @param {object} streamOne - First destination; must offer write() and end().
    * @param {object} streamTwo - Second destination; must offer write() and end().
    */
   constructor(streamOne, streamTwo) {
     this.streamOne = streamOne;
     this.streamTwo = streamTwo;
   }

   /**
    * Sets a header on every wrapped stream that has a setHeader() method;
    * streams without one are skipped.
    *
    * @param {string} header - Header name.
    * @param {*} value - Header value.
    */
   setHeader(header, value) {
     if (this.streamOne.setHeader) this.streamOne.setHeader(header, value);
     if (this.streamTwo.setHeader) this.streamTwo.setHeader(header, value);
   }

   /**
    * Writes the chunk to both streams.
    *
    * @param {*} chunk - Data to write.
    * @param {string} [encoding] - Optional encoding, forwarded unchanged.
    * @returns {boolean} false if either stream returned false from write()
    *   (i.e. asked the caller to wait), true otherwise.
    */
   write(chunk, encoding) {
     const acceptedOne = this.streamOne.write(chunk, encoding);
     const acceptedTwo = this.streamTwo.write(chunk, encoding);
     // Only an explicit `false` signals backpressure; destinations whose
     // write() returns nothing are treated as having accepted the chunk.
     return acceptedOne !== false && acceptedTwo !== false;
   }

   /**
    * Ends both streams, optionally writing a final chunk to each first.
    *
    * @param {*} [chunk] - Optional final data to write before ending.
    * @param {string} [encoding] - Optional encoding of the final chunk.
    */
   end(chunk, encoding) {
     // A callback passed in the chunk position is not forwarded, so that it
     // cannot be invoked once per wrapped stream.
     if (chunk == null || typeof chunk === 'function') {
       this.streamOne.end();
       this.streamTwo.end();
       return;
     }
     this.streamOne.end(chunk, encoding);
     this.streamTwo.end(chunk, encoding);
   }
 }

Then you can use this as a regular OutputStream

 // Wrap both destinations so that each write() and end() call is forwarded to
 // both of them. NOTE(review): fileOut and responseStream are defined
 // elsewhere; presumably a file write stream and the download response.
 const twoStreamsOut = new TwoOutputStreams(fileOut, responseStream) 

and pass it to your method as if it were a response or a file output stream.

0
Nov 08 '17 at 17:50
source share



All Articles