How to tell when a stream built from pipes has finished draining

I have a stream built from a chain of pipes, and I use event-stream to create the pipe building blocks. The code fetches a file from S3, unzips it, parses it, and passes the data to an asynchronous function. I want to resolve a promise once the file has been fully processed. How can I be sure that the whole chain has finished draining? My current solution is below. It looks bad, and I still think there is a chance that resolve() will be called while gzReader still has chunks of data.

Thanks.

const inputStream = this.s3client.getObject(params).createReadStream()
inputStream.on('end', () => {
  console.log("Finished handling file " + fileKey)
  let stopInterval = setInterval(() => {
    if (counter == 0) {
      resolve(this.eventsSent)
      clearInterval(stopInterval)
    }
  }, 300)
})
const gzReader = zlib.createGunzip();
inputStream
  .pipe(gzReader)
  .pipe(es.split())
  .pipe(es.parse())
  .pipe(es.mapSync(data => {
    counter++
    this.eventsSent.add(data.data)
    asyncFunc(this.destinationStream, data.data)
      .then(() => {
        counter--
      })
      .catch((e) => {
        counter--
        console.error('Failed sending event ' + data.data + e)
      })
  }))
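
One possible direction, offered as a sketch and not as a drop-in fix: the 'end' event on inputStream only says that the S3 source has been fully read, not that gzReader and the later stages have flushed. Waiting on the last stage of the chain, and collecting the promises returned by the async function instead of polling a counter, avoids that race. The sketch below uses only Node's built-in modules. processFile and parseJsonLines are hypothetical names, parseJsonLines is a stand-in for es.split() plus es.parse(), and asyncFunc is assumed to take the record's data and return a promise.

```javascript
const { Readable, Writable, Transform } = require('stream');
const { pipeline } = require('stream/promises');
const { StringDecoder } = require('string_decoder');
const zlib = require('zlib');

// Hypothetical stand-in for es.split() + es.parse():
// splits text on newlines and emits one parsed JSON object per line.
function parseJsonLines() {
  const decoder = new StringDecoder('utf8');
  let tail = '';
  const emitLines = (stream, lines) => {
    for (const line of lines) {
      if (line.trim()) stream.push(JSON.parse(line));
    }
  };
  return new Transform({
    readableObjectMode: true,
    transform(chunk, _enc, done) {
      try {
        const lines = (tail + decoder.write(chunk)).split('\n');
        tail = lines.pop(); // keep the incomplete last line for the next chunk
        emitLines(this, lines);
        done();
      } catch (err) {
        done(err);
      }
    },
    flush(done) {
      try {
        emitLines(this, [tail + decoder.end()]);
        done();
      } catch (err) {
        done(err);
      }
    },
  });
}

// Hypothetical helper: resolves only after every stage has finished
// AND every asyncFunc call has settled.
async function processFile(inputStream, asyncFunc) {
  const eventsSent = new Set();
  const pending = [];

  const sink = new Writable({
    objectMode: true,
    write(record, _enc, done) {
      eventsSent.add(record.data);
      // Swallow per-event failures so one bad event does not reject the file.
      pending.push(
        asyncFunc(record.data).catch((e) => {
          console.error('Failed sending event ' + record.data + ' ' + e);
        })
      );
      done();
    },
  });

  // pipeline() resolves once the last stream (sink) has finished,
  // and rejects if any stage emits an error.
  await pipeline(inputStream, zlib.createGunzip(), parseJsonLines(), sink);
  await Promise.all(pending);
  return eventsSent;
}
```

With this shape, the S3 stream from the question would be passed in as inputStream, and the returned promise would take the place of the manual resolve() call. Calling done() only after asyncFunc settles would process events one at a time and add backpressure, at the cost of concurrency.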

Source: https://habr.com/ru/post/1694182/

