Node.js: splitting a readable stream into multiple consecutive writable streams

Given a Readable stream (which may be process.stdin or a file stream), is it possible / practical to pipe() into a custom Writable that fills a child Writable up to a certain size, then closes that child stream, opens a new Writable and continues?

(The context is uploading large amounts of piped data to a CDN, splitting it into reasonably sized blocks as it arrives, without having to write the data to disk first.)

I tried to write a Writable that handles opening and closing the child stream inside its _write function, but the problem arises when an incoming chunk is too large to fit into the current child stream: it has to write part of the chunk to the old stream, create a new stream, and then wait for the open event on the new stream before it can complete the _write call.
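Roughly, the kind of Writable I have in mind looks like this (a simplified, untested sketch; ChunkingWritable, CHUNK_LIMIT and the part-N file names are just placeholders):

var stream = require('stream');
var util = require('util');
var fs = require('fs');

var CHUNK_LIMIT = 8 * 1024 * 1024; // illustrative block size

function ChunkingWritable() {
  stream.Writable.call(this);
  this.part = 0;
  this.written = 0;
  this.child = fs.createWriteStream('part-' + this.part);
}
util.inherits(ChunkingWritable, stream.Writable);

ChunkingWritable.prototype._write = function(chunk, encoding, callback) {
  var remaining = CHUNK_LIMIT - this.written;
  if (chunk.length <= remaining) {
    // the whole chunk fits into the current child stream
    this.written += chunk.length;
    return this.child.write(chunk, callback);
  }
  // The awkward case: split the chunk, finish the old child, then open a
  // new one and write the remainder before signalling that this _write is
  // done. fs write streams queue writes issued before their 'open' event,
  // so there is no need to wait for 'open' explicitly.
  var head = chunk.slice(0, remaining);
  var tail = chunk.slice(remaining);
  var self = this;
  this.child.end(head, function() {
    self.part++;
    self.written = tail.length;
    self.child = fs.createWriteStream('part-' + self.part);
    self.child.write(tail, callback);
  });
};

// usage: process.stdin.pipe(new ChunkingWritable());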

Another thought I had was to introduce an extra Duplex or Transform stream to buffer the pipe and guarantee that every chunk arriving at the Writable is no larger than what the current child stream can accept, so the Writable has time to switch to a new child stream.

Or is all of this overcomplicating things, and is there a much simpler way to accomplish the original task?

2 answers

I came across this question while looking for an answer to a related problem: how to parse a file and split it into separate files depending on a category value in each line.

I have done my best to adapt my code to make it more relevant to your problem. It was adapted quickly and is not tested, so treat it as pseudo-code.

var fs = require('fs'),
  through = require('through');

var destCount = 0, dest = null, size = 0, MAX_SIZE = 1000;

readableStream
  .on('data', function(data) {
    var out = data.toString() + "\n";
    size += out.length;
    // close the current destination once it has grown past MAX_SIZE
    if (dest && size > MAX_SIZE) {
      dest.end();
      dest = null;
      size = out.length;
    }
    if (!dest) {
      destCount++;
      // option 1. pipe through a pass-through stream so the data can be manipulated before saving
      dest = through();
      dest.pipe(fs.createWriteStream("log" + destCount));
      // option 2. write directly to the file
      // dest = fs.createWriteStream("log" + destCount);
    }
    dest.write(out);
  })
  .on('end', function() {
    if (dest) dest.end();
  });
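Note that this keeps the splitting logic in the 'data' handler rather than in a stream class, so back-pressure is not handled: the return value of dest.write() is ignored, and a fast readable feeding slow disk writes will simply buffer in memory. The through() variant is only worth the extra dependency if you want to transform each line before it reaches the file; otherwise option 2 (writing straight to an fs write stream) is simpler.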

I would insert a Transform between the Readable and the Writable streams, and implement whatever logic I needed inside my own _transform.

Maybe I would only have the Readable and the Transform. The _transform method would create all the streams needed for writing.
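Something along these lines (an untested sketch; the Splitter name and the MAX_SIZE limit are just placeholders):

var stream = require('stream');
var util = require('util');
var fs = require('fs');

var MAX_SIZE = 1000; // illustrative size limit, as in the answer above

function Splitter() {
  stream.Transform.call(this);
  this.count = 0;
  this.size = 0;
  this.dest = null;
}
util.inherits(Splitter, stream.Transform);

Splitter.prototype._transform = function(chunk, encoding, callback) {
  // rotate to a new file once the current one has reached MAX_SIZE
  if (this.dest && this.size + chunk.length > MAX_SIZE) {
    this.dest.end();
    this.dest = null;
    this.size = 0;
  }
  if (!this.dest) {
    this.count++;
    this.dest = fs.createWriteStream('log' + this.count);
  }
  this.size += chunk.length;
  // nothing is pushed downstream, so no Writable has to be attached;
  // delaying the callback until the file drains gives simple back-pressure
  if (this.dest.write(chunk)) return callback();
  this.dest.once('drain', callback);
};

Splitter.prototype._flush = function(callback) {
  if (this.dest) this.dest.end(callback);
  else callback();
};

// usage: readableStream.pipe(new Splitter());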


Source: https://habr.com/ru/post/1526789/

