How to convert a stream to a generator without leaking permission with a promise

I have a stream and I need to convert it to a generator, so the loader can use the generator generator.

This means a turn:

stream.on('data', chunk => ...);

at

generator = streamGenerator(stream);
chunk = await generator.next()
...

better:

chunk = yield streamGenerator;

All in all, my best attempt is to leak the solution out of the promise, and I would like to avoid this:

function streamToIterable(chunkSize, stream) {
    let collector = [];
    let value = [];
    let done = false;
    let _resolve;
    let promise = new Promise(resolve => _resolve = resolve);
    stream.on('data', chunk => {
        collector = collector.concat(chunk);
        if (value.length >= chunkSize) {
            value = collector.splice(0, chunkSize);
            _resolve(value);
            stream.pause();
        }
    });
    stream.on('end', () => {
        _resolve(collection);

        // With done set to true, the next iteration well ignore 'value' and end the loop
        done = true;
    });
    stream.resume();

    return {
        next: () => ({
            value: promise.then(() => {
                stream.resume();
                promise = new Promise(resolve => _resolve = resolve);
            }),
            done,
        }),
    };
}

function* streamToGenerator(stream) {
    const iterator = streamToIterable(stream);
    let next = iterator.next();
    while (!next.done) {
        yield next.value;
    }
};

Use in a generator to load pieces:

for (const chunkData of generator()) {
    let result = yield uploadPost(url, formData, onChunkProgress(chunkIndex));

This is in redux satellite, so "next ()" is not called in the generator until a return promise is resolved.

+4
source share
2 answers

resolve , , promises. once, :

function streamToIterator(stream) {
    let done = false;
    const end = new Promise(resolve => {
        stream.once('end', resolve);
    }).then(e => {
        done = true;
    });

    return {
        [Symbol.iterator]() { return this; }
        next() {
            const promise = new Promise(resolve => {
                stream.once('data', value => {
                    resolve(value);
                    stream.pause();
                });
                stream.resume();
            });

            return {
                value: Promise.race([promise, end]),
                done,
            };
        }),
    };
}

, end data , , next , , , , .

, node.js, , API , data .

, es-next. , , , , , , node readablestream iterable.

+3

EDIT: , , , , " ". .

, .

; promises , .

, , , promises , , , , promises .

function streamToAsyncIterator(chunkSize, stream) {
    let done = false;
    let endPromise = new Promise(resolve => {
        //flush out the last data.
        stream.on('end', () => {
            resolve({ value: collector, done: false });
        });
    });

    //two-track queue for expecting and sending data with promises
    let dataPromises = [];
    let dataResolves = [];
    stream.on('data', value => {
        const dataResolve = dataResolves.shift();
        if (dataResolve) {
            dataResolve({ value, done: false });
        } else {
            dataPromises.push(Promise.resolve({ value, done: false }));
        }
        stream.pause();
    });

    return {
        [Symbol.asyncIterator]() {
            return this;
        },
        //TODO handle return() to close the stream
        next() {
            if (done) return Promise.resolve({ done });

            stream.resume();

            let dataPromise = dataPromises.shift();
            if (!dataPromise) {
                dataPromise = new Promise(resolve => dataResolves.push(resolve));
            }

            return Promise.race([dataPromise, endPromise])
                // done must be set in the resolution of the race, or done could complete the generator before the last iteration of data.
                .then(next => {
                     if (next.done) {
                         done = true;
                         next.done = false;
                     }
                     return next;
                });
        },
    };
}

async function* streamToAsyncGenerator(chunkSize, stream) {
    const iterator = streamToAsyncIterator(chunkSize, stream);
    let next = await iterator.next();
    while (!next.done) {
        yield next.value;
        // Delete is needed to release resouces
        // Without delete, you'll get a memory error at 2GB.
        delete next.value;
        next = await iterator.next();
    }
};

EDIT: , , , , GC . , .

0

Source: https://habr.com/ru/post/1692995/


All Articles