Bridging a Channel to a Sequence

This code is based on the fan-out example from the coroutines guide:

val inputProducer = produce<String>(CommonPool) {
    // Send every element of inputArray to the workers.
    inputArray.indices.forEach {
        send(inputArray[it])
    }
}

val resultChannel = Channel<Result>(10)

repeat(threadCount) {
    launch(CommonPool) {
        inputProducer.consumeEach {
            resultChannel.send(getResultFromData(it))
        }
    }
}

What is the right way to create a Sequence<Result> that will provide the results?

1 answer

You can get the channel's .iterator() from the ReceiveChannel and wrap that channel iterator in a Sequence<T> by implementing a normal Iterator<T> that blocks while waiting for the result of each request:

fun <T> ReceiveChannel<T>.asSequence(context: CoroutineContext) =
    Sequence {
        val iterator = iterator()
        object : AbstractIterator<T>() {
            override fun computeNext() = runBlocking(context) {
                if (!iterator.hasNext())
                    done()
                else
                    setNext(iterator.next())
            }
        }
    }

val resultSequence = resultChannel.asSequence(CommonPool)
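
One thing to watch for: the sequence only finishes once the channel is closed, and the code in the question never closes resultChannel. Below is a minimal sketch of the whole pipeline, assuming a newer kotlinx.coroutines release where Dispatchers.Default replaces CommonPool. The names asBlockingSequence and runPipeline are made up for this illustration, and the string length stands in for getResultFromData.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Same idea as asSequence above, without an explicit context parameter.
fun <T> ReceiveChannel<T>.asBlockingSequence(): Sequence<T> = Sequence {
    val channelIterator = iterator()
    object : AbstractIterator<T>() {
        override fun computeNext() = runBlocking {
            // hasNext() suspends until an element arrives or the channel is closed.
            if (channelIterator.hasNext()) setNext(channelIterator.next()) else done()
        }
    }
}

// Hypothetical stand-in for the question's pipeline: maps each string to its length.
fun runPipeline(input: List<String>, workerCount: Int): List<Int> {
    val scope = CoroutineScope(Dispatchers.Default)
    val inputChannel = Channel<String>()
    val resultChannel = Channel<Int>(10)

    // Producer: send every input, then close so the workers' loops terminate.
    scope.launch {
        for (item in input) inputChannel.send(item)
        inputChannel.close()
    }

    // Fan-out: each worker takes items from the shared input channel.
    val workers = List(workerCount) {
        scope.launch {
            for (item in inputChannel) resultChannel.send(item.length)
        }
    }

    // Close the result channel once every worker is done; otherwise the
    // sequence would block forever waiting for more elements.
    scope.launch {
        workers.joinAll()
        resultChannel.close()
    }

    // Called from a regular (non-coroutine) thread, so blocking is acceptable here.
    return resultChannel.asBlockingSequence().toList()
}

fun main() {
    // Result order depends on worker scheduling, so sort before printing.
    println(runPipeline(listOf("a", "bb", "ccc", "dddd"), workerCount = 3).sorted())
}
```

Because every step of the sequence blocks the calling thread, it is safer to consume it from ordinary blocking code than from inside another coroutine.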

Source: https://habr.com/ru/post/1682470/

