I have a stream made of a chain of pipes, built with event-stream. The code fetches a file from S3, gunzips it, parses each line, and hands the data to an asynchronous function. I want to resolve a promise once the whole file has been processed. How can I be sure the entire chain has finished draining? My current solution is below. It looks bad, and I still suspect resolve() can be called while gzReader has chunks left to process.
Thanks!
let counter = 0;

const inputStream = this.s3client.getObject(params).createReadStream();

inputStream.on('end', () => {
  console.log('Finished handling file ' + fileKey);
  const stopInterval = setInterval(() => {
    if (counter === 0) {
      resolve(this.eventsSent);
      clearInterval(stopInterval);
    }
  }, 300);
});
const gzReader = zlib.createGunzip();

inputStream
  .pipe(gzReader)
  .pipe(es.split())
  .pipe(es.parse())
  .pipe(es.mapSync(data => {
    counter++;
    this.eventsSent.add(data.data);
    asyncFunc(this.destinationStream, data.data)
      .then(() => {
        counter--;
      })
      .catch((e) => {
        counter--;
        console.error('Failed sending event ' + data.data + ': ' + e);
      });
  }));